package org.syndicate_lang.actors; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; /** * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. */ public class Actor implements Executor { private final static ThreadLocal _currentActor = new ThreadLocal<>(); private final static AtomicLong _count = new AtomicLong(0); protected final static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(4); private final String _name; private final Logger _logger; private boolean _alive = true; private Throwable _exitReason = null; private boolean _isCounted = true; public static Actor current() { return _currentActor.get(); } public static Logger log() { return current().getLogger(); } public static Remote ref(T o) { return current().proxyFor(o); } public Actor() { this("" + System.currentTimeMillis()); } public Actor(String debugName) { this._name = debugName; this._logger = Logger.getLogger(Actor.class.getCanonicalName() + "(" + this._name + ")"); _count.incrementAndGet(); } public static Remote forObject(T o) { return new Actor().proxyFor(o); } public String getName() { return _name; } public Throwable getExitReason() { return _exitReason; } public Logger getLogger() { return _logger; } public synchronized boolean isDaemon() { return _alive && !_isCounted; } public synchronized Actor daemonize() { this._releaseCount(); return this; } private void _releaseCount() { if (_isCounted) { _isCounted = false; synchronized (_count) { if (_count.decrementAndGet() == 0) { _count.notifyAll(); } } } } public String toString() { return super.toString() + "(" + this._name + ")"; } public Remote proxyFor(T o) { return new Remote(this, o); } private void _perform(Runnable work, Runnable ifNotAlive) { synchronized (this) { _currentActor.set(this); try { if (!_alive) { if (ifNotAlive != null) ifNotAlive.run(); } else { try { work.run(); } catch (Throwable exn) { this._stop(false, exn); } } } finally { _currentActor.set(null); } } } public Promise stop() { return stop(null); } public Promise stop(Throwable reason) { if (current() == this) { this._stop(true, reason); return Promise.resolved(); } else { Promise p = new Promise<>(); this.execute(() -> { this._stop(true, reason); p.resolve(); }, p::resolve); return p; } } private synchronized void _stop(boolean normally, Throwable reason) { if (_alive) { _alive = false; _exitReason = reason; if (normally) { getLogger().log(Level.INFO, "Actor stopped", reason); } else { getLogger().log(Level.SEVERE, "Actor terminated with error", reason); } _releaseCount(); } } @Override public void execute(Runnable work) { this.execute(work, null); } public void execute(Runnable work, Runnable ifNotAlive) { this.later(0, work, ifNotAlive); } public void later(long delayMilliseconds, Runnable work) { this.later(delayMilliseconds, work, null); } public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { _executor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); } public PeriodicTimer every(long periodMilliseconds, Runnable f) { return every(0, periodMilliseconds, f); } public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { return new PeriodicTimer( _executor.scheduleAtFixedRate( () -> this._perform(f, null), initialDelayMilliseconds, periodMilliseconds, TimeUnit.MILLISECONDS)); } public static boolean awaitAll() throws InterruptedException { while (_count.get() > 0) { synchronized (_count) { _count.wait(1000); } } _executor.shutdown(); return _executor.awaitTermination(5, TimeUnit.MINUTES); } }