diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index c34e8a7..a4e3050 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -1,13 +1,10 @@ package org.syndicate_lang.actors; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -16,7 +13,6 @@ import java.util.logging.Logger; * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. */ public class Actor implements Executor { - private final static ThreadLocal _currentActor = new ThreadLocal<>(); private final static AtomicLong _count = new AtomicLong(0); private final static AtomicLong _actorId = new AtomicLong(0); protected final static ExecutorService _executor = Executors.newWorkStealingPool(); @@ -29,7 +25,7 @@ public class Actor implements Executor { private Throwable _exitReason = null; private boolean _isCounted = true; private Set _links = null; - private Map>> _monitors = null; + private Set>> _monitors = null; private Consumer _exitTrap = null; private final static class WorkItem extends AtomicReference { @@ -51,18 +47,6 @@ public class Actor implements Executor { private final AtomicReference tail = new AtomicReference<>(head); private final AtomicLong workItemCount = new AtomicLong(0); - public static Actor current() { - return _currentActor.get(); - } - - public static Logger log() { - return current().getLogger(); - } - - public static Remote ref(T o) { - return current().proxyFor(o); - } - public Actor() { this("" + _actorId.incrementAndGet()); } @@ -74,14 +58,14 @@ public class Actor implements Executor { } public static Remote forObject(T o) { - return new Actor().proxyFor(o); + return new Actor().ref(o); } public static Promise> boot(ThrowingSupplier f) { - Promise> p = new Promise<>(); - Actor a = new Actor(); + final Promise> p = new Promise<>(); + final Actor a = new Actor(); a.execute( - () -> p.resolveCalling(() -> Actor.ref(f.get())), + () -> p.resolveCalling(() -> a.ref(f.get())), () -> p.rejectWith(new ActorTerminated(a))); return p; } @@ -94,7 +78,7 @@ public class Actor implements Executor { return _exitReason; } - public Logger getLogger() { + public Logger log() { return _logger; } @@ -122,18 +106,13 @@ public class Actor implements Executor { return super.toString() + "(" + this._name + ")"; } - public Remote proxyFor(T o) { + public Remote ref(T o) { return new Remote<>(this, o); } private void _performSync(Runnable work, Runnable ifNotAlive) { synchronized (this) { - _currentActor.set(this); - try { - _perform(work, ifNotAlive); - } finally { - _currentActor.set(null); - } + _perform(work, ifNotAlive); } } @@ -154,17 +133,12 @@ public class Actor implements Executor { } public Promise stop(Throwable reason) { - if (current() == this) { + Promise p = new Promise<>(); + this.execute(() -> { this._stop(true, reason); - return Promise.resolved(); - } else { - Promise p = new Promise<>(); - this.execute(() -> { - this._stop(true, reason); - p.resolve(); - }, p::resolve); - return p; - } + p.resolve(); + }, p::resolve); + return p; } private synchronized void _stop(boolean normally, Throwable reason) { @@ -172,9 +146,9 @@ public class Actor implements Executor { _alive = false; _exitReason = reason; if (normally) { - getLogger().log(Level.FINE, "Actor stopped", reason); + log().log(Level.FINE, "Actor stopped", reason); } else { - getLogger().log(Level.SEVERE, "Actor terminated with error", reason); + log().log(Level.SEVERE, "Actor terminated with error", reason); } Set linkedPeers = _links; if (linkedPeers != null) { @@ -183,12 +157,11 @@ public class Actor implements Executor { peer.notifyExit(this); } } - Map>> monitoringPeers = _monitors; + Set>> monitoringPeers = _monitors; if (monitoringPeers != null) { _monitors = null; - for (var entry : monitoringPeers.entrySet()) { - final var ref = entry.getKey(); - entry.getValue().async((h) -> h.accept(this, ref)); + for (var handler : monitoringPeers) { + handler.async(Consumer::accept); } } _releaseCount(); @@ -208,21 +181,16 @@ public class Actor implements Executor { if (workItemCount.getAndIncrement() == 0) { _executor.execute(() -> { synchronized (this) { - _currentActor.set(this); - try { - long batch = workItemCount.get(); - while (batch > 0) { - for (int count = 0; count < batch; count++) { - WorkItem i = null; - while (i == null) i = head.get(); - head = i; - this._perform(i.work, i.ifNotAlive); - i.clear(); - } - batch = workItemCount.addAndGet(-batch); + long batch = workItemCount.get(); + while (batch > 0) { + for (int count = 0; count < batch; count++) { + WorkItem i = null; + while (i == null) i = head.get(); + head = i; + this._perform(i.work, i.ifNotAlive); + i.clear(); } - } finally { - _currentActor.set(null); + batch = workItemCount.addAndGet(-batch); } } }); @@ -273,14 +241,12 @@ public class Actor implements Executor { "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } - public void link() { - final Actor peer = Actor.current(); + public void link(Actor peer) { this.linkPeer(peer); peer.linkPeer(this); } - public void unlink() { - final Actor peer = Actor.current(); + public void unlink(Actor peer) { this.unlinkPeer(peer); peer.unlinkPeer(this); } @@ -318,18 +284,16 @@ public class Actor implements Executor { this._exitTrap = handler; } - public synchronized Object monitor(Consumer handler) { - Object ref = new Object(); - monitor(ref, (actor, _ref) -> handler.accept(actor)); - return ref; + public void monitor(Actor peer, Consumer handler) { + peer.installMonitor(this.ref(handler)); } - public synchronized void monitor(final Object ref, BiConsumer handler) { + private synchronized void installMonitor(Remote> handler) { if (this._alive) { - if (_monitors == null) _monitors = new HashMap<>(); - _monitors.put(ref, Actor.ref(handler)); + if (_monitors == null) _monitors = new HashSet<>(); + _monitors.add(handler); } else { - Actor.ref(handler).async((h) -> h.accept(this, ref)); + handler.async(Consumer::accept); } } } diff --git a/src/main/java/org/syndicate_lang/actors/AsyncProxy.java b/src/main/java/org/syndicate_lang/actors/AsyncProxy.java index 3d62f70..e089699 100644 --- a/src/main/java/org/syndicate_lang/actors/AsyncProxy.java +++ b/src/main/java/org/syndicate_lang/actors/AsyncProxy.java @@ -11,7 +11,7 @@ public class AsyncProxy extends AbstractProxy { @Override public Object dispatch(Method method, Object[] args) { if (method.getReturnType().equals(void.class)) { - this._ref.async((v) -> { + this._ref.async((v, _ac) -> { try { method.invoke(v, args); } catch (IllegalAccessException e) { diff --git a/src/main/java/org/syndicate_lang/actors/Promise.java b/src/main/java/org/syndicate_lang/actors/Promise.java index 2510e9b..59b512a 100644 --- a/src/main/java/org/syndicate_lang/actors/Promise.java +++ b/src/main/java/org/syndicate_lang/actors/Promise.java @@ -112,8 +112,8 @@ public class Promise implements Future { } } - public Promise andThen(Function ok) { - return this.andThen(ok, null); + public Promise andThen(Actor a, Function ok) { + return this.andThen(a, ok, null); } public void resolveCalling(ThrowingSupplier f) { @@ -124,10 +124,8 @@ public class Promise implements Future { } } - public synchronized Promise andThen(final Function ok, final Function fail) { - Actor a0 = Actor.current(); + public synchronized Promise andThen(Actor a0, final Function ok, final Function fail) { final Actor a = a0 != null ? a0 : new Actor(); - Promise p = new Promise<>(); this.whenFulfilled((t) -> a.execute( () -> p.resolveCalling(() -> ok.apply(t)), @@ -166,29 +164,17 @@ public class Promise implements Future { } public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException { - Actor a = Actor.current(); - if (a == null) { - Semaphore s = new Semaphore(0); - this.whenFulfilled((_t) -> s.release()); - this.whenRejected((_e) -> s.release()); + this.whenFulfilled((_t) -> { synchronized (this) { this.notifyAll(); } }); + this.whenRejected((_e) -> { synchronized (this) { this.notifyAll(); } }); + synchronized (this) { if (delay == -1) { - s.acquire(); + while (this.isPending()) this.wait(); } else { - if (!s.tryAcquire(delay, unit)) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION; - } - } else { - this.whenFulfilled((_t) -> { synchronized (a) { a.notifyAll(); } }); - this.whenRejected((_e) -> { synchronized (a) { a.notifyAll(); } }); - synchronized (a) { - if (delay == -1) { - while (this.isPending()) a.wait(); - } else { - long targetTime = System.currentTimeMillis() + unit.toMillis(delay); - while (this.isPending()) { - long now = System.currentTimeMillis(); - if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION; - a.wait(targetTime - now); - } + long targetTime = System.currentTimeMillis() + unit.toMillis(delay); + while (this.isPending()) { + long now = System.currentTimeMillis(); + if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION; + this.wait(targetTime - now); } } } diff --git a/src/main/java/org/syndicate_lang/actors/Remote.java b/src/main/java/org/syndicate_lang/actors/Remote.java index bbf4ac0..0b003bf 100644 --- a/src/main/java/org/syndicate_lang/actors/Remote.java +++ b/src/main/java/org/syndicate_lang/actors/Remote.java @@ -1,8 +1,8 @@ package org.syndicate_lang.actors; import java.lang.reflect.Proxy; -import java.util.function.Consumer; -import java.util.function.Function; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; public class Remote { private final Actor _actor; @@ -17,41 +17,41 @@ public class Remote { return _actor; } - public void async(Consumer f) { - this._actor.execute(() -> f.accept(this._target)); + public void async(BiConsumer f) { + this._actor.execute(() -> f.accept(this._target, this._actor)); } - public void async(long delayMilliseconds, Consumer f) { - this._actor.later(delayMilliseconds, () -> f.accept(this._target)); + public void async(long delayMilliseconds, BiConsumer f) { + this._actor.later(delayMilliseconds, () -> f.accept(this._target, this._actor)); } - public Promise syncVoid(Consumer f) { - return this.sync((t) -> { - f.accept(t); + public Promise syncVoid(BiConsumer f) { + return this.sync((t, ac) -> { + f.accept(t, ac); return null; }); } - public Promise syncVoid(long delayMilliseconds, Consumer f) { - return this.sync(delayMilliseconds, (t) -> { - f.accept(t); + public Promise syncVoid(long delayMilliseconds, BiConsumer f) { + return this.sync(delayMilliseconds, (t, ac) -> { + f.accept(t, ac); return null; }); } - public Promise sync(Function f) { + public Promise sync(BiFunction f) { Promise p = new Promise<>(); this._actor.execute( - () -> p.resolveWith(f.apply(this._target)), + () -> p.resolveWith(f.apply(this._target, this._actor)), () -> p.rejectWith(this._actor.getExitReason())); return p; } - public Promise sync(long delayMilliseconds, Function f) { + public Promise sync(long delayMilliseconds, BiFunction f) { Promise p = new Promise<>(); this._actor.later( delayMilliseconds, - () -> p.resolveWith(f.apply(this._target)), + () -> p.resolveWith(f.apply(this._target, this._actor)), () -> p.rejectWith(this._actor.getExitReason())); return p; } diff --git a/src/main/java/org/syndicate_lang/actors/SyncProxy.java b/src/main/java/org/syndicate_lang/actors/SyncProxy.java index 23e8bd8..73208a8 100644 --- a/src/main/java/org/syndicate_lang/actors/SyncProxy.java +++ b/src/main/java/org/syndicate_lang/actors/SyncProxy.java @@ -10,7 +10,7 @@ public class SyncProxy extends AbstractProxy { @Override public Object dispatch(Method method, Object[] args) { - return this._ref.sync((v) -> { + return this._ref.sync((v, _ac) -> { try { return method.invoke(v, args); } catch (IllegalAccessException e) { diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java index 869996d..1a77bf1 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java @@ -9,7 +9,7 @@ public class Main { final var vh = Actor.forObject(new ValueHolder<>("There")); vh.getActor().daemonize(); final var m = Actor.forObject(new Main()); - m.async(10, (m_) -> m_.run(vh)); + m.async(10, (m_, ac) -> m_.run(ac, vh)); @SuppressWarnings("unchecked") IValueHolder vv = vh.syncProxy(IValueHolder.class); @@ -23,18 +23,20 @@ public class Main { System.out.println("Overall main returning"); } + private Actor me; private int greetingCounter = 0; - public void run(Remote> vh) { + public void run(Actor me, Remote> vh) { + this.me = me; this.greet((String) vh.syncProxy(IValueHolder.class).get()); - vh.syncVoid((v) -> v.set("World")); - Actor.current().every(1000, () -> { - if (greetingCounter >= 3) Actor.current().stop(); - this.greet(vh.sync(ValueHolder::get).await()); + vh.syncVoid((v, _ac) -> v.set("World")); + me.every(1000, () -> { + if (greetingCounter >= 3) me.stop(); + this.greet(vh.sync((v, _ac) -> v.get()).await()); }); } public void greet(String who) { - Actor.log().info((greetingCounter++) + ": Hi " + who); + me.log().info((greetingCounter++) + ": Hi " + who); } } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java index 0f4f7ae..a948ca8 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -1,5 +1,6 @@ package org.syndicate_lang.actors.example.example2; +import org.syndicate_lang.actors.Actor; import org.syndicate_lang.actors.Remote; public class Forwarder implements IForwarder { @@ -18,8 +19,8 @@ public class Forwarder implements IForwarder { } @Override - public void handleMessage(final int hopCount) { + public void handleMessage(Actor _ac, final int hopCount) { Remote target = hopCount >= this._nRounds - 1 ? _main : _peer; - target.async(f -> f.handleMessage(hopCount + 1)); + target.async((f, ac) -> f.handleMessage(ac, hopCount + 1)); } } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java index a004187..ac548b5 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java @@ -1,8 +1,9 @@ package org.syndicate_lang.actors.example.example2; +import org.syndicate_lang.actors.Actor; import org.syndicate_lang.actors.Remote; public interface IForwarder { void setPeer(Remote peer); - void handleMessage(int hopCount); + void handleMessage(Actor ac, int hopCount); } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java index 4beff67..0b3307d 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -27,25 +27,25 @@ public class Main implements IForwarder { this._remainingToReceive = nActors; } - public void boot() { - Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); + public void boot(Actor ac) { + ac.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); final List> _actors = new ArrayList<>(); - final Remote me = Actor.ref(this); + final Remote me = ac.ref(this); Remote previous = null; for (int i = 0; i < _nActors; i++) { Remote current = Actor.forObject(new Forwarder(me, this._nRounds)); - current.getActor().link(); + ac.link(current.getActor()); _actors.add(current); if (previous != null) { final var p = previous; - current.async(f -> f.setPeer(p)); + current.async((f, _ac) -> f.setPeer(p)); } previous = current; } - _actors.get(0).async(f -> f.setPeer(_actors.get(_nActors - 1))); - Actor.log().info("Start"); + _actors.get(0).async((f, _ac) -> f.setPeer(_actors.get(_nActors - 1))); + ac.log().info("Start"); this._startTime = System.currentTimeMillis(); - _actors.forEach(a -> a.async(f -> f.handleMessage(0))); + _actors.forEach(a -> a.async((f, ac2) -> f.handleMessage(ac2, 0))); } @Override @@ -54,14 +54,14 @@ public class Main implements IForwarder { } @Override - public void handleMessage(int hopCount) { + public void handleMessage(Actor ac, int hopCount) { this._remainingToReceive--; if (this._remainingToReceive == 0) { double delta = (System.currentTimeMillis() - this._startTime) / 1000.0; long nMessages = (long) _nActors * (long) _nRounds; double hz = nMessages / delta; - Actor.current().stop(); - Actor.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz", + ac.stop(); + ac.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz", nMessages, delta, hz));