package org.syndicate_lang.actors; import java.util.HashSet; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; /** * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. */ public class Actor implements Executor { private final static AtomicLong _count = new AtomicLong(0); private final static AtomicLong _actorId = new AtomicLong(0); protected final static ExecutorService _executor = Executors.newWorkStealingPool(); protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private final String _name; private final Logger _logger; private boolean _alive = true; private Throwable _exitReason = null; private boolean _isCounted = true; private Set _links = null; private Set>> _monitors = null; private Consumer _exitTrap = null; private final static class WorkItem extends AtomicReference { Runnable work; Runnable ifNotAlive; public WorkItem(Runnable work, Runnable ifNotAlive) { this.work = work; this.ifNotAlive = ifNotAlive; } public final void clear() { this.work = null; this.ifNotAlive = null; } } private WorkItem head = new WorkItem(null, null); private final AtomicReference tail = new AtomicReference<>(head); private final AtomicLong workItemCount = new AtomicLong(0); public Actor() { this("" + _actorId.incrementAndGet()); } public Actor(String debugName) { this._name = debugName; this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")"); _count.incrementAndGet(); } public static Remote forObject(T o) { return new Actor().ref(o); } public static Promise> boot(ThrowingSupplier f) { final Promise> p = new Promise<>(); final Actor a = new Actor(); a.execute( () -> p.resolveCalling(() -> a.ref(f.get())), () -> p.rejectWith(new ActorTerminated(a))); return p; } public String getName() { return _name; } public Throwable getExitReason() { return _exitReason; } public Logger log() { return _logger; } public synchronized boolean isDaemon() { return _alive && !_isCounted; } public synchronized Actor daemonize() { this._releaseCount(); return this; } private void _releaseCount() { if (_isCounted) { _isCounted = false; synchronized (_count) { if (_count.decrementAndGet() == 0) { _count.notifyAll(); } } } } public String toString() { return super.toString() + "(" + this._name + ")"; } public Remote ref(T o) { return new Remote<>(this, o); } private void _performSync(Runnable work, Runnable ifNotAlive) { synchronized (this) { _perform(work, ifNotAlive); } } private void _perform(Runnable work, Runnable ifNotAlive) { if (!_alive) { if (ifNotAlive != null) ifNotAlive.run(); } else { try { work.run(); } catch (Throwable exn) { this._stop(false, exn); } } } public Promise stop() { return stop(null); } public Promise stop(Throwable reason) { Promise p = new Promise<>(); this.execute(() -> { this._stop(true, reason); p.resolve(); }, p::resolve); return p; } private synchronized void _stop(boolean normally, Throwable reason) { if (_alive) { _alive = false; _exitReason = reason; if (normally) { log().log(Level.FINE, "Actor stopped", reason); } else { log().log(Level.SEVERE, "Actor terminated with error", reason); } Set linkedPeers = _links; if (linkedPeers != null) { _links = null; for (var peer : linkedPeers) { peer.notifyExit(this); } } Set>> monitoringPeers = _monitors; if (monitoringPeers != null) { _monitors = null; for (var handler : monitoringPeers) { handler.async(Consumer::accept); } } _releaseCount(); } } @Override public void execute(Runnable work) { this.execute(work, null); } public void execute(Runnable work, Runnable ifNotAlive) { { WorkItem i = new WorkItem(work, ifNotAlive); tail.getAndSet(i).set(i); } if (workItemCount.getAndIncrement() == 0) { _executor.execute(() -> { synchronized (this) { long batch = workItemCount.get(); while (batch > 0) { for (int count = 0; count < batch; count++) { WorkItem i = null; while (i == null) i = head.get(); head = i; this._perform(i.work, i.ifNotAlive); i.clear(); } batch = workItemCount.addAndGet(-batch); } } }); } } public void later(long delayMilliseconds, Runnable work) { this.later(delayMilliseconds, work, null); } public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { if (delayMilliseconds == 0) { this.execute(work, ifNotAlive); } else { _scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); } } public PeriodicTimer every(long periodMilliseconds, Runnable f) { return every(0, periodMilliseconds, f); } public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { return new PeriodicTimer( _scheduledExecutor.scheduleAtFixedRate( () -> this._performSync(f, null), initialDelayMilliseconds, periodMilliseconds, TimeUnit.MILLISECONDS)); } public static void awaitAll() throws InterruptedException { while (_count.get() > 0) { synchronized (_count) { _count.wait(1000); } } _executor.shutdown(); _scheduledExecutor.shutdown(); //noinspection ResultOfMethodCallIgnored _executor.awaitTermination(5, TimeUnit.MINUTES); //noinspection ResultOfMethodCallIgnored _scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES); } public static void convenientLogging() { System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } public void link(Actor peer) { this.linkPeer(peer); peer.linkPeer(this); } public void unlink(Actor peer) { this.unlinkPeer(peer); peer.unlinkPeer(this); } private synchronized void linkPeer(Actor peer) { if (this._alive) { if (_links == null) _links = new HashSet<>(); _links.add(peer); } else { peer.notifyExit(this); } } private void notifyExit(final Actor exitingPeer) { this.execute(() -> { this.unlinkPeer(exitingPeer); if (this._exitTrap != null) { this._exitTrap.accept(exitingPeer); } else { this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer)); } }); } private synchronized void unlinkPeer(Actor peer) { if (_links != null) { _links.remove(peer); if (_links.isEmpty()) { _links = null; } } } public void trapExits(Consumer handler) { this._exitTrap = handler; } public void monitor(Actor peer, Consumer handler) { peer.installMonitor(this.ref(handler)); } private synchronized void installMonitor(Remote> handler) { if (this._alive) { if (_monitors == null) _monitors = new HashSet<>(); _monitors.add(handler); } else { handler.async(Consumer::accept); } } }