package org.syndicate_lang.actors.example.example2;

import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Integer.parseInt;

/**
 * Driver entity for a forwarder-ring throughput measurement.
 *
 * <p>{@link #boot(Turn)} creates {@code _nActors} {@code Forwarder} entities, links them
 * into a cycle with {@code IForwarder.SetPeer} messages, and then sends each of them an
 * initial {@code IForwarder.HandleMessage(0)}. This entity counts the
 * {@code IForwarder.HandleMessage} messages it receives back; once it has received
 * {@code _nActors} of them it logs the elapsed time and message rate and quits the turn's
 * actor.
 *
 * <p>NOTE(review): {@code Forwarder} is not defined in this file; presumably each forwarder
 * relays the message {@code _nRounds} times before reporting back here — confirm against
 * its implementation.
 */
public class Main extends Entity {

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the number of forwarder actors, {@code args[1]} the
     *             number of rounds; both must be decimal integers
     * @throws InterruptedException propagated from {@code Actor.awaitAll()}
     * @throws NumberFormatException if either argument is not a valid integer
     */
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: Main <nActors> <nRounds>");
            System.exit(1);
        }
        // Parse up front so malformed arguments fail before any actor is started.
        final int nActors = parseInt(args[0]);
        final int nRounds = parseInt(args[1]);

        Actor.convenientLogging();
        Turn.forActor(new Actor().daemonize(), t -> {
            new Main(nActors, nRounds).boot(t);
        });
        Actor.awaitAll();
    }

    /** Number of forwarder entities created by {@link #boot(Turn)}. */
    public final int _nActors;

    /** Round count handed to every forwarder's constructor. */
    public final int _nRounds;

    /** Number of {@code HandleMessage} messages still expected; starts at {@code _nActors}. */
    public int _remainingToReceive;

    /** Wall-clock start of the measurement in milliseconds; set when the initial messages are sent. */
    public long _startTime = 0;

    /**
     * @param nActors number of forwarder entities to create
     * @param nRounds round count passed to each forwarder
     */
    public Main(int nActors, int nRounds) {
        this._nActors = nActors;
        this._nRounds = nRounds;
        this._remainingToReceive = nActors;
    }

    /**
     * Creates the forwarders, wires them into a cycle and schedules the start of the run.
     *
     * <p>Every forwarder after the first is sent a {@code SetPeer} naming the forwarder
     * created just before it; the first forwarder is sent a {@code SetPeer} naming the last
     * one, which closes the cycle.
     *
     * @param t the turn in which the setup is performed
     * @throws IllegalStateException if {@code _nActors} is less than 1, since no cycle can
     *                               be built
     */
    public void boot(Turn t) {
        if (_nActors < 1) {
            throw new IllegalStateException("At least one actor is required, got " + _nActors);
        }

        t.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());

        final List<Ref> forwarders = new ArrayList<>();
        final Ref me = t.ref(this);
        Ref previous = null;
        for (int i = 0; i < _nActors; i++) {
            Ref current = Actor.forEntity(new Forwarder(me, this._nRounds));
            t.assert_(current, new ILink());
            forwarders.add(current);
            if (previous != null) {
                t.message_(current, new IForwarder.SetPeer(previous));
            }
            previous = current;
        }
        // The first forwarder had no predecessor in the loop; point it at the last one.
        t.message_(forwarders.get(0), new IForwarder.SetPeer(forwarders.get(_nActors - 1)));

        // The clock is started inside the deferred callback, immediately before the
        // initial messages are sent.
        t.later(t0 -> {
            t0.log().info("Start");
            this._startTime = System.currentTimeMillis();
            forwarders.forEach(a -> t0.message_(a, new IForwarder.HandleMessage(0)));
        });
    }

    /**
     * Counts incoming {@code IForwarder.HandleMessage} messages and, when the last expected
     * one arrives, quits via {@code turn.quit()} and logs the run statistics.
     * {@code IForwarder.SetPeer} messages and any other bodies are ignored.
     *
     * @param turn the turn delivering the message
     * @param body the message payload
     */
    @Override
    public void message_(Turn turn, Object body) {
        if (body instanceof IForwarder.HandleMessage) {
            this._remainingToReceive--;
            if (this._remainingToReceive == 0) {
                double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
                // Widen before multiplying so large actor/round counts cannot overflow int.
                long nMessages = (long) _nActors * (long) _nRounds;
                double hz = nMessages / delta;
                turn.quit();
                turn.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
                        nMessages,
                        delta,
                        hz));
            }
        } else if (body instanceof IForwarder.SetPeer) {
            // Do nothing.
        }
    }
}