Implement ring benchmark; repair some errors

This commit is contained in:
Tony Garnock-Jones 2023-10-28 13:23:29 +02:00
parent c7718b3ddd
commit 637f1da5f4
7 changed files with 70 additions and 56 deletions

View File

@ -168,7 +168,7 @@ public class Actor implements Executor {
} else {
log().log(Level.SEVERE, "Actor terminated with error", reason);
}
Turn.forActor(this, t -> _outbound.forEach(t::_retract_), true);
Turn._forActor(this, t -> _outbound.forEach(t::_retract_));
_releaseCount();
}
}
@ -245,4 +245,8 @@ public class Actor implements Executor {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
}
public boolean isAlive() {
return _alive;
}
}

View File

@ -1,7 +0,0 @@
package org.syndicate_lang.actors;
public class ProxyFailure extends RuntimeException {
public ProxyFailure(Throwable t) {
super(t);
}
}

View File

@ -3,6 +3,7 @@ package org.syndicate_lang.actors;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Logger;
public class Turn {
private final static AtomicLong nextHandle = new AtomicLong(0);
@ -12,11 +13,10 @@ public class Turn {
private boolean _complete = false;
public static void forActor(Actor a, Consumer<Turn> f) {
Turn.forActor(a, f, false);
a.execute(() -> Turn._forActor(a, f));
}
public static void forActor(Actor a, Consumer<Turn> f, boolean zombieTurn) {
if ((a.getExitReason() == null) == zombieTurn) return;
static void _forActor(Actor a, Consumer<Turn> f) {
Turn t = new Turn(a);
try {
f.accept(t);
@ -34,6 +34,10 @@ public class Turn {
return _actor;
}
public Logger log() {
return _actor.log();
}
private void commit() {
if (_pending != null) {
_pending.forEach((ac, q) -> ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t)))));
@ -133,6 +137,10 @@ public class Turn {
}
}
public void later(Consumer<Turn> action) {
this.later(0, action);
}
public void later(long delayMilliseconds, Consumer<Turn> action) {
_actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
}

View File

@ -1,26 +1,31 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;
public class Forwarder implements IForwarder {
private final Ref<IForwarder> _main;
public class Forwarder extends Entity {
private final Ref _main;
private final int _nRounds;
private Ref<IForwarder> _peer = null;
private Ref _peer = null;
public Forwarder(Ref<IForwarder> main, int nRounds) {
public Forwarder(Ref main, int nRounds) {
this._main = main;
this._nRounds = nRounds;
}
@Override
public void setPeer(Ref<IForwarder> peer) {
this._peer = peer;
public void retract_(Turn turn, Long handle) {
turn.quit();
}
@Override
public void handleMessage(Actor _ac, final int hopCount) {
Ref<IForwarder> target = hopCount >= this._nRounds - 1 ? _main : _peer;
target.async((f, ac) -> f.handleMessage(ac, hopCount + 1));
public void message_(Turn turn, Object body) {
if (body instanceof IForwarder.SetPeer op) {
this._peer = op.peer();
} else if (body instanceof IForwarder.HandleMessage op) {
Ref target = op.hopCount() >= this._nRounds - 1 ? _main : _peer;
turn.message_(target, new IForwarder.HandleMessage(op.hopCount() + 1));
}
}
}

View File

@ -1,9 +1,8 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Ref;
public interface IForwarder {
void setPeer(Ref<IForwarder> peer);
void handleMessage(Actor ac, int hopCount);
record SetPeer(Ref peer) implements IForwarder {}
record HandleMessage(int hopCount) implements IForwarder {}
}

View File

@ -0,0 +1,3 @@
package org.syndicate_lang.actors.example.example2;
public record ILink() {}

View File

@ -1,18 +1,21 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
public class Main implements IForwarder {
public class Main extends Entity {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
Actor.forEntity(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
Turn.forActor(new Actor().daemonize(), t -> {
new Main(parseInt(args[0]), parseInt(args[1])).boot(t);
});
Actor.awaitAll();
}
@ -27,44 +30,43 @@ public class Main implements IForwarder {
this._remainingToReceive = nActors;
}
public void boot(Actor ac) {
ac.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
final List<Ref<IForwarder>> _actors = new ArrayList<>();
final Ref<IForwarder> me = ac.ref(this);
Ref<IForwarder> previous = null;
public void boot(Turn t) {
t.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
final List<Ref> _forwarders = new ArrayList<>();
final Ref me = t.ref(this);
Ref previous = null;
for (int i = 0; i < _nActors; i++) {
Ref<IForwarder> current = Actor.forEntity(new Forwarder(me, this._nRounds));
ac.link(current.getActor());
_actors.add(current);
Ref current = Actor.forEntity(new Forwarder(me, this._nRounds));
t.assert_(current, new ILink());
_forwarders.add(current);
if (previous != null) {
final var p = previous;
current.async((f, _ac) -> f.setPeer(p));
t.message_(current, new IForwarder.SetPeer(previous));
}
previous = current;
}
_actors.get(0).async((f, _ac) -> f.setPeer(_actors.get(_nActors - 1)));
ac.log().info("Start");
t.message_(_forwarders.get(0), new IForwarder.SetPeer(_forwarders.get(_nActors - 1)));
t.log().info("Start");
this._startTime = System.currentTimeMillis();
_actors.forEach(a -> a.async((f, ac2) -> f.handleMessage(ac2, 0)));
t.later(t0 ->
_forwarders.forEach(a -> t0.message_(a, new IForwarder.HandleMessage(0))));
}
@Override
public void setPeer(Ref<IForwarder> peer) {
// Do nothing.
}
@Override
public void handleMessage(Actor ac, int hopCount) {
this._remainingToReceive--;
if (this._remainingToReceive == 0) {
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
long nMessages = (long) _nActors * (long) _nRounds;
double hz = nMessages / delta;
ac.stop();
ac.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
nMessages,
delta,
hz));
public void message_(Turn turn, Object body) {
if (body instanceof IForwarder.SetPeer) {
// Do nothing.
} else if (body instanceof IForwarder.HandleMessage op) {
this._remainingToReceive--;
if (this._remainingToReceive == 0) {
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
long nMessages = (long) _nActors * (long) _nRounds;
double hz = nMessages / delta;
turn.quit();
turn.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
nMessages,
delta,
hz));
}
}
}
}