2020-12-04 18:57:47 +00:00
|
|
|
package org.syndicate_lang.actors;
|
|
|
|
|
2020-12-05 22:50:41 +00:00
|
|
|
import java.util.HashMap;
|
|
|
|
import java.util.HashSet;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Set;
|
2020-12-04 18:57:47 +00:00
|
|
|
import java.util.concurrent.*;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2020-12-09 15:12:58 +00:00
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
2020-12-05 23:09:06 +00:00
|
|
|
import java.util.function.BiConsumer;
|
2020-12-05 22:50:41 +00:00
|
|
|
import java.util.function.Consumer;
|
2020-12-04 18:57:47 +00:00
|
|
|
import java.util.logging.Level;
|
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat.
|
|
|
|
*/
|
|
|
|
public class Actor implements Executor {
|
|
|
|
private final static ThreadLocal<Actor> _currentActor = new ThreadLocal<>();
|
|
|
|
private final static AtomicLong _count = new AtomicLong(0);
|
2020-12-04 22:25:40 +00:00
|
|
|
private final static AtomicLong _actorId = new AtomicLong(0);
|
2020-12-04 23:01:28 +00:00
|
|
|
protected final static ExecutorService _executor = Executors.newWorkStealingPool();
|
2020-12-04 22:25:40 +00:00
|
|
|
protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
|
2020-12-04 18:57:47 +00:00
|
|
|
|
|
|
|
private final String _name;
|
|
|
|
private final Logger _logger;
|
|
|
|
|
|
|
|
private boolean _alive = true;
|
|
|
|
private Throwable _exitReason = null;
|
|
|
|
private boolean _isCounted = true;
|
2020-12-05 22:50:41 +00:00
|
|
|
private Set<Actor> _links = null;
|
2020-12-05 23:09:06 +00:00
|
|
|
private Map<Object, Remote<BiConsumer<Actor, Object>>> _monitors = null;
|
2020-12-05 22:50:41 +00:00
|
|
|
private Consumer<Actor> _exitTrap = null;
|
2020-12-04 18:57:47 +00:00
|
|
|
|
2020-12-09 15:12:58 +00:00
|
|
|
private final static class WorkItem extends AtomicReference<WorkItem> {
|
|
|
|
Runnable work;
|
|
|
|
Runnable ifNotAlive;
|
|
|
|
|
|
|
|
public WorkItem(Runnable work, Runnable ifNotAlive) {
|
|
|
|
this.work = work;
|
|
|
|
this.ifNotAlive = ifNotAlive;
|
|
|
|
}
|
|
|
|
|
|
|
|
public final void clear() {
|
|
|
|
this.work = null;
|
|
|
|
this.ifNotAlive = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private WorkItem head = new WorkItem(null, null);
|
|
|
|
private final AtomicReference<WorkItem> tail = new AtomicReference<>(head);
|
|
|
|
private final AtomicLong workItemCount = new AtomicLong(0);
|
|
|
|
|
2020-12-04 18:57:47 +00:00
|
|
|
public static Actor current() {
|
|
|
|
return _currentActor.get();
|
|
|
|
}
|
|
|
|
|
|
|
|
public static Logger log() {
|
|
|
|
return current().getLogger();
|
|
|
|
}
|
|
|
|
|
|
|
|
public static<T> Remote<T> ref(T o) {
|
|
|
|
return current().proxyFor(o);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Actor() {
|
2020-12-04 22:25:40 +00:00
|
|
|
this("" + _actorId.incrementAndGet());
|
2020-12-04 18:57:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public Actor(String debugName) {
|
|
|
|
this._name = debugName;
|
2020-12-04 22:25:40 +00:00
|
|
|
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
|
2020-12-04 18:57:47 +00:00
|
|
|
_count.incrementAndGet();
|
|
|
|
}
|
|
|
|
|
|
|
|
public static<T> Remote<T> forObject(T o) {
|
|
|
|
return new Actor().proxyFor(o);
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:52:16 +00:00
|
|
|
public static<T> Promise<Remote<T>> boot(ThrowingSupplier<T> f) {
|
2020-12-05 22:50:41 +00:00
|
|
|
Promise<Remote<T>> p = new Promise<>();
|
|
|
|
Actor a = new Actor();
|
|
|
|
a.execute(
|
|
|
|
() -> p.resolveCalling(() -> Actor.ref(f.get())),
|
|
|
|
() -> p.rejectWith(new ActorTerminated(a)));
|
|
|
|
return p;
|
|
|
|
}
|
|
|
|
|
2020-12-04 18:57:47 +00:00
|
|
|
public String getName() {
|
|
|
|
return _name;
|
|
|
|
}
|
|
|
|
|
|
|
|
public Throwable getExitReason() {
|
|
|
|
return _exitReason;
|
|
|
|
}
|
|
|
|
|
|
|
|
public Logger getLogger() {
|
|
|
|
return _logger;
|
|
|
|
}
|
|
|
|
|
|
|
|
public synchronized boolean isDaemon() {
|
|
|
|
return _alive && !_isCounted;
|
|
|
|
}
|
|
|
|
|
|
|
|
public synchronized Actor daemonize() {
|
|
|
|
this._releaseCount();
|
|
|
|
return this;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void _releaseCount() {
|
|
|
|
if (_isCounted) {
|
|
|
|
_isCounted = false;
|
|
|
|
synchronized (_count) {
|
|
|
|
if (_count.decrementAndGet() == 0) {
|
|
|
|
_count.notifyAll();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public String toString() {
|
|
|
|
return super.toString() + "(" + this._name + ")";
|
|
|
|
}
|
|
|
|
|
|
|
|
public<T> Remote<T> proxyFor(T o) {
|
2020-12-05 23:09:06 +00:00
|
|
|
return new Remote<>(this, o);
|
2020-12-04 18:57:47 +00:00
|
|
|
}
|
|
|
|
|
2020-12-09 15:12:58 +00:00
|
|
|
private void _performSync(Runnable work, Runnable ifNotAlive) {
|
2020-12-04 18:57:47 +00:00
|
|
|
synchronized (this) {
|
|
|
|
_currentActor.set(this);
|
|
|
|
try {
|
2020-12-09 15:12:58 +00:00
|
|
|
_perform(work, ifNotAlive);
|
2020-12-04 18:57:47 +00:00
|
|
|
} finally {
|
|
|
|
_currentActor.set(null);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-09 15:12:58 +00:00
|
|
|
private void _perform(Runnable work, Runnable ifNotAlive) {
|
|
|
|
if (!_alive) {
|
|
|
|
if (ifNotAlive != null) ifNotAlive.run();
|
|
|
|
} else {
|
|
|
|
try {
|
|
|
|
work.run();
|
|
|
|
} catch (Throwable exn) {
|
|
|
|
this._stop(false, exn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-04 18:57:47 +00:00
|
|
|
public Promise<?> stop() {
|
|
|
|
return stop(null);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Promise<?> stop(Throwable reason) {
|
|
|
|
if (current() == this) {
|
|
|
|
this._stop(true, reason);
|
|
|
|
return Promise.resolved();
|
|
|
|
} else {
|
|
|
|
Promise<?> p = new Promise<>();
|
|
|
|
this.execute(() -> {
|
|
|
|
this._stop(true, reason);
|
|
|
|
p.resolve();
|
|
|
|
}, p::resolve);
|
|
|
|
return p;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized void _stop(boolean normally, Throwable reason) {
|
|
|
|
if (_alive) {
|
|
|
|
_alive = false;
|
|
|
|
_exitReason = reason;
|
|
|
|
if (normally) {
|
2020-12-04 22:25:40 +00:00
|
|
|
getLogger().log(Level.FINE, "Actor stopped", reason);
|
2020-12-04 18:57:47 +00:00
|
|
|
} else {
|
|
|
|
getLogger().log(Level.SEVERE, "Actor terminated with error", reason);
|
|
|
|
}
|
2020-12-05 22:50:41 +00:00
|
|
|
Set<Actor> linkedPeers = _links;
|
|
|
|
if (linkedPeers != null) {
|
|
|
|
_links = null;
|
|
|
|
for (var peer : linkedPeers) {
|
|
|
|
peer.notifyExit(this);
|
|
|
|
}
|
|
|
|
}
|
2020-12-05 23:09:06 +00:00
|
|
|
Map<Object, Remote<BiConsumer<Actor, Object>>> monitoringPeers = _monitors;
|
2020-12-05 22:50:41 +00:00
|
|
|
if (monitoringPeers != null) {
|
|
|
|
_monitors = null;
|
|
|
|
for (var entry : monitoringPeers.entrySet()) {
|
|
|
|
final var ref = entry.getKey();
|
2020-12-05 23:09:06 +00:00
|
|
|
entry.getValue().async((h) -> h.accept(this, ref));
|
2020-12-05 22:50:41 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-04 18:57:47 +00:00
|
|
|
_releaseCount();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void execute(Runnable work) {
|
|
|
|
this.execute(work, null);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void execute(Runnable work, Runnable ifNotAlive) {
|
|
|
|
this.later(0, work, ifNotAlive);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void later(long delayMilliseconds, Runnable work) {
|
|
|
|
this.later(delayMilliseconds, work, null);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
|
2020-12-04 22:25:40 +00:00
|
|
|
if (delayMilliseconds == 0) {
|
2020-12-09 15:12:58 +00:00
|
|
|
{
|
|
|
|
WorkItem i = new WorkItem(work, ifNotAlive);
|
|
|
|
tail.getAndSet(i).set(i);
|
|
|
|
}
|
|
|
|
if (workItemCount.getAndIncrement() == 0) {
|
|
|
|
_executor.execute(() -> {
|
|
|
|
synchronized (this) {
|
|
|
|
_currentActor.set(this);
|
|
|
|
try {
|
|
|
|
long batch = workItemCount.get();
|
|
|
|
while (batch > 0) {
|
|
|
|
for (int count = 0; count < batch; count++) {
|
|
|
|
WorkItem i = null;
|
|
|
|
while (i == null) i = head.get();
|
|
|
|
head = i;
|
|
|
|
this._perform(i.work, i.ifNotAlive);
|
|
|
|
i.clear();
|
|
|
|
}
|
|
|
|
batch = workItemCount.addAndGet(-batch);
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
_currentActor.set(null);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
2020-12-04 22:25:40 +00:00
|
|
|
} else {
|
2020-12-09 15:12:58 +00:00
|
|
|
_scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
|
2020-12-04 22:25:40 +00:00
|
|
|
}
|
2020-12-04 18:57:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public PeriodicTimer every(long periodMilliseconds, Runnable f) {
|
|
|
|
return every(0, periodMilliseconds, f);
|
|
|
|
}
|
|
|
|
|
|
|
|
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) {
|
|
|
|
return new PeriodicTimer(
|
2020-12-04 22:25:40 +00:00
|
|
|
_scheduledExecutor.scheduleAtFixedRate(
|
2020-12-09 15:12:58 +00:00
|
|
|
() -> this._performSync(f, null),
|
2020-12-04 18:57:47 +00:00
|
|
|
initialDelayMilliseconds,
|
|
|
|
periodMilliseconds,
|
|
|
|
TimeUnit.MILLISECONDS));
|
|
|
|
}
|
|
|
|
|
2020-12-04 22:25:40 +00:00
|
|
|
public static void awaitAll() throws InterruptedException {
|
2020-12-04 18:57:47 +00:00
|
|
|
while (_count.get() > 0) {
|
|
|
|
synchronized (_count) {
|
|
|
|
_count.wait(1000);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_executor.shutdown();
|
2020-12-04 22:25:40 +00:00
|
|
|
_scheduledExecutor.shutdown();
|
2020-12-09 15:12:58 +00:00
|
|
|
//noinspection ResultOfMethodCallIgnored
|
2020-12-04 22:25:40 +00:00
|
|
|
_executor.awaitTermination(5, TimeUnit.MINUTES);
|
2020-12-09 15:12:58 +00:00
|
|
|
//noinspection ResultOfMethodCallIgnored
|
2020-12-04 22:25:40 +00:00
|
|
|
_scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES);
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void convenientLogging() {
|
|
|
|
System.setProperty("java.util.logging.SimpleFormatter.format",
|
|
|
|
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
|
2020-12-04 18:57:47 +00:00
|
|
|
}
|
2020-12-05 22:50:41 +00:00
|
|
|
|
|
|
|
public void link() {
|
|
|
|
final Actor peer = Actor.current();
|
|
|
|
this.linkPeer(peer);
|
|
|
|
peer.linkPeer(this);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void unlink() {
|
|
|
|
final Actor peer = Actor.current();
|
|
|
|
this.unlinkPeer(peer);
|
|
|
|
peer.unlinkPeer(this);
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized void linkPeer(Actor peer) {
|
|
|
|
if (this._alive) {
|
|
|
|
if (_links == null) _links = new HashSet<>();
|
|
|
|
_links.add(peer);
|
|
|
|
} else {
|
|
|
|
peer.notifyExit(this);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void notifyExit(final Actor exitingPeer) {
|
|
|
|
this.execute(() -> {
|
|
|
|
this.unlinkPeer(exitingPeer);
|
|
|
|
if (this._exitTrap != null) {
|
|
|
|
this._exitTrap.accept(exitingPeer);
|
|
|
|
} else {
|
|
|
|
this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer));
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized void unlinkPeer(Actor peer) {
|
|
|
|
if (_links != null) {
|
|
|
|
_links.remove(peer);
|
|
|
|
if (_links.isEmpty()) {
|
|
|
|
_links = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void trapExits(Consumer<Actor> handler) {
|
|
|
|
this._exitTrap = handler;
|
|
|
|
}
|
|
|
|
|
|
|
|
public synchronized Object monitor(Consumer<Actor> handler) {
|
|
|
|
Object ref = new Object();
|
|
|
|
monitor(ref, (actor, _ref) -> handler.accept(actor));
|
|
|
|
return ref;
|
|
|
|
}
|
|
|
|
|
2020-12-05 23:09:06 +00:00
|
|
|
public synchronized void monitor(final Object ref, BiConsumer<Actor, Object> handler) {
|
2020-12-05 22:50:41 +00:00
|
|
|
if (this._alive) {
|
|
|
|
if (_monitors == null) _monitors = new HashMap<>();
|
|
|
|
_monitors.put(ref, Actor.ref(handler));
|
|
|
|
} else {
|
2020-12-05 23:09:06 +00:00
|
|
|
Actor.ref(handler).async((h) -> h.accept(this, ref));
|
2020-12-05 22:50:41 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-04 18:57:47 +00:00
|
|
|
}
|