From af611e1b2b38bd5e4b0f3f98e3131750f89deb57 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 14 Apr 2021 21:42:50 +0200 Subject: [PATCH] First broad step toward novy --- .../syndicate_lang/actors/AbstractProxy.java | 38 ----- .../java/org/syndicate_lang/actors/Actor.java | 118 +++++---------- .../org/syndicate_lang/actors/AsyncProxy.java | 35 ----- .../org/syndicate_lang/actors/Entity.java | 20 +++ .../org/syndicate_lang/actors/IEntity.java | 8 + .../java/org/syndicate_lang/actors/Ref.java | 24 +++ .../org/syndicate_lang/actors/Remote.java | 86 ----------- .../org/syndicate_lang/actors/SyncProxy.java | 28 ---- .../java/org/syndicate_lang/actors/Turn.java | 137 ++++++++++++++++++ .../actors/example/example1/Main.java | 10 +- .../actors/example/example2/Forwarder.java | 12 +- .../actors/example/example2/IForwarder.java | 4 +- .../actors/example/example2/Main.java | 14 +- 13 files changed, 242 insertions(+), 292 deletions(-) delete mode 100644 src/main/java/org/syndicate_lang/actors/AbstractProxy.java delete mode 100644 src/main/java/org/syndicate_lang/actors/AsyncProxy.java create mode 100644 src/main/java/org/syndicate_lang/actors/Entity.java create mode 100644 src/main/java/org/syndicate_lang/actors/IEntity.java create mode 100644 src/main/java/org/syndicate_lang/actors/Ref.java delete mode 100644 src/main/java/org/syndicate_lang/actors/Remote.java delete mode 100644 src/main/java/org/syndicate_lang/actors/SyncProxy.java create mode 100644 src/main/java/org/syndicate_lang/actors/Turn.java diff --git a/src/main/java/org/syndicate_lang/actors/AbstractProxy.java b/src/main/java/org/syndicate_lang/actors/AbstractProxy.java deleted file mode 100644 index 10bfb09..0000000 --- a/src/main/java/org/syndicate_lang/actors/AbstractProxy.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.syndicate_lang.actors; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; - -public abstract class AbstractProxy implements InvocationHandler { - protected final Remote _ref; - - public AbstractProxy(Remote ref) { - this._ref = ref; - } - - public Remote ref() { - return this._ref; - } - - abstract boolean isSync(); - - private static Method toStringMethod; - - static { - try { - toStringMethod = Object.class.getMethod("toString"); - } catch (NoSuchMethodException e) { - toStringMethod = null; - } - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) { - if (method.equals(toStringMethod)) { - return this._ref.toString(); - } - return dispatch(method, args); - } - - abstract Object dispatch(Method method, Object[] args); -} diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index a4e3050..487ade5 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -1,11 +1,10 @@ package org.syndicate_lang.actors; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -24,9 +23,7 @@ public class Actor implements Executor { private boolean _alive = true; private Throwable _exitReason = null; private boolean _isCounted = true; - private Set _links = null; - private Set>> _monitors = null; - private Consumer _exitTrap = null; + private Map _outbound; private final static class WorkItem extends AtomicReference { Runnable work; @@ -48,21 +45,26 @@ public class Actor implements Executor { private final AtomicLong workItemCount = new AtomicLong(0); public Actor() { - this("" + _actorId.incrementAndGet()); + this(null, null); } public Actor(String debugName) { - this._name = debugName; + this(debugName, null); + } + + public Actor(String debugName, Map outbound) { + this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName; this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")"); + this._outbound = outbound == null ? new HashMap<>() : outbound; _count.incrementAndGet(); } - public static Remote forObject(T o) { + public static Ref forEntity(IEntity o) { return new Actor().ref(o); } - public static Promise> boot(ThrowingSupplier f) { - final Promise> p = new Promise<>(); + public static Promise boot(ThrowingSupplier f) { + final Promise p = new Promise<>(); final Actor a = new Actor(); a.execute( () -> p.resolveCalling(() -> a.ref(f.get())), @@ -102,12 +104,24 @@ public class Actor implements Executor { } } + public void _injectOutbound(Long handle, Ref peer) { + _outbound.put(handle, peer); + } + + public Ref _lookupOutbound(Long handle) { + return _outbound.get(handle); + } + + public Ref _extractOutbound(Long handle) { + return _outbound.remove(handle); + } + public String toString() { return super.toString() + "(" + this._name + ")"; } - public Remote ref(T o) { - return new Remote<>(this, o); + public Ref ref(IEntity o) { + return new Ref(this, o); } private void _performSync(Runnable work, Runnable ifNotAlive) { @@ -129,13 +143,16 @@ public class Actor implements Executor { } public Promise stop() { - return stop(null); + return stop(true, null); } public Promise stop(Throwable reason) { + return stop(true, reason); + } + public Promise stop(boolean normally, Throwable reason) { Promise p = new Promise<>(); this.execute(() -> { - this._stop(true, reason); + this._stop(normally, reason); p.resolve(); }, p::resolve); return p; @@ -150,20 +167,7 @@ public class Actor implements Executor { } else { log().log(Level.SEVERE, "Actor terminated with error", reason); } - Set linkedPeers = _links; - if (linkedPeers != null) { - _links = null; - for (var peer : linkedPeers) { - peer.notifyExit(this); - } - } - Set>> monitoringPeers = _monitors; - if (monitoringPeers != null) { - _monitors = null; - for (var handler : monitoringPeers) { - handler.async(Consumer::accept); - } - } + Turn.forActor(this, t -> _outbound.forEach(t::_retract_), true); _releaseCount(); } } @@ -240,60 +244,4 @@ public class Actor implements Executor { System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } - - public void link(Actor peer) { - this.linkPeer(peer); - peer.linkPeer(this); - } - - public void unlink(Actor peer) { - this.unlinkPeer(peer); - peer.unlinkPeer(this); - } - - private synchronized void linkPeer(Actor peer) { - if (this._alive) { - if (_links == null) _links = new HashSet<>(); - _links.add(peer); - } else { - peer.notifyExit(this); - } - } - - private void notifyExit(final Actor exitingPeer) { - this.execute(() -> { - this.unlinkPeer(exitingPeer); - if (this._exitTrap != null) { - this._exitTrap.accept(exitingPeer); - } else { - this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer)); - } - }); - } - - private synchronized void unlinkPeer(Actor peer) { - if (_links != null) { - _links.remove(peer); - if (_links.isEmpty()) { - _links = null; - } - } - } - - public void trapExits(Consumer handler) { - this._exitTrap = handler; - } - - public void monitor(Actor peer, Consumer handler) { - peer.installMonitor(this.ref(handler)); - } - - private synchronized void installMonitor(Remote> handler) { - if (this._alive) { - if (_monitors == null) _monitors = new HashSet<>(); - _monitors.add(handler); - } else { - handler.async(Consumer::accept); - } - } } diff --git a/src/main/java/org/syndicate_lang/actors/AsyncProxy.java b/src/main/java/org/syndicate_lang/actors/AsyncProxy.java deleted file mode 100644 index e089699..0000000 --- a/src/main/java/org/syndicate_lang/actors/AsyncProxy.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.syndicate_lang.actors; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -public class AsyncProxy extends AbstractProxy { - public AsyncProxy(Remote ref) { - super(ref); - } - - @Override - public Object dispatch(Method method, Object[] args) { - if (method.getReturnType().equals(void.class)) { - this._ref.async((v, _ac) -> { - try { - method.invoke(v, args); - } catch (IllegalAccessException e) { - throw new ProxyFailure(e); - } catch (InvocationTargetException e) { - throw new ProxyFailure(e.getCause()); - } - }); - return null; - } else { - System.err.println(method.getReturnType()); - throw new UnsupportedOperationException( - "Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy"); - } - } - - @Override - boolean isSync() { - return false; - } -} diff --git a/src/main/java/org/syndicate_lang/actors/Entity.java b/src/main/java/org/syndicate_lang/actors/Entity.java new file mode 100644 index 0000000..636bc04 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Entity.java @@ -0,0 +1,20 @@ +package org.syndicate_lang.actors; + +public class Entity implements IEntity { + @Override + public void assert_(Turn turn, Object assertion, Long handle) { + } + + @Override + public void retract_(Turn turn, Long handle) { + } + + @Override + public void sync_(Turn turn, Ref peer) { + turn.message_(peer, true); + } + + @Override + public void message_(Turn turn, Object body) { + } +} diff --git a/src/main/java/org/syndicate_lang/actors/IEntity.java b/src/main/java/org/syndicate_lang/actors/IEntity.java new file mode 100644 index 0000000..3419508 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/IEntity.java @@ -0,0 +1,8 @@ +package org.syndicate_lang.actors; + +public interface IEntity { + void assert_(Turn turn, Object assertion, Long handle); + void retract_(Turn turn, Long handle); + void sync_(Turn turn, Ref peer); + void message_(Turn turn, Object body); +} diff --git a/src/main/java/org/syndicate_lang/actors/Ref.java b/src/main/java/org/syndicate_lang/actors/Ref.java new file mode 100644 index 0000000..ee3d271 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Ref.java @@ -0,0 +1,24 @@ +package org.syndicate_lang.actors; + +public class Ref { + private final Actor _actor; + private final IEntity _target; + + public Ref(Actor actor, IEntity target) { + this._actor = actor; + this._target = target; + } + + public Actor getActor() { + return _actor; + } + + IEntity getEntity() { + return _target; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")"; + } +} diff --git a/src/main/java/org/syndicate_lang/actors/Remote.java b/src/main/java/org/syndicate_lang/actors/Remote.java deleted file mode 100644 index 0b003bf..0000000 --- a/src/main/java/org/syndicate_lang/actors/Remote.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.syndicate_lang.actors; - -import java.lang.reflect.Proxy; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; - -public class Remote { - private final Actor _actor; - private final T _target; - - public Remote(Actor actor, T target) { - this._actor = actor; - this._target = target; - } - - public Actor getActor() { - return _actor; - } - - public void async(BiConsumer f) { - this._actor.execute(() -> f.accept(this._target, this._actor)); - } - - public void async(long delayMilliseconds, BiConsumer f) { - this._actor.later(delayMilliseconds, () -> f.accept(this._target, this._actor)); - } - - public Promise syncVoid(BiConsumer f) { - return this.sync((t, ac) -> { - f.accept(t, ac); - return null; - }); - } - - public Promise syncVoid(long delayMilliseconds, BiConsumer f) { - return this.sync(delayMilliseconds, (t, ac) -> { - f.accept(t, ac); - return null; - }); - } - - public Promise sync(BiFunction f) { - Promise p = new Promise<>(); - this._actor.execute( - () -> p.resolveWith(f.apply(this._target, this._actor)), - () -> p.rejectWith(this._actor.getExitReason())); - return p; - } - - public Promise sync(long delayMilliseconds, BiFunction f) { - Promise p = new Promise<>(); - this._actor.later( - delayMilliseconds, - () -> p.resolveWith(f.apply(this._target, this._actor)), - () -> p.rejectWith(this._actor.getExitReason())); - return p; - } - - private void checkTargetInstance(Class c) { - if (!c.isInstance(this._target)) { - throw new IllegalArgumentException("target is not an instance of " + c); - } - } - - @SuppressWarnings("unchecked") - public I syncProxy(Class c) { - checkTargetInstance(c); - return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this)); - } - - @SuppressWarnings("unchecked") - public I asyncProxy(Class c) { - checkTargetInstance(c); - return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this)); - } - - @SuppressWarnings("unchecked") - public static Remote from(I proxy) { - return ((AbstractProxy) Proxy.getInvocationHandler(proxy)).ref(); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")"; - } -} diff --git a/src/main/java/org/syndicate_lang/actors/SyncProxy.java b/src/main/java/org/syndicate_lang/actors/SyncProxy.java deleted file mode 100644 index 73208a8..0000000 --- a/src/main/java/org/syndicate_lang/actors/SyncProxy.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.syndicate_lang.actors; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -public class SyncProxy extends AbstractProxy { - public SyncProxy(Remote ref) { - super(ref); - } - - @Override - public Object dispatch(Method method, Object[] args) { - return this._ref.sync((v, _ac) -> { - try { - return method.invoke(v, args); - } catch (IllegalAccessException e) { - throw new ProxyFailure(e); - } catch (InvocationTargetException e) { - throw new ProxyFailure(e.getCause()); - } - }).await(); - } - - @Override - boolean isSync() { - return true; - } -} diff --git a/src/main/java/org/syndicate_lang/actors/Turn.java b/src/main/java/org/syndicate_lang/actors/Turn.java new file mode 100644 index 0000000..b33afa7 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Turn.java @@ -0,0 +1,137 @@ +package org.syndicate_lang.actors; + +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class Turn { + private final static AtomicLong nextHandle = new AtomicLong(0); + + private final Actor _actor; + private Map>> _pending = null; + private boolean _complete = false; + + public static void forActor(Actor a, Consumer f) { + Turn.forActor(a, f, false); + } + + public static void forActor(Actor a, Consumer f, boolean zombieTurn) { + if ((a.getExitReason() == null) == zombieTurn) return; + Turn t = new Turn(a); + try { + f.accept(t); + t.commit(); + } catch (Exception e) { + a.stop(false, e); + } + } + + private Turn(Actor a) { + this._actor = a; + } + + public Actor getActor() { + return _actor; + } + + private void commit() { + if (_pending != null) { + _pending.forEach((ac, q) -> ac.execute(() -> q.forEach(f -> Turn.forActor(ac, f)))); + _pending = null; + } + _complete = true; + } + + private void enqueue(Actor target, Consumer action) { + if (_complete) throw new IllegalStateException("Attempt to reuse a committed Turn"); + if (_pending == null) _pending = new HashMap<>(); + _pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action); + } + + public void freshen(Consumer action) { + if (!_complete) throw new IllegalStateException(("Attempt to freshen a non-stale Turn")); + Turn.forActor(this._actor, action); + } + + public Ref ref(IEntity o) { + return _actor.ref(o); + } + + public void spawn(Consumer bootProc) { + this.spawn(bootProc, new HashSet<>()); + } + + public void spawn(Consumer bootProc, Set initialAssertions) { + this.enqueue(this._actor, t -> { + Map newOutbound = new HashMap<>(); + initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k))); + Actor newActor = new Actor(null, newOutbound); + newActor.execute(() -> Turn.forActor(newActor, bootProc)); + }); + } + + public void quit() { + this._actor.stop(); + } + + public void crash(Throwable e) { + this._actor.stop(false, e); + } + + public Long assert_(Ref target, Object assertion) { + Long h = Turn.nextHandle.getAndIncrement(); + _assert_(target, assertion, h); + return h; + } + + private void _assert_(Ref target, Object assertion, Long h) { + Object a = assertion; // TODO: runRewrites from target + if (a != null) { + this.enqueue(target.getActor(), t -> { + _actor._injectOutbound(h, target); + target.getEntity().assert_(t, a, h); + }); + } + } + + public void retract_(Long h) { + if (h == null) return; + Ref peer = this._actor._lookupOutbound(h); + if (peer == null) return; + _retract_(h, peer); + } + + public Long replace_(Ref peer, Long h, Object assertion) { + var newHandle = assert_(peer, assertion); + retract_(h); + return newHandle; + } + + void _retract_(Long handle, Ref peer) { + this.enqueue(peer.getActor(), t -> { + this._actor._extractOutbound(handle); + peer.getEntity().retract_(t, handle); + }); + } + + public Promise sync_(Ref peer) { + Promise p = new Promise<>(); + this._sync_(peer, this.ref(new Entity() { + public void message_(Turn t, Object _message) { + p.resolveWith(t); + } + })); + return p; + } + + private void _sync_(Ref peer, Ref callback) { + this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback)); + } + + public void message_(Ref peer, Object body) { + Object a = body; // TODO runRewrites + if (a != null) { + this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body)); + } + } +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java index 1a77bf1..803becf 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java @@ -1,14 +1,14 @@ package org.syndicate_lang.actors.example.example1; import org.syndicate_lang.actors.Actor; -import org.syndicate_lang.actors.Remote; +import org.syndicate_lang.actors.Ref; public class Main { public static void main(String[] args) throws InterruptedException { Actor.convenientLogging(); - final var vh = Actor.forObject(new ValueHolder<>("There")); + final var vh = Actor.forEntity(new ValueHolder<>("There")); vh.getActor().daemonize(); - final var m = Actor.forObject(new Main()); + final var m = Actor.forEntity(new Main()); m.async(10, (m_, ac) -> m_.run(ac, vh)); @SuppressWarnings("unchecked") @@ -17,7 +17,7 @@ public class Main { System.out.println("Value: " + vv.get()); vv.set("Second"); System.out.println("Value: " + vv.get()); - System.out.println("Underlying: " + Remote.from(vv)); + System.out.println("Underlying: " + Ref.from(vv)); Actor.awaitAll(); System.out.println("Overall main returning"); @@ -26,7 +26,7 @@ public class Main { private Actor me; private int greetingCounter = 0; - public void run(Actor me, Remote> vh) { + public void run(Actor me, Ref> vh) { this.me = me; this.greet((String) vh.syncProxy(IValueHolder.class).get()); vh.syncVoid((v, _ac) -> v.set("World")); diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java index a948ca8..d751393 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -1,26 +1,26 @@ package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; -import org.syndicate_lang.actors.Remote; +import org.syndicate_lang.actors.Ref; public class Forwarder implements IForwarder { - private final Remote _main; + private final Ref _main; private final int _nRounds; - private Remote _peer = null; + private Ref _peer = null; - public Forwarder(Remote main, int nRounds) { + public Forwarder(Ref main, int nRounds) { this._main = main; this._nRounds = nRounds; } @Override - public void setPeer(Remote peer) { + public void setPeer(Ref peer) { this._peer = peer; } @Override public void handleMessage(Actor _ac, final int hopCount) { - Remote target = hopCount >= this._nRounds - 1 ? _main : _peer; + Ref target = hopCount >= this._nRounds - 1 ? _main : _peer; target.async((f, ac) -> f.handleMessage(ac, hopCount + 1)); } } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java index ac548b5..a10296f 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java @@ -1,9 +1,9 @@ package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; -import org.syndicate_lang.actors.Remote; +import org.syndicate_lang.actors.Ref; public interface IForwarder { - void setPeer(Remote peer); + void setPeer(Ref peer); void handleMessage(Actor ac, int hopCount); } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java index 0b3307d..15ec4c7 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -1,7 +1,7 @@ package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; -import org.syndicate_lang.actors.Remote; +import org.syndicate_lang.actors.Ref; import java.util.ArrayList; import java.util.List; @@ -12,7 +12,7 @@ public class Main implements IForwarder { public static void main(String[] args) throws InterruptedException { Actor.convenientLogging(); - Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await(); + Actor.forEntity(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await(); Actor.awaitAll(); } @@ -29,11 +29,11 @@ public class Main implements IForwarder { public void boot(Actor ac) { ac.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); - final List> _actors = new ArrayList<>(); - final Remote me = ac.ref(this); - Remote previous = null; + final List> _actors = new ArrayList<>(); + final Ref me = ac.ref(this); + Ref previous = null; for (int i = 0; i < _nActors; i++) { - Remote current = Actor.forObject(new Forwarder(me, this._nRounds)); + Ref current = Actor.forEntity(new Forwarder(me, this._nRounds)); ac.link(current.getActor()); _actors.add(current); if (previous != null) { @@ -49,7 +49,7 @@ public class Main implements IForwarder { } @Override - public void setPeer(Remote peer) { + public void setPeer(Ref peer) { // Do nothing. }