package org.syndicate_lang.actors; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; /** * I represent the shared execution context for a collection of objects; I am roughly analogous to the E concept of a Vat. */ public class Actor extends ForkJoinTask { private final static AtomicLong _count = new AtomicLong(0); private final static AtomicLong _actorId = new AtomicLong(0); protected final static ForkJoinPool _executor = new ForkJoinPool( Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true ); protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private final String _name; private final Logger _logger; final Turn _turn = new Turn(this); private boolean _alive = true; private Throwable _exitReason = null; private boolean _isCounted = true; private Map _outbound; public final static class WorkItem extends AtomicReference { Consumer work; Runnable ifNotAlive; public WorkItem(Consumer work, Runnable ifNotAlive) { this.work = work; this.ifNotAlive = ifNotAlive; } public final void clear() { this.work = null; this.ifNotAlive = null; } } private WorkItem head = new WorkItem(null, null); private final AtomicReference tail = new AtomicReference<>(head); private final AtomicLong workItemCount = new AtomicLong(0); public Actor() { this(null, null); } public Actor(String debugName) { this(debugName, null); } public Actor(String debugName, Map outbound) { this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName; this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")"); this._outbound = outbound == null ? new HashMap<>() : outbound; _count.incrementAndGet(); } public static Ref forEntity(IEntity o) { return new Actor().ref(o); } public static Promise boot(Function f) { final Promise p = new Promise<>(); final Actor a = new Actor(); a.execute(new WorkItem( t -> p.resolveCalling(() -> a.ref(f.apply(t))), () -> p.rejectWith(new ActorTerminated(a)))); return p; } public String getName() { return _name; } public Throwable getExitReason() { return _exitReason; } public Logger log() { return _logger; } public synchronized boolean isDaemon() { return _alive && !_isCounted; } public void daemonize() { this.scheduleTurn(_t -> this._releaseCount()); } private void _releaseCount() { if (_isCounted) { _isCounted = false; synchronized (_count) { if (_count.decrementAndGet() == 0) { _count.notifyAll(); } } } } void _injectOutbound(Long handle, Ref peer) { synchronized (_outbound) { _outbound.put(handle, peer); } } Ref _lookupOutbound(Long handle) { synchronized (_outbound) { return _outbound.get(handle); } } Ref _extractOutbound(Long handle) { synchronized (_outbound) { return _outbound.remove(handle); } } public String toString() { return super.toString() + "(" + this._name + ")"; } public Ref ref(IEntity o) { return new Ref(this, o); } public Actor scheduleTurn(Consumer f) { this.execute(new WorkItem(f, null)); return this; } void _performTurn(Consumer f) { try { f.accept(_turn); _turn.commit(); } catch (Exception e) { this.stop(false, e); } } private void _perform(WorkItem item) { if (!_alive) { if (item.ifNotAlive != null) item.ifNotAlive.run(); } else { this._performTurn(item.work); } } public Promise stop() { return stop(true, null); } public Promise stop(Throwable reason) { return stop(true, reason); } public Promise stop(boolean normally, Throwable reason) { Promise p = new Promise<>(); this.execute(new WorkItem( _f -> { this._stop(normally, reason); p.resolve(); }, p::resolve)); return p; } private synchronized void _stop(boolean normally, Throwable reason) { if (_alive) { _alive = false; _exitReason = reason; if (normally) { log().log(Level.FINE, "Actor stopped", reason); } else { log().log(Level.SEVERE, "Actor terminated with error", reason); } this._performTurn(t -> { synchronized(_outbound) { _outbound.forEach(t::_retract_); } }); _releaseCount(); } } synchronized void _runWorkItems() { long batch = workItemCount.get(); while (batch > 0) { for (int count = 0; count < batch; count++) { WorkItem i = null; while (i == null) i = head.get(); head = i; this._perform(i); i.clear(); } batch = workItemCount.addAndGet(-batch); } } @Override public final Void getRawResult() { return null; } @Override public final void setRawResult(Void v) { } @Override public final boolean exec() { this._runWorkItems(); return false; } public void execute(WorkItem item) { tail.getAndSet(item).set(item); if (workItemCount.getAndIncrement() == 0) { _executor.execute(this); } } public void later(long delayMilliseconds, WorkItem item) { if (delayMilliseconds == 0) { this.execute(item); } else { _scheduledExecutor.schedule( () -> this.execute(item), delayMilliseconds, TimeUnit.MILLISECONDS); } } public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer f) { final var self = this; final var callback = new Runnable() { PeriodicTimer timer; public void run() { while (this.timer == null) {} // There's a race. Spin. self.execute(new WorkItem(f, this.timer::cancel)); } }; callback.timer = new PeriodicTimer( _scheduledExecutor.scheduleAtFixedRate( callback, initialDelayMilliseconds, periodMilliseconds, TimeUnit.MILLISECONDS)); return callback.timer; } public static void awaitAll() throws InterruptedException { while (_count.get() > 0) { synchronized (_count) { _count.wait(1000); } } _executor.shutdown(); _scheduledExecutor.shutdown(); //noinspection ResultOfMethodCallIgnored _executor.awaitTermination(5, TimeUnit.MINUTES); //noinspection ResultOfMethodCallIgnored _scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES); } public static void convenientLogging() { System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } public boolean isAlive() { return _alive; } }