From f8a7d95e55513de66e1621322a2d631d6c1548b2 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 5 Dec 2020 23:50:41 +0100 Subject: [PATCH] Links and monitors --- .../java/org/syndicate_lang/actors/Actor.java | 93 +++++++++++++ .../actors/IMonitorHandler.java | 5 + .../org/syndicate_lang/actors/Promise.java | 128 +++++++++++------- .../actors/example/example2/Forwarder.java | 2 - .../actors/example/example2/IForwarder.java | 3 - .../actors/example/example2/Main.java | 22 ++- 6 files changed, 184 insertions(+), 69 deletions(-) create mode 100644 src/main/java/org/syndicate_lang/actors/IMonitorHandler.java diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index c1474dc..6c79f4d 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -1,7 +1,13 @@ package org.syndicate_lang.actors; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,6 +27,9 @@ public class Actor implements Executor { private boolean _alive = true; private Throwable _exitReason = null; private boolean _isCounted = true; + private Set _links = null; + private Map> _monitors = null; + private Consumer _exitTrap = null; public static Actor current() { return _currentActor.get(); @@ -48,6 +57,15 @@ public class Actor implements Executor { return new Actor().proxyFor(o); } + public static Promise> boot(Supplier f) { + Promise> p = new Promise<>(); + Actor a = new Actor(); + a.execute( + () -> p.resolveCalling(() -> Actor.ref(f.get())), + () -> p.rejectWith(new ActorTerminated(a))); + return p; + } + public String getName() { return _name; } @@ -134,6 +152,21 @@ public class Actor implements Executor { } else { getLogger().log(Level.SEVERE, "Actor terminated with error", reason); } + Set linkedPeers = _links; + if (linkedPeers != null) { + _links = null; + for (var peer : linkedPeers) { + peer.notifyExit(this); + } + } + Map> monitoringPeers = _monitors; + if (monitoringPeers != null) { + _monitors = null; + for (var entry : monitoringPeers.entrySet()) { + final var ref = entry.getKey(); + entry.getValue().async((h) -> h.handleMonitor(this, ref)); + } + } _releaseCount(); } } @@ -188,4 +221,64 @@ public class Actor implements Executor { System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } + + public void link() { + final Actor peer = Actor.current(); + this.linkPeer(peer); + peer.linkPeer(this); + } + + public void unlink() { + final Actor peer = Actor.current(); + this.unlinkPeer(peer); + peer.unlinkPeer(this); + } + + private synchronized void linkPeer(Actor peer) { + if (this._alive) { + if (_links == null) _links = new HashSet<>(); + _links.add(peer); + } else { + peer.notifyExit(this); + } + } + + private void notifyExit(final Actor exitingPeer) { + this.execute(() -> { + this.unlinkPeer(exitingPeer); + if (this._exitTrap != null) { + this._exitTrap.accept(exitingPeer); + } else { + this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer)); + } + }); + } + + private synchronized void unlinkPeer(Actor peer) { + if (_links != null) { + _links.remove(peer); + if (_links.isEmpty()) { + _links = null; + } + } + } + + public void trapExits(Consumer handler) { + this._exitTrap = handler; + } + + public synchronized Object monitor(Consumer handler) { + Object ref = new Object(); + monitor(ref, (actor, _ref) -> handler.accept(actor)); + return ref; + } + + public synchronized void monitor(final Object ref, IMonitorHandler handler) { + if (this._alive) { + if (_monitors == null) _monitors = new HashMap<>(); + _monitors.put(ref, Actor.ref(handler)); + } else { + Actor.ref(handler).async((h) -> h.handleMonitor(this, ref)); + } + } } diff --git a/src/main/java/org/syndicate_lang/actors/IMonitorHandler.java b/src/main/java/org/syndicate_lang/actors/IMonitorHandler.java new file mode 100644 index 0000000..0b76986 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/IMonitorHandler.java @@ -0,0 +1,5 @@ +package org.syndicate_lang.actors; + +public interface IMonitorHandler { + void handleMonitor(Actor exitingPeer, Object ref); +} diff --git a/src/main/java/org/syndicate_lang/actors/Promise.java b/src/main/java/org/syndicate_lang/actors/Promise.java index 1d1de0e..673ed45 100644 --- a/src/main/java/org/syndicate_lang/actors/Promise.java +++ b/src/main/java/org/syndicate_lang/actors/Promise.java @@ -5,13 +5,17 @@ import java.util.List; import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; public class Promise implements Future { - public static enum State { + + public static final TimeoutException TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION = new TimeoutException("Waiting for promise resolution"); + + public enum State { PENDING, FULFILLED, REJECTED - }; + } private volatile State _state = State.PENDING; private T _value = null; @@ -65,17 +69,20 @@ public class Promise implements Future { this.resolveWith(null); } - public synchronized void resolveWith(T t) { - if (this.isPending()) { - this._value = t; - this._state = State.FULFILLED; - var worklist = _resolvers; - _resolvers = null; - _rejecters = null; - if (worklist != null) { - for (var callback : worklist) { - callback.accept(t); - } + public void resolveWith(T t) { + List> worklist = null; + synchronized (this) { + if (this.isPending()) { + this._value = t; + this._state = State.FULFILLED; + worklist = _resolvers; + _resolvers = null; + _rejecters = null; + } + } + if (worklist != null) { + for (var callback : worklist) { + callback.accept(t); } } } @@ -84,21 +91,24 @@ public class Promise implements Future { if (this == t) { throw new IllegalArgumentException("cannot chain promise immediately to itself"); } - t.whenFulfilled((v) -> this.resolveWith(v)); - t.whenRejected((e) -> this.rejectWith(e)); + t.whenFulfilled(this::resolveWith); + t.whenRejected(this::rejectWith); } public void rejectWith(Throwable e) { - if (this.isPending()) { - this._reason = e; - this._state = State.REJECTED; - var worklist = _rejecters; - _resolvers = null; - _rejecters = null; - if (worklist != null) { - for (var callback : worklist) { - callback.accept(e); - } + List> worklist = null; + synchronized (this) { + if (this.isPending()) { + this._reason = e; + this._state = State.REJECTED; + worklist = _rejecters; + _resolvers = null; + _rejecters = null; + } + } + if (worklist != null) { + for (var callback : worklist) { + callback.accept(e); } } } @@ -107,29 +117,31 @@ public class Promise implements Future { return this.andThen(ok, null); } + public void resolveCalling(Supplier f) { + try { + this.resolveWith(f.get()); + } catch (Throwable e) { + this.rejectWith(e); + } + } + public synchronized Promise andThen(final Function ok, final Function fail) { Actor a0 = Actor.current(); final Actor a = a0 != null ? a0 : new Actor(); Promise p = new Promise<>(); - this.whenFulfilled((t) -> a.execute(() -> { - try { - p.resolveWith(ok.apply(t)); - } catch (Throwable e) { - p.rejectWith(e); - } - }, () -> p.rejectWith(new ActorTerminated(a)))); - this.whenRejected((e) -> a.execute(() -> { - try { - if (fail == null) { - p.rejectWith(e); - } else { - p.resolveWith(fail.apply(e)); - } - } catch (Throwable e2) { - p.rejectWith(e2); - } - }, () -> p.rejectWith(new ActorTerminated(a)))); + this.whenFulfilled((t) -> a.execute( + () -> p.resolveCalling(() -> ok.apply(t)), + () -> p.rejectWith(new ActorTerminated(a)))); + this.whenRejected((e) -> a.execute( + () -> { + if (fail == null) { + p.rejectWith(e); + } else { + p.resolveCalling(() -> fail.apply(e)); + } + }, + () -> p.rejectWith(new ActorTerminated(a)))); return p; } @@ -155,14 +167,30 @@ public class Promise implements Future { } public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException { - Semaphore s = new Semaphore(0); - this.whenFulfilled((_t) -> s.release()); - this.whenRejected((_e) -> s.release()); - if (delay == -1) { - s.acquire(); + Actor a = Actor.current(); + if (a == null) { + Semaphore s = new Semaphore(0); + this.whenFulfilled((_t) -> s.release()); + this.whenRejected((_e) -> s.release()); + if (delay == -1) { + s.acquire(); + } else { + if (!s.tryAcquire(delay, unit)) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION; + } } else { - if (!s.tryAcquire(delay, unit)) { - throw new TimeoutException(); + this.whenFulfilled((_t) -> { synchronized (a) { a.notifyAll(); } }); + this.whenRejected((_e) -> { synchronized (a) { a.notifyAll(); } }); + synchronized (a) { + if (delay == -1) { + while (this.isPending()) a.wait(); + } else { + long targetTime = System.currentTimeMillis() + unit.toMillis(delay); + while (this.isPending()) { + long now = System.currentTimeMillis(); + if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION; + a.wait(targetTime - now); + } + } } } if (this.isFulfilled()) { diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java index 074a4fd..0c53d0a 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -1,7 +1,5 @@ package org.syndicate_lang.actors.example.example2; -import org.syndicate_lang.actors.Actor; - import java.util.List; public class Forwarder implements IForwarder { diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java index c3bb01a..98bf62d 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java @@ -4,7 +4,4 @@ import org.syndicate_lang.actors.Actor; public interface IForwarder { void handleMessage(int hopCount); - default void shutdown() { - Actor.current().stop(); - } } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java index f0bb0e1..ccef5c8 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -1,6 +1,7 @@ package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; +import org.syndicate_lang.actors.Remote; import java.util.ArrayList; import java.util.List; @@ -9,8 +10,6 @@ import static java.lang.Integer.parseInt; public class Main implements IForwarder { - private List _actors; - public static void main(String[] args) throws InterruptedException { Actor.convenientLogging(); Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await(); @@ -29,27 +28,22 @@ public class Main implements IForwarder { public void boot() { Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); - this._actors = new ArrayList<>(); - IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); + List _actors = new ArrayList<>(); + final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); for (int i = 0; i < _nActors; i++) { - this._actors.add(Actor.forObject( - new Forwarder(i, this._actors, me, this._nRounds)) - .asyncProxy(IForwarder.class)); -// Actor.log().info(this._actors.get(this._actors.size()-1).toString()); + Remote a = Actor.forObject(new Forwarder(i, _actors, me, this._nRounds)); + a.getActor().link(); + _actors.add(a.asyncProxy(IForwarder.class)); } Actor.log().info("Start"); - this._actors.forEach((a) -> a.handleMessage(0)); + _actors.forEach((a) -> a.handleMessage(0)); } @Override public void handleMessage(int hopCount) { this._remainingToReceive--; -// Actor.log().info(String.format("hopCount: %d, remainingToReceive: %d", -// hopCount, -// this._remainingToReceive)); if (this._remainingToReceive == 0) { - this._actors.forEach(IForwarder::shutdown); - this.shutdown(); + Actor.current().stop(); Actor.log().info("Stop after " + (_nActors * _nRounds) + " messages"); } }