package org.syndicate_lang.actors.example.example2; import org.syndicate_lang.actors.Actor; import org.syndicate_lang.actors.Remote; import java.util.ArrayList; import java.util.List; import static java.lang.Integer.parseInt; public class Main implements IForwarder { public static void main(String[] args) throws InterruptedException { Actor.convenientLogging(); Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await(); Actor.awaitAll(); } public final int _nActors; public final int _nRounds; public int _remainingToReceive; public long _startTime = 0; public Main(int nActors, int nRounds) { this._nActors = nActors; this._nRounds = nRounds; this._remainingToReceive = nActors; } public void boot() { Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors()); List _actors = new ArrayList<>(); final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); for (int i = 0; i < _nActors; i++) { Remote a = Actor.forObject(new Forwarder(i, _actors, me, this._nRounds)); a.getActor().link(); _actors.add(a.asyncProxy(IForwarder.class)); } Actor.log().info("Start"); this._startTime = System.currentTimeMillis(); _actors.forEach((a) -> a.handleMessage(0)); } @Override public void handleMessage(int hopCount) { this._remainingToReceive--; if (this._remainingToReceive == 0) { long delta = System.currentTimeMillis() - this._startTime; long nMessages = _nActors * _nRounds; double hz = nMessages / (delta / 1000.0); Actor.current().stop(); Actor.log().info("Stop after " + nMessages + " messages; " + hz + " Hz"); } } }