diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index 9e5225a..5020d2f 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -168,7 +168,7 @@ public class Actor implements Executor { } else { log().log(Level.SEVERE, "Actor terminated with error", reason); } - Turn.forActor(this, t -> _outbound.forEach(t::_retract_), true); + Turn._forActor(this, t -> _outbound.forEach(t::_retract_)); _releaseCount(); } } @@ -245,4 +245,8 @@ public class Actor implements Executor { System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } + + public boolean isAlive() { + return _alive; + } } diff --git a/src/main/java/org/syndicate_lang/actors/ProxyFailure.java b/src/main/java/org/syndicate_lang/actors/ProxyFailure.java deleted file mode 100644 index af01894..0000000 --- a/src/main/java/org/syndicate_lang/actors/ProxyFailure.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.syndicate_lang.actors; - -public class ProxyFailure extends RuntimeException { - public ProxyFailure(Throwable t) { - super(t); - } -} diff --git a/src/main/java/org/syndicate_lang/actors/Turn.java b/src/main/java/org/syndicate_lang/actors/Turn.java index b482261..691f41b 100644 --- a/src/main/java/org/syndicate_lang/actors/Turn.java +++ b/src/main/java/org/syndicate_lang/actors/Turn.java @@ -3,6 +3,7 @@ package org.syndicate_lang.actors; import java.util.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.logging.Logger; public class Turn { private final static AtomicLong nextHandle = new AtomicLong(0); @@ -12,11 +13,10 @@ public class Turn { private boolean _complete = false; public static void forActor(Actor a, Consumer f) { - Turn.forActor(a, f, false); + a.execute(() -> Turn._forActor(a, f)); } - public static void forActor(Actor a, Consumer f, boolean zombieTurn) { - if ((a.getExitReason() == null) == zombieTurn) return; + static void _forActor(Actor a, Consumer f) { Turn t = new Turn(a); try { f.accept(t); @@ -34,6 +34,10 @@ public class Turn { return _actor; } + public Logger log() { + return _actor.log(); + } + private void commit() { if (_pending != null) { _pending.forEach((ac, q) -> ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t))))); @@ -133,6 +137,10 @@ public class Turn { } } + public void later(Consumer action) { + this.later(0, action); + } + public void later(long delayMilliseconds, Consumer action) { _actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action)); } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java index d751393..44b0ddc 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -1,26 +1,31 @@ package org.syndicate_lang.actors.example.example2; -import org.syndicate_lang.actors.Actor; +import org.syndicate_lang.actors.Entity; import org.syndicate_lang.actors.Ref; +import org.syndicate_lang.actors.Turn; -public class Forwarder implements IForwarder { - private final Ref _main; +public class Forwarder extends Entity { + private final Ref _main; private final int _nRounds; - private Ref _peer = null; + private Ref _peer = null; - public Forwarder(Ref main, int nRounds) { + public Forwarder(Ref main, int nRounds) { this._main = main; this._nRounds = nRounds; } @Override - public void setPeer(Ref peer) { - this._peer = peer; + public void retract_(Turn turn, Long handle) { + turn.quit(); } @Override - public void handleMessage(Actor _ac, final int hopCount) { - Ref target = hopCount >= this._nRounds - 1 ? _main : _peer; - target.async((f, ac) -> f.handleMessage(ac, hopCount + 1)); + public void message_(Turn turn, Object body) { + if (body instanceof IForwarder.SetPeer op) { + this._peer = op.peer(); + } else if (body instanceof IForwarder.HandleMessage op) { + Ref target = op.hopCount() >= this._nRounds - 1 ? _main : _peer; + turn.message_(target, new IForwarder.HandleMessage(op.hopCount() + 1)); + } } } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java index a10296f..197edcc 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java @@ -1,9 +1,8 @@ package org.syndicate_lang.actors.example.example2; -import org.syndicate_lang.actors.Actor; import org.syndicate_lang.actors.Ref; public interface IForwarder { - void setPeer(Ref peer); - void handleMessage(Actor ac, int hopCount); + record SetPeer(Ref peer) implements IForwarder {} + record HandleMessage(int hopCount) implements IForwarder {} } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/ILink.java b/src/test/java/org/syndicate_lang/actors/example/example2/ILink.java new file mode 100644 index 0000000..256a70d --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example2/ILink.java @@ -0,0 +1,3 @@ +package org.syndicate_lang.actors.example.example2; + +public record ILink() {} diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java index 15ec4c7..dcfd8e3 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -1,18 +1,21 @@ package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; +import org.syndicate_lang.actors.Entity; import org.syndicate_lang.actors.Ref; +import org.syndicate_lang.actors.Turn; import java.util.ArrayList; import java.util.List; import static java.lang.Integer.parseInt; -public class Main implements IForwarder { - +public class Main extends Entity { public static void main(String[] args) throws InterruptedException { Actor.convenientLogging(); - Actor.forEntity(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await(); + Turn.forActor(new Actor().daemonize(), t -> { + new Main(parseInt(args[0]), parseInt(args[1])).boot(t); + }); Actor.awaitAll(); } @@ -27,44 +30,43 @@ public class Main implements IForwarder { this._remainingToReceive = nActors; } - public void boot(Actor ac) { - ac.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); - final List> _actors = new ArrayList<>(); - final Ref me = ac.ref(this); - Ref previous = null; + public void boot(Turn t) { + t.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); + final List _forwarders = new ArrayList<>(); + final Ref me = t.ref(this); + Ref previous = null; for (int i = 0; i < _nActors; i++) { - Ref current = Actor.forEntity(new Forwarder(me, this._nRounds)); - ac.link(current.getActor()); - _actors.add(current); + Ref current = Actor.forEntity(new Forwarder(me, this._nRounds)); + t.assert_(current, new ILink()); + _forwarders.add(current); if (previous != null) { - final var p = previous; - current.async((f, _ac) -> f.setPeer(p)); + t.message_(current, new IForwarder.SetPeer(previous)); } previous = current; } - _actors.get(0).async((f, _ac) -> f.setPeer(_actors.get(_nActors - 1))); - ac.log().info("Start"); + t.message_(_forwarders.get(0), new IForwarder.SetPeer(_forwarders.get(_nActors - 1))); + t.log().info("Start"); this._startTime = System.currentTimeMillis(); - _actors.forEach(a -> a.async((f, ac2) -> f.handleMessage(ac2, 0))); + t.later(t0 -> + _forwarders.forEach(a -> t0.message_(a, new IForwarder.HandleMessage(0)))); } @Override - public void setPeer(Ref peer) { - // Do nothing. - } - - @Override - public void handleMessage(Actor ac, int hopCount) { - this._remainingToReceive--; - if (this._remainingToReceive == 0) { - double delta = (System.currentTimeMillis() - this._startTime) / 1000.0; - long nMessages = (long) _nActors * (long) _nRounds; - double hz = nMessages / delta; - ac.stop(); - ac.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz", - nMessages, - delta, - hz)); + public void message_(Turn turn, Object body) { + if (body instanceof IForwarder.SetPeer) { + // Do nothing. + } else if (body instanceof IForwarder.HandleMessage op) { + this._remainingToReceive--; + if (this._remainingToReceive == 0) { + double delta = (System.currentTimeMillis() - this._startTime) / 1000.0; + long nMessages = (long) _nActors * (long) _nRounds; + double hz = nMessages / delta; + turn.quit(); + turn.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz", + nMessages, + delta, + hz)); + } } } }