Direct reference to peers in Forwarder actors
This commit is contained in:
parent
26e29ac1ce
commit
76c1ad3268
|
@ -4,27 +4,23 @@ import java.util.List;
|
||||||
// import java.util.Random;
|
// import java.util.Random;
|
||||||
|
|
||||||
public class Forwarder implements IForwarder {
|
public class Forwarder implements IForwarder {
|
||||||
private final int _index;
|
|
||||||
private final List<IForwarder> _actors;
|
|
||||||
private final IForwarder _main;
|
private final IForwarder _main;
|
||||||
private final int _nActors;
|
|
||||||
private final int _nRounds;
|
private final int _nRounds;
|
||||||
// private final Random _random;
|
private IForwarder _peer = null;
|
||||||
|
|
||||||
public Forwarder(int index, List<IForwarder> actors, IForwarder main, int nActors, int nRounds) {
|
public Forwarder(IForwarder main, int nRounds) {
|
||||||
this._index = index;
|
|
||||||
this._actors = actors;
|
|
||||||
this._main = main;
|
this._main = main;
|
||||||
this._nActors = nActors;
|
|
||||||
this._nRounds = nRounds;
|
this._nRounds = nRounds;
|
||||||
// this._random = new Random();
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPeer(IForwarder peer) {
|
||||||
|
this._peer = peer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleMessage(int hopCount) {
|
public void handleMessage(int hopCount) {
|
||||||
int index = (this._index + 1) % this._nActors;
|
IForwarder target = hopCount >= this._nRounds - 1 ? _main : _peer;
|
||||||
// int index = _random.nextInt(this._nActors);
|
|
||||||
IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index);
|
|
||||||
target.handleMessage(hopCount + 1);
|
target.handleMessage(hopCount + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,5 +3,6 @@ package org.syndicate_lang.actors.example.example2;
|
||||||
import org.syndicate_lang.actors.Actor;
|
import org.syndicate_lang.actors.Actor;
|
||||||
|
|
||||||
public interface IForwarder {
|
public interface IForwarder {
|
||||||
|
void setPeer(IForwarder peer);
|
||||||
void handleMessage(int hopCount);
|
void handleMessage(int hopCount);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,24 @@ public class Main implements IForwarder {
|
||||||
Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
|
Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
|
||||||
List<IForwarder> _actors = new ArrayList<>();
|
List<IForwarder> _actors = new ArrayList<>();
|
||||||
final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
|
final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
|
||||||
|
IForwarder previous = null;
|
||||||
for (int i = 0; i < _nActors; i++) {
|
for (int i = 0; i < _nActors; i++) {
|
||||||
Remote<IForwarder> a = Actor.forObject(new Forwarder(i, _actors, me, this._nActors, this._nRounds));
|
Remote<IForwarder> a = Actor.forObject(new Forwarder(me, this._nRounds));
|
||||||
a.getActor().link();
|
a.getActor().link();
|
||||||
_actors.add(a.asyncProxy(IForwarder.class));
|
IForwarder current = a.asyncProxy(IForwarder.class);
|
||||||
|
_actors.add(current);
|
||||||
|
if (previous != null) current.setPeer(previous);
|
||||||
|
previous = current;
|
||||||
}
|
}
|
||||||
|
_actors.get(0).setPeer(_actors.get(_nActors - 1));
|
||||||
Actor.log().info("Start");
|
Actor.log().info("Start");
|
||||||
this._startTime = System.currentTimeMillis();
|
this._startTime = System.currentTimeMillis();
|
||||||
_actors.forEach((a) -> a.handleMessage(0));
|
_actors.forEach(a -> a.handleMessage(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPeer(IForwarder peer) {
|
||||||
|
// Do nothing.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -46,7 +56,7 @@ public class Main implements IForwarder {
|
||||||
this._remainingToReceive--;
|
this._remainingToReceive--;
|
||||||
if (this._remainingToReceive == 0) {
|
if (this._remainingToReceive == 0) {
|
||||||
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
|
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
|
||||||
long nMessages = _nActors * _nRounds;
|
long nMessages = (long) _nActors * (long) _nRounds;
|
||||||
double hz = nMessages / delta;
|
double hz = nMessages / delta;
|
||||||
Actor.current().stop();
|
Actor.current().stop();
|
||||||
Actor.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
|
Actor.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
|
||||||
|
|
Loading…
Reference in New Issue