Compare commits
31 Commits
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | 27ab682bca | |
Tony Garnock-Jones | 6d94e728af | |
Tony Garnock-Jones | 7526caa179 | |
Tony Garnock-Jones | e483a57df9 | |
Tony Garnock-Jones | 90147b4970 | |
Tony Garnock-Jones | 9c7b71fa51 | |
Tony Garnock-Jones | 882b47602d | |
Tony Garnock-Jones | d91bb6dc02 | |
Tony Garnock-Jones | 9e971de6b0 | |
Tony Garnock-Jones | 94646a92de | |
Tony Garnock-Jones | d1acf60d1b | |
Tony Garnock-Jones | 072119d6c0 | |
Tony Garnock-Jones | 6904ef76df | |
Tony Garnock-Jones | 636da3f28f | |
Tony Garnock-Jones | 732bb0066d | |
Tony Garnock-Jones | 7d9174c363 | |
Tony Garnock-Jones | fd8b445482 | |
Tony Garnock-Jones | 5401d2d8e9 | |
Tony Garnock-Jones | f55fd2c668 | |
Tony Garnock-Jones | c79724a098 | |
Tony Garnock-Jones | f08dccaf07 | |
Tony Garnock-Jones | e37ec7e642 | |
Tony Garnock-Jones | de6b5dfe18 | |
Tony Garnock-Jones | e7d7942e9b | |
Tony Garnock-Jones | 85c106b6ca | |
Tony Garnock-Jones | cb3fd44529 | |
Tony Garnock-Jones | 637f1da5f4 | |
Tony Garnock-Jones | c7718b3ddd | |
Tony Garnock-Jones | d6c8a80995 | |
Tony Garnock-Jones | af611e1b2b | |
Tony Garnock-Jones | 2ad5ac75ee |
|
@ -1,2 +1,3 @@
|
|||
out/
|
||||
doc/
|
||||
/out/
|
||||
/doc/
|
||||
/profile.jfr
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
|
||||
<component name="ProjectRootManager" version="2" languageLevel="JDK_16" project-jdk-name="17" project-jdk-type="JavaSDK">
|
||||
<output url="file://$PROJECT_DIR$/out" />
|
||||
</component>
|
||||
</project>
|
12
Makefile
12
Makefile
|
@ -1,3 +1,6 @@
|
|||
NACTORS=1000000
|
||||
NROUNDS=200
|
||||
|
||||
all:
|
||||
ant jar
|
||||
|
||||
|
@ -5,7 +8,10 @@ clean:
|
|||
ant clean
|
||||
|
||||
ziprun: all
|
||||
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main 1000000 200
|
||||
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main $(NACTORS) $(NROUNDS)
|
||||
|
||||
ziprun1000: all
|
||||
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main 1000000 1000
|
||||
profile.jfr: all
|
||||
java -XX:StartFlightRecording=settings=profile,filename=$@ -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main $(NACTORS) $(NROUNDS)
|
||||
|
||||
ziprun1000:
|
||||
$(MAKE) NROUNDS=1000 ziprun
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public abstract class AbstractProxy<T> implements InvocationHandler {
|
||||
protected final Remote<T> _ref;
|
||||
|
||||
public AbstractProxy(Remote<T> ref) {
|
||||
this._ref = ref;
|
||||
}
|
||||
|
||||
public Remote<T> ref() {
|
||||
return this._ref;
|
||||
}
|
||||
|
||||
abstract boolean isSync();
|
||||
|
||||
private static Method toStringMethod;
|
||||
|
||||
static {
|
||||
try {
|
||||
toStringMethod = Object.class.getMethod("toString");
|
||||
} catch (NoSuchMethodException e) {
|
||||
toStringMethod = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args) {
|
||||
if (method.equals(toStringMethod)) {
|
||||
return this._ref.toString();
|
||||
}
|
||||
return dispatch(method, args);
|
||||
}
|
||||
|
||||
abstract Object dispatch(Method method, Object[] args);
|
||||
}
|
|
@ -1,42 +1,43 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat.
|
||||
* I represent the shared execution context for a collection of objects; I am roughly analogous to the E concept of a Vat.
|
||||
*/
|
||||
public class Actor implements Executor {
|
||||
private final static ThreadLocal<Actor> _currentActor = new ThreadLocal<>();
|
||||
public class Actor extends ForkJoinTask<Void> {
|
||||
private final static AtomicLong _count = new AtomicLong(0);
|
||||
private final static AtomicLong _actorId = new AtomicLong(0);
|
||||
protected final static ExecutorService _executor = Executors.newWorkStealingPool();
|
||||
protected final static ForkJoinPool _executor = new ForkJoinPool(
|
||||
Runtime.getRuntime().availableProcessors(),
|
||||
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
|
||||
null,
|
||||
true
|
||||
);
|
||||
protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
|
||||
private final String _name;
|
||||
private final Logger _logger;
|
||||
final Turn _turn = new Turn(this);
|
||||
|
||||
private boolean _alive = true;
|
||||
private Throwable _exitReason = null;
|
||||
private boolean _isCounted = true;
|
||||
private Set<Actor> _links = null;
|
||||
private Map<Object, Remote<BiConsumer<Actor, Object>>> _monitors = null;
|
||||
private Consumer<Actor> _exitTrap = null;
|
||||
private Map<Long, Ref> _outbound;
|
||||
|
||||
private final static class WorkItem extends AtomicReference<WorkItem> {
|
||||
Runnable work;
|
||||
public final static class WorkItem extends AtomicReference<WorkItem> {
|
||||
Consumer<Turn> work;
|
||||
Runnable ifNotAlive;
|
||||
|
||||
public WorkItem(Runnable work, Runnable ifNotAlive) {
|
||||
public WorkItem(Consumer<Turn> work, Runnable ifNotAlive) {
|
||||
this.work = work;
|
||||
this.ifNotAlive = ifNotAlive;
|
||||
}
|
||||
|
@ -51,38 +52,31 @@ public class Actor implements Executor {
|
|||
private final AtomicReference<WorkItem> tail = new AtomicReference<>(head);
|
||||
private final AtomicLong workItemCount = new AtomicLong(0);
|
||||
|
||||
public static Actor current() {
|
||||
return _currentActor.get();
|
||||
}
|
||||
|
||||
public static Logger log() {
|
||||
return current().getLogger();
|
||||
}
|
||||
|
||||
public static<T> Remote<T> ref(T o) {
|
||||
return current().proxyFor(o);
|
||||
}
|
||||
|
||||
public Actor() {
|
||||
this("" + _actorId.incrementAndGet());
|
||||
this(null, null);
|
||||
}
|
||||
|
||||
public Actor(String debugName) {
|
||||
this._name = debugName;
|
||||
this(debugName, null);
|
||||
}
|
||||
|
||||
public Actor(String debugName, Map<Long, Ref> outbound) {
|
||||
this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName;
|
||||
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
|
||||
this._outbound = outbound == null ? new HashMap<>() : outbound;
|
||||
_count.incrementAndGet();
|
||||
}
|
||||
|
||||
public static<T> Remote<T> forObject(T o) {
|
||||
return new Actor().proxyFor(o);
|
||||
public static Ref forEntity(IEntity o) {
|
||||
return new Actor().ref(o);
|
||||
}
|
||||
|
||||
public static<T> Promise<Remote<T>> boot(ThrowingSupplier<T> f) {
|
||||
Promise<Remote<T>> p = new Promise<>();
|
||||
Actor a = new Actor();
|
||||
a.execute(
|
||||
() -> p.resolveCalling(() -> Actor.ref(f.get())),
|
||||
() -> p.rejectWith(new ActorTerminated(a)));
|
||||
public static Promise<Ref> boot(Function<Turn, IEntity> f) {
|
||||
final Promise<Ref> p = new Promise<>();
|
||||
final Actor a = new Actor();
|
||||
a.execute(new WorkItem(
|
||||
t -> p.resolveCalling(() -> a.ref(f.apply(t))),
|
||||
() -> p.rejectWith(new ActorTerminated(a))));
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -94,7 +88,7 @@ public class Actor implements Executor {
|
|||
return _exitReason;
|
||||
}
|
||||
|
||||
public Logger getLogger() {
|
||||
public Logger log() {
|
||||
return _logger;
|
||||
}
|
||||
|
||||
|
@ -102,9 +96,8 @@ public class Actor implements Executor {
|
|||
return _alive && !_isCounted;
|
||||
}
|
||||
|
||||
public synchronized Actor daemonize() {
|
||||
this._releaseCount();
|
||||
return this;
|
||||
public void daemonize() {
|
||||
this.scheduleTurn(_t -> this._releaseCount());
|
||||
}
|
||||
|
||||
private void _releaseCount() {
|
||||
|
@ -118,53 +111,67 @@ public class Actor implements Executor {
|
|||
}
|
||||
}
|
||||
|
||||
void _injectOutbound(Long handle, Ref peer) {
|
||||
synchronized (_outbound) {
|
||||
_outbound.put(handle, peer);
|
||||
}
|
||||
}
|
||||
|
||||
Ref _lookupOutbound(Long handle) {
|
||||
synchronized (_outbound) {
|
||||
return _outbound.get(handle);
|
||||
}
|
||||
}
|
||||
|
||||
Ref _extractOutbound(Long handle) {
|
||||
synchronized (_outbound) {
|
||||
return _outbound.remove(handle);
|
||||
}
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return super.toString() + "(" + this._name + ")";
|
||||
}
|
||||
|
||||
public<T> Remote<T> proxyFor(T o) {
|
||||
return new Remote<>(this, o);
|
||||
public<T> Ref ref(IEntity o) {
|
||||
return new Ref(this, o);
|
||||
}
|
||||
|
||||
private void _performSync(Runnable work, Runnable ifNotAlive) {
|
||||
synchronized (this) {
|
||||
_currentActor.set(this);
|
||||
try {
|
||||
_perform(work, ifNotAlive);
|
||||
} finally {
|
||||
_currentActor.set(null);
|
||||
}
|
||||
public Actor scheduleTurn(Consumer<Turn> f) {
|
||||
this.execute(new WorkItem(f, null));
|
||||
return this;
|
||||
}
|
||||
|
||||
void _performTurn(Consumer<Turn> f) {
|
||||
try {
|
||||
f.accept(_turn);
|
||||
_turn.commit();
|
||||
} catch (Exception e) {
|
||||
this.stop(false, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void _perform(Runnable work, Runnable ifNotAlive) {
|
||||
private void _perform(WorkItem item) {
|
||||
if (!_alive) {
|
||||
if (ifNotAlive != null) ifNotAlive.run();
|
||||
if (item.ifNotAlive != null) item.ifNotAlive.run();
|
||||
} else {
|
||||
try {
|
||||
work.run();
|
||||
} catch (Throwable exn) {
|
||||
this._stop(false, exn);
|
||||
}
|
||||
this._performTurn(item.work);
|
||||
}
|
||||
}
|
||||
|
||||
public Promise<?> stop() {
|
||||
return stop(null);
|
||||
return stop(true, null);
|
||||
}
|
||||
|
||||
public Promise<?> stop(Throwable reason) {
|
||||
if (current() == this) {
|
||||
this._stop(true, reason);
|
||||
return Promise.resolved();
|
||||
} else {
|
||||
Promise<?> p = new Promise<>();
|
||||
this.execute(() -> {
|
||||
this._stop(true, reason);
|
||||
p.resolve();
|
||||
}, p::resolve);
|
||||
return p;
|
||||
}
|
||||
return stop(true, reason);
|
||||
}
|
||||
public Promise<?> stop(boolean normally, Throwable reason) {
|
||||
Promise<?> p = new Promise<>();
|
||||
this.execute(new WorkItem(
|
||||
_f -> { this._stop(normally, reason); p.resolve(); },
|
||||
p::resolve));
|
||||
return p;
|
||||
}
|
||||
|
||||
private synchronized void _stop(boolean normally, Throwable reason) {
|
||||
|
@ -172,86 +179,71 @@ public class Actor implements Executor {
|
|||
_alive = false;
|
||||
_exitReason = reason;
|
||||
if (normally) {
|
||||
getLogger().log(Level.FINE, "Actor stopped", reason);
|
||||
log().log(Level.FINE, "Actor stopped", reason);
|
||||
} else {
|
||||
getLogger().log(Level.SEVERE, "Actor terminated with error", reason);
|
||||
log().log(Level.SEVERE, "Actor terminated with error", reason);
|
||||
}
|
||||
Set<Actor> linkedPeers = _links;
|
||||
if (linkedPeers != null) {
|
||||
_links = null;
|
||||
for (var peer : linkedPeers) {
|
||||
peer.notifyExit(this);
|
||||
this._performTurn(t -> {
|
||||
synchronized(_outbound) {
|
||||
_outbound.forEach(t::_retract_);
|
||||
}
|
||||
}
|
||||
Map<Object, Remote<BiConsumer<Actor, Object>>> monitoringPeers = _monitors;
|
||||
if (monitoringPeers != null) {
|
||||
_monitors = null;
|
||||
for (var entry : monitoringPeers.entrySet()) {
|
||||
final var ref = entry.getKey();
|
||||
entry.getValue().async((h) -> h.accept(this, ref));
|
||||
}
|
||||
}
|
||||
});
|
||||
_releaseCount();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable work) {
|
||||
this.execute(work, null);
|
||||
synchronized void _runWorkItems() {
|
||||
long batch = workItemCount.get();
|
||||
while (batch > 0) {
|
||||
for (int count = 0; count < batch; count++) {
|
||||
WorkItem i = null;
|
||||
while (i == null) i = head.get();
|
||||
head = i;
|
||||
this._perform(i);
|
||||
i.clear();
|
||||
}
|
||||
batch = workItemCount.addAndGet(-batch);
|
||||
}
|
||||
}
|
||||
|
||||
public void execute(Runnable work, Runnable ifNotAlive) {
|
||||
{
|
||||
WorkItem i = new WorkItem(work, ifNotAlive);
|
||||
tail.getAndSet(i).set(i);
|
||||
}
|
||||
@Override public final Void getRawResult() { return null; }
|
||||
@Override public final void setRawResult(Void v) { }
|
||||
@Override public final boolean exec() { this._runWorkItems(); return false; }
|
||||
|
||||
public void execute(WorkItem item) {
|
||||
tail.getAndSet(item).set(item);
|
||||
if (workItemCount.getAndIncrement() == 0) {
|
||||
_executor.execute(() -> {
|
||||
synchronized (this) {
|
||||
_currentActor.set(this);
|
||||
try {
|
||||
long batch = workItemCount.get();
|
||||
while (batch > 0) {
|
||||
for (int count = 0; count < batch; count++) {
|
||||
WorkItem i = null;
|
||||
while (i == null) i = head.get();
|
||||
head = i;
|
||||
this._perform(i.work, i.ifNotAlive);
|
||||
i.clear();
|
||||
}
|
||||
batch = workItemCount.addAndGet(-batch);
|
||||
}
|
||||
} finally {
|
||||
_currentActor.set(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
_executor.execute(this);
|
||||
}
|
||||
}
|
||||
|
||||
public void later(long delayMilliseconds, Runnable work) {
|
||||
this.later(delayMilliseconds, work, null);
|
||||
}
|
||||
|
||||
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
|
||||
public void later(long delayMilliseconds, WorkItem item) {
|
||||
if (delayMilliseconds == 0) {
|
||||
this.execute(work, ifNotAlive);
|
||||
this.execute(item);
|
||||
} else {
|
||||
_scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
|
||||
_scheduledExecutor.schedule(
|
||||
() -> this.execute(item),
|
||||
delayMilliseconds,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
public PeriodicTimer every(long periodMilliseconds, Runnable f) {
|
||||
return every(0, periodMilliseconds, f);
|
||||
}
|
||||
|
||||
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) {
|
||||
return new PeriodicTimer(
|
||||
_scheduledExecutor.scheduleAtFixedRate(
|
||||
() -> this._performSync(f, null),
|
||||
initialDelayMilliseconds,
|
||||
periodMilliseconds,
|
||||
TimeUnit.MILLISECONDS));
|
||||
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> f) {
|
||||
final var self = this;
|
||||
final var callback = new Runnable() {
|
||||
PeriodicTimer timer;
|
||||
public void run() {
|
||||
while (this.timer == null) {} // There's a race. Spin.
|
||||
self.execute(new WorkItem(f, this.timer::cancel));
|
||||
}
|
||||
};
|
||||
callback.timer = new PeriodicTimer(
|
||||
_scheduledExecutor.scheduleAtFixedRate(
|
||||
callback,
|
||||
initialDelayMilliseconds,
|
||||
periodMilliseconds,
|
||||
TimeUnit.MILLISECONDS));
|
||||
return callback.timer;
|
||||
}
|
||||
|
||||
public static void awaitAll() throws InterruptedException {
|
||||
|
@ -273,63 +265,7 @@ public class Actor implements Executor {
|
|||
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
|
||||
}
|
||||
|
||||
public void link() {
|
||||
final Actor peer = Actor.current();
|
||||
this.linkPeer(peer);
|
||||
peer.linkPeer(this);
|
||||
}
|
||||
|
||||
public void unlink() {
|
||||
final Actor peer = Actor.current();
|
||||
this.unlinkPeer(peer);
|
||||
peer.unlinkPeer(this);
|
||||
}
|
||||
|
||||
private synchronized void linkPeer(Actor peer) {
|
||||
if (this._alive) {
|
||||
if (_links == null) _links = new HashSet<>();
|
||||
_links.add(peer);
|
||||
} else {
|
||||
peer.notifyExit(this);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyExit(final Actor exitingPeer) {
|
||||
this.execute(() -> {
|
||||
this.unlinkPeer(exitingPeer);
|
||||
if (this._exitTrap != null) {
|
||||
this._exitTrap.accept(exitingPeer);
|
||||
} else {
|
||||
this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private synchronized void unlinkPeer(Actor peer) {
|
||||
if (_links != null) {
|
||||
_links.remove(peer);
|
||||
if (_links.isEmpty()) {
|
||||
_links = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void trapExits(Consumer<Actor> handler) {
|
||||
this._exitTrap = handler;
|
||||
}
|
||||
|
||||
public synchronized Object monitor(Consumer<Actor> handler) {
|
||||
Object ref = new Object();
|
||||
monitor(ref, (actor, _ref) -> handler.accept(actor));
|
||||
return ref;
|
||||
}
|
||||
|
||||
public synchronized void monitor(final Object ref, BiConsumer<Actor, Object> handler) {
|
||||
if (this._alive) {
|
||||
if (_monitors == null) _monitors = new HashMap<>();
|
||||
_monitors.put(ref, Actor.ref(handler));
|
||||
} else {
|
||||
Actor.ref(handler).async((h) -> h.accept(this, ref));
|
||||
}
|
||||
public boolean isAlive() {
|
||||
return _alive;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public class AsyncProxy<T> extends AbstractProxy<T> {
|
||||
public AsyncProxy(Remote<T> ref) {
|
||||
super(ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object dispatch(Method method, Object[] args) {
|
||||
if (method.getReturnType().equals(void.class)) {
|
||||
this._ref.async((v) -> {
|
||||
try {
|
||||
method.invoke(v, args);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new ProxyFailure(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new ProxyFailure(e.getCause());
|
||||
}
|
||||
});
|
||||
return null;
|
||||
} else {
|
||||
System.err.println(method.getReturnType());
|
||||
throw new UnsupportedOperationException(
|
||||
"Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSync() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
public class Entity implements IEntity {
|
||||
@Override
|
||||
public void assert_(Turn turn, Object assertion, Long handle) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void retract_(Turn turn, Long handle) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync_(Turn turn, Ref peer) {
|
||||
turn.message_(peer, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message_(Turn turn, Object body) {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
public interface IEntity {
|
||||
void assert_(Turn turn, Object assertion, Long handle);
|
||||
void retract_(Turn turn, Long handle);
|
||||
void sync_(Turn turn, Ref peer);
|
||||
void message_(Turn turn, Object body);
|
||||
}
|
|
@ -112,8 +112,8 @@ public class Promise<T> implements Future<T> {
|
|||
}
|
||||
}
|
||||
|
||||
public<R> Promise<R> andThen(Function<T, R> ok) {
|
||||
return this.andThen(ok, null);
|
||||
public<R> Promise<R> andThen(Actor a, Function<T, R> ok) {
|
||||
return this.andThen(a, ok, null);
|
||||
}
|
||||
|
||||
public void resolveCalling(ThrowingSupplier<T> f) {
|
||||
|
@ -124,23 +124,21 @@ public class Promise<T> implements Future<T> {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized<R> Promise<R> andThen(final Function<T, R> ok, final Function<Throwable, R> fail) {
|
||||
Actor a0 = Actor.current();
|
||||
public synchronized<R> Promise<R> andThen(Actor a0, final Function<T, R> ok, final Function<Throwable, R> fail) {
|
||||
final Actor a = a0 != null ? a0 : new Actor();
|
||||
|
||||
Promise<R> p = new Promise<>();
|
||||
this.whenFulfilled((t) -> a.execute(
|
||||
() -> p.resolveCalling(() -> ok.apply(t)),
|
||||
() -> p.rejectWith(new ActorTerminated(a))));
|
||||
this.whenRejected((e) -> a.execute(
|
||||
() -> {
|
||||
if (fail == null) {
|
||||
p.rejectWith(e);
|
||||
} else {
|
||||
p.resolveCalling(() -> fail.apply(e));
|
||||
}
|
||||
},
|
||||
() -> p.rejectWith(new ActorTerminated(a))));
|
||||
this.whenFulfilled((t) -> a.execute(new Actor.WorkItem(
|
||||
_turn -> p.resolveCalling(() -> ok.apply(t)),
|
||||
() -> p.rejectWith(new ActorTerminated(a)))));
|
||||
this.whenRejected((e) -> a.execute(new Actor.WorkItem(
|
||||
_turn -> {
|
||||
if (fail == null) {
|
||||
p.rejectWith(e);
|
||||
} else {
|
||||
p.resolveCalling(() -> fail.apply(e));
|
||||
}
|
||||
},
|
||||
() -> p.rejectWith(new ActorTerminated(a)))));
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -166,29 +164,17 @@ public class Promise<T> implements Future<T> {
|
|||
}
|
||||
|
||||
public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException {
|
||||
Actor a = Actor.current();
|
||||
if (a == null) {
|
||||
Semaphore s = new Semaphore(0);
|
||||
this.whenFulfilled((_t) -> s.release());
|
||||
this.whenRejected((_e) -> s.release());
|
||||
this.whenFulfilled((_t) -> { synchronized (this) { this.notifyAll(); } });
|
||||
this.whenRejected((_e) -> { synchronized (this) { this.notifyAll(); } });
|
||||
synchronized (this) {
|
||||
if (delay == -1) {
|
||||
s.acquire();
|
||||
while (this.isPending()) this.wait();
|
||||
} else {
|
||||
if (!s.tryAcquire(delay, unit)) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
|
||||
}
|
||||
} else {
|
||||
this.whenFulfilled((_t) -> { synchronized (a) { a.notifyAll(); } });
|
||||
this.whenRejected((_e) -> { synchronized (a) { a.notifyAll(); } });
|
||||
synchronized (a) {
|
||||
if (delay == -1) {
|
||||
while (this.isPending()) a.wait();
|
||||
} else {
|
||||
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
|
||||
while (this.isPending()) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
|
||||
a.wait(targetTime - now);
|
||||
}
|
||||
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
|
||||
while (this.isPending()) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
|
||||
this.wait(targetTime - now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
public class ProxyFailure extends RuntimeException {
|
||||
public ProxyFailure(Throwable t) {
|
||||
super(t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
public class Ref {
|
||||
private final Actor _actor;
|
||||
private final IEntity _target;
|
||||
|
||||
public Ref(Actor actor, IEntity target) {
|
||||
this._actor = actor;
|
||||
this._target = target;
|
||||
}
|
||||
|
||||
public Actor getActor() {
|
||||
return _actor;
|
||||
}
|
||||
|
||||
IEntity getEntity() {
|
||||
return _target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
|
||||
}
|
||||
}
|
|
@ -1,86 +0,0 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class Remote<T> {
|
||||
private final Actor _actor;
|
||||
private final T _target;
|
||||
|
||||
public Remote(Actor actor, T target) {
|
||||
this._actor = actor;
|
||||
this._target = target;
|
||||
}
|
||||
|
||||
public Actor getActor() {
|
||||
return _actor;
|
||||
}
|
||||
|
||||
public void async(Consumer<T> f) {
|
||||
this._actor.execute(() -> f.accept(this._target));
|
||||
}
|
||||
|
||||
public void async(long delayMilliseconds, Consumer<T> f) {
|
||||
this._actor.later(delayMilliseconds, () -> f.accept(this._target));
|
||||
}
|
||||
|
||||
public Promise<?> syncVoid(Consumer<T> f) {
|
||||
return this.sync((t) -> {
|
||||
f.accept(t);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public Promise<?> syncVoid(long delayMilliseconds, Consumer<T> f) {
|
||||
return this.sync(delayMilliseconds, (t) -> {
|
||||
f.accept(t);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public<R> Promise<R> sync(Function<T, R> f) {
|
||||
Promise<R> p = new Promise<>();
|
||||
this._actor.execute(
|
||||
() -> p.resolveWith(f.apply(this._target)),
|
||||
() -> p.rejectWith(this._actor.getExitReason()));
|
||||
return p;
|
||||
}
|
||||
|
||||
public<R> Promise<R> sync(long delayMilliseconds, Function<T, R> f) {
|
||||
Promise<R> p = new Promise<>();
|
||||
this._actor.later(
|
||||
delayMilliseconds,
|
||||
() -> p.resolveWith(f.apply(this._target)),
|
||||
() -> p.rejectWith(this._actor.getExitReason()));
|
||||
return p;
|
||||
}
|
||||
|
||||
private<I> void checkTargetInstance(Class<I> c) {
|
||||
if (!c.isInstance(this._target)) {
|
||||
throw new IllegalArgumentException("target is not an instance of " + c);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public<I> I syncProxy(Class<I> c) {
|
||||
checkTargetInstance(c);
|
||||
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public<I> I asyncProxy(Class<I> c) {
|
||||
checkTargetInstance(c);
|
||||
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static<I, T extends I> Remote<T> from(I proxy) {
|
||||
return ((AbstractProxy<T>) Proxy.getInvocationHandler(proxy)).ref();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
|
||||
}
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public class SyncProxy<T> extends AbstractProxy<T> {
|
||||
public SyncProxy(Remote<T> ref) {
|
||||
super(ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object dispatch(Method method, Object[] args) {
|
||||
return this._ref.sync((v) -> {
|
||||
try {
|
||||
return method.invoke(v, args);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new ProxyFailure(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new ProxyFailure(e.getCause());
|
||||
}
|
||||
}).await();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSync() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
package org.syndicate_lang.actors;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class Turn {
|
||||
private final static AtomicLong nextHandle = new AtomicLong(0);
|
||||
|
||||
private final Actor _actor;
|
||||
|
||||
private Actor _pendingTarget = null;
|
||||
private Consumer<Turn> _pending0 = null;
|
||||
private Consumer<Turn> _pending1 = null;
|
||||
private Consumer<Turn> _pending2 = null;
|
||||
private Map<Actor, List<Consumer<Turn>>> _pending = null;
|
||||
|
||||
Turn(Actor a) {
|
||||
this._actor = a;
|
||||
}
|
||||
|
||||
public Actor getActor() {
|
||||
return _actor;
|
||||
}
|
||||
|
||||
public Logger log() {
|
||||
return _actor.log();
|
||||
}
|
||||
|
||||
void commit() {
|
||||
if (_pendingTarget != null) {
|
||||
var ac = _pendingTarget;
|
||||
Consumer<Turn> q0 = _pending0, q1 = _pending1, q2 = _pending2;
|
||||
_pendingTarget = null;
|
||||
_pending0 = _pending1 = _pending2 = null;
|
||||
ac.scheduleTurn(t -> {
|
||||
q0.accept(t);
|
||||
if (q1 != null) q1.accept(t);
|
||||
if (q2 != null) q2.accept(t);
|
||||
});
|
||||
} else if (_pending != null) {
|
||||
_pending.forEach((ac, q) -> ac.scheduleTurn(t -> q.forEach(f -> f.accept(t))));
|
||||
_pending = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void enqueue(Actor target, Consumer<Turn> action) {
|
||||
if (_pending == null) {
|
||||
if (_pendingTarget == null) {
|
||||
_pendingTarget = target;
|
||||
_pending0 = action;
|
||||
return;
|
||||
}
|
||||
if (_pendingTarget == target) {
|
||||
if (_pending1 == null) { _pending1 = action; return; }
|
||||
if (_pending2 == null) { _pending2 = action; return; }
|
||||
}
|
||||
_pending = new HashMap<>();
|
||||
var q = new LinkedList<Consumer<Turn>>();
|
||||
_pending.put(_pendingTarget, q);
|
||||
q.add(_pending0);
|
||||
if (_pending1 != null) q.add(_pending1);
|
||||
if (_pending2 != null) q.add(_pending2);
|
||||
_pendingTarget = null;
|
||||
_pending0 = _pending1 = _pending2 = null;
|
||||
}
|
||||
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
|
||||
}
|
||||
|
||||
public Ref ref(IEntity o) {
|
||||
return _actor.ref(o);
|
||||
}
|
||||
|
||||
public void spawn(Consumer<Turn> bootProc) {
|
||||
this.spawn(bootProc, new HashSet<>());
|
||||
}
|
||||
|
||||
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
|
||||
this.enqueue(this._actor, t -> {
|
||||
Map<Long, Ref> newOutbound = new HashMap<>();
|
||||
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
|
||||
Actor newActor = new Actor(null, newOutbound);
|
||||
newActor.scheduleTurn(bootProc);
|
||||
});
|
||||
}
|
||||
|
||||
public void quit() {
|
||||
this._actor.stop();
|
||||
}
|
||||
|
||||
public void crash(Throwable e) {
|
||||
this._actor.stop(false, e);
|
||||
}
|
||||
|
||||
public Long assert_(Ref target, Object assertion) {
|
||||
Long h = Turn.nextHandle.getAndIncrement();
|
||||
_assert_(target, assertion, h);
|
||||
return h;
|
||||
}
|
||||
|
||||
private void _assert_(Ref target, Object assertion, Long h) {
|
||||
Object a = assertion; // TODO: runRewrites from target
|
||||
if (a != null) {
|
||||
this.enqueue(target.getActor(), t -> {
|
||||
_actor._injectOutbound(h, target);
|
||||
target.getEntity().assert_(t, a, h);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void retract_(Long h) {
|
||||
if (h == null) return;
|
||||
Ref peer = this._actor._lookupOutbound(h);
|
||||
if (peer == null) return;
|
||||
_retract_(h, peer);
|
||||
}
|
||||
|
||||
public Long replace_(Ref peer, Long h, Object assertion) {
|
||||
var newHandle = assert_(peer, assertion);
|
||||
retract_(h);
|
||||
return newHandle;
|
||||
}
|
||||
|
||||
void _retract_(Long handle, Ref peer) {
|
||||
this.enqueue(peer.getActor(), t -> {
|
||||
this._actor._extractOutbound(handle);
|
||||
peer.getEntity().retract_(t, handle);
|
||||
});
|
||||
}
|
||||
|
||||
public void sync_(Ref peer, Consumer<Turn> k) {
|
||||
this._sync_(peer, this.ref(new Entity() {
|
||||
public void message_(Turn t, Object _message) {
|
||||
k.accept(t);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void _sync_(Ref peer, Ref callback) {
|
||||
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
|
||||
}
|
||||
|
||||
public void message_(Ref peer, Object body) {
|
||||
Object a = body; // TODO runRewrites
|
||||
if (a != null) {
|
||||
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
|
||||
}
|
||||
}
|
||||
|
||||
public void later(Consumer<Turn> action) {
|
||||
this.later(0, action);
|
||||
}
|
||||
|
||||
public void later(long delayMilliseconds, Consumer<Turn> action) {
|
||||
_actor.later(delayMilliseconds, new Actor.WorkItem(action, null));
|
||||
}
|
||||
|
||||
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
|
||||
return every(0, periodMilliseconds, action);
|
||||
}
|
||||
|
||||
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
|
||||
return _actor.every(initialDelayMilliseconds, periodMilliseconds, action);
|
||||
}
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
package org.syndicate_lang.actors.example.example1;
|
||||
|
||||
public interface IValueHolder<T> {
|
||||
T get();
|
||||
void set(T newValue);
|
||||
}
|
||||
import org.syndicate_lang.actors.Ref;
|
||||
|
||||
public interface IValueHolder {
|
||||
record Get(Ref k) implements IValueHolder {}
|
||||
record Set<T>(T newValue) implements IValueHolder {}
|
||||
}
|
|
@ -1,40 +1,62 @@
|
|||
package org.syndicate_lang.actors.example.example1;
|
||||
|
||||
import org.syndicate_lang.actors.Actor;
|
||||
import org.syndicate_lang.actors.Remote;
|
||||
import org.syndicate_lang.actors.Entity;
|
||||
import org.syndicate_lang.actors.Ref;
|
||||
import org.syndicate_lang.actors.Turn;
|
||||
|
||||
public class Main {
|
||||
public class Main extends Entity {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
Actor.convenientLogging();
|
||||
final var vh = Actor.forObject(new ValueHolder<>("There"));
|
||||
vh.getActor().daemonize();
|
||||
final var m = Actor.forObject(new Main());
|
||||
m.async(10, (m_) -> m_.run(vh));
|
||||
new Actor().scheduleTurn(t -> {
|
||||
final var vh = Actor.forEntity(new ValueHolder<>("There"));
|
||||
vh.getActor().daemonize();
|
||||
Actor.boot(u -> {
|
||||
Main main = new Main();
|
||||
u.later(10, v -> main.run(v, vh));
|
||||
return main;
|
||||
});
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
IValueHolder<String> vv = vh.syncProxy(IValueHolder.class);
|
||||
|
||||
System.out.println("Value: " + vv.get());
|
||||
vv.set("Second");
|
||||
System.out.println("Value: " + vv.get());
|
||||
System.out.println("Underlying: " + Remote.from(vv));
|
||||
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
|
||||
public void message_(Turn t0, Object v0) {
|
||||
System.out.println("Value: " + v0);
|
||||
t0.message_(vh, new IValueHolder.Set<>("Second"));
|
||||
t0.message_(vh, new IValueHolder.Get(t0.ref(new Entity() {
|
||||
public void message_(Turn t1, Object v1) {
|
||||
System.out.println("Value: " + v1);
|
||||
System.out.println("Cell: " + vh);
|
||||
}
|
||||
})));
|
||||
}
|
||||
})));
|
||||
}).daemonize();
|
||||
|
||||
Actor.awaitAll();
|
||||
System.out.println("Overall main returning");
|
||||
}
|
||||
|
||||
private Actor me;
|
||||
private int greetingCounter = 0;
|
||||
|
||||
public void run(Remote<ValueHolder<String>> vh) {
|
||||
this.greet((String) vh.syncProxy(IValueHolder.class).get());
|
||||
vh.syncVoid((v) -> v.set("World"));
|
||||
Actor.current().every(1000, () -> {
|
||||
if (greetingCounter >= 3) Actor.current().stop();
|
||||
this.greet(vh.sync(ValueHolder::get).await());
|
||||
});
|
||||
public void run(Turn t, Ref vh) {
|
||||
me = t.getActor();
|
||||
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
|
||||
public void message_(Turn t1, Object v1) {
|
||||
Main.this.greet((String) v1);
|
||||
t1.message_(vh, new IValueHolder.Set<>("World"));
|
||||
t1.every(1000, t -> {
|
||||
if (greetingCounter >= 3) t.quit();
|
||||
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
|
||||
public void message_(Turn t2, Object v2) {
|
||||
Main.this.greet((String) v2);
|
||||
}
|
||||
})));
|
||||
});
|
||||
}
|
||||
})));
|
||||
}
|
||||
|
||||
public void greet(String who) {
|
||||
Actor.log().info((greetingCounter++) + ": Hi " + who);
|
||||
me.log().info((greetingCounter++) + ": Hi " + who);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,17 +1,23 @@
|
|||
package org.syndicate_lang.actors.example.example1;
|
||||
|
||||
public class ValueHolder<T> implements IValueHolder<T> {
|
||||
import org.syndicate_lang.actors.Entity;
|
||||
import org.syndicate_lang.actors.Turn;
|
||||
|
||||
public class ValueHolder<T> extends Entity {
|
||||
private T value;
|
||||
|
||||
public ValueHolder(T initialValue) {
|
||||
this.value = initialValue;
|
||||
}
|
||||
|
||||
public T get() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
public void set(T newValue) {
|
||||
this.value = newValue;
|
||||
@Override
|
||||
public void message_(Turn turn, Object body) {
|
||||
if (body instanceof IValueHolder.Get op) {
|
||||
turn.message_(op.k(), this.value);
|
||||
} else if (body instanceof IValueHolder.Set) {
|
||||
@SuppressWarnings("unchecked")
|
||||
IValueHolder.Set<T> op = (IValueHolder.Set<T>) body;
|
||||
this.value = op.newValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,25 +1,31 @@
|
|||
package org.syndicate_lang.actors.example.example2;
|
||||
|
||||
import org.syndicate_lang.actors.Remote;
|
||||
import org.syndicate_lang.actors.Entity;
|
||||
import org.syndicate_lang.actors.Ref;
|
||||
import org.syndicate_lang.actors.Turn;
|
||||
|
||||
public class Forwarder implements IForwarder {
|
||||
private final Remote<IForwarder> _main;
|
||||
public class Forwarder extends Entity {
|
||||
private final Ref _main;
|
||||
private final int _nRounds;
|
||||
private Remote<IForwarder> _peer = null;
|
||||
private Ref _peer = null;
|
||||
|
||||
public Forwarder(Remote<IForwarder> main, int nRounds) {
|
||||
public Forwarder(Ref main, int nRounds) {
|
||||
this._main = main;
|
||||
this._nRounds = nRounds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPeer(Remote<IForwarder> peer) {
|
||||
this._peer = peer;
|
||||
public void retract_(Turn turn, Long handle) {
|
||||
turn.quit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(final int hopCount) {
|
||||
Remote<IForwarder> target = hopCount >= this._nRounds - 1 ? _main : _peer;
|
||||
target.async(f -> f.handleMessage(hopCount + 1));
|
||||
public void message_(Turn turn, Object body) {
|
||||
if (body instanceof IForwarder.HandleMessage op) {
|
||||
Ref target = op.hopCount() >= this._nRounds - 1 ? _main : _peer;
|
||||
turn.message_(target, new IForwarder.HandleMessage(op.hopCount() + 1));
|
||||
} else if (body instanceof IForwarder.SetPeer op) {
|
||||
this._peer = op.peer();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package org.syndicate_lang.actors.example.example2;
|
||||
|
||||
import org.syndicate_lang.actors.Remote;
|
||||
import org.syndicate_lang.actors.Ref;
|
||||
|
||||
public interface IForwarder {
|
||||
void setPeer(Remote<IForwarder> peer);
|
||||
void handleMessage(int hopCount);
|
||||
record SetPeer(Ref peer) implements IForwarder {}
|
||||
record HandleMessage(int hopCount) implements IForwarder {}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
package org.syndicate_lang.actors.example.example2;
|
||||
|
||||
public record ILink() {}
|
|
@ -1,18 +1,21 @@
|
|||
package org.syndicate_lang.actors.example.example2;
|
||||
|
||||
import org.syndicate_lang.actors.Actor;
|
||||
import org.syndicate_lang.actors.Remote;
|
||||
import org.syndicate_lang.actors.Entity;
|
||||
import org.syndicate_lang.actors.Ref;
|
||||
import org.syndicate_lang.actors.Turn;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.lang.Integer.parseInt;
|
||||
|
||||
public class Main implements IForwarder {
|
||||
|
||||
public class Main extends Entity {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
Actor.convenientLogging();
|
||||
Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
|
||||
new Actor()
|
||||
.scheduleTurn(t -> new Main(parseInt(args[0]), parseInt(args[1])).boot(t))
|
||||
.daemonize();
|
||||
Actor.awaitAll();
|
||||
}
|
||||
|
||||
|
@ -27,44 +30,44 @@ public class Main implements IForwarder {
|
|||
this._remainingToReceive = nActors;
|
||||
}
|
||||
|
||||
public void boot() {
|
||||
Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
|
||||
final List<Remote<IForwarder>> _actors = new ArrayList<>();
|
||||
final Remote<IForwarder> me = Actor.ref(this);
|
||||
Remote<IForwarder> previous = null;
|
||||
public void boot(Turn t) {
|
||||
t.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
|
||||
final List<Ref> _forwarders = new ArrayList<>();
|
||||
final Ref me = t.ref(this);
|
||||
Ref previous = null;
|
||||
for (int i = 0; i < _nActors; i++) {
|
||||
Remote<IForwarder> current = Actor.forObject(new Forwarder(me, this._nRounds));
|
||||
current.getActor().link();
|
||||
_actors.add(current);
|
||||
Ref current = Actor.forEntity(new Forwarder(me, this._nRounds));
|
||||
t.assert_(current, new ILink());
|
||||
_forwarders.add(current);
|
||||
if (previous != null) {
|
||||
final var p = previous;
|
||||
current.async(f -> f.setPeer(p));
|
||||
t.message_(current, new IForwarder.SetPeer(previous));
|
||||
}
|
||||
previous = current;
|
||||
}
|
||||
_actors.get(0).async(f -> f.setPeer(_actors.get(_nActors - 1)));
|
||||
Actor.log().info("Start");
|
||||
this._startTime = System.currentTimeMillis();
|
||||
_actors.forEach(a -> a.async(f -> f.handleMessage(0)));
|
||||
t.message_(_forwarders.get(0), new IForwarder.SetPeer(_forwarders.get(_nActors - 1)));
|
||||
t.later(t0 -> {
|
||||
t0.log().info("Start");
|
||||
this._startTime = System.currentTimeMillis();
|
||||
_forwarders.forEach(a -> t0.message_(a, new IForwarder.HandleMessage(0)));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPeer(Remote<IForwarder> peer) {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(int hopCount) {
|
||||
this._remainingToReceive--;
|
||||
if (this._remainingToReceive == 0) {
|
||||
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
|
||||
long nMessages = (long) _nActors * (long) _nRounds;
|
||||
double hz = nMessages / delta;
|
||||
Actor.current().stop();
|
||||
Actor.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
|
||||
nMessages,
|
||||
delta,
|
||||
hz));
|
||||
public void message_(Turn turn, Object body) {
|
||||
if (body instanceof IForwarder.HandleMessage) {
|
||||
this._remainingToReceive--;
|
||||
if (this._remainingToReceive == 0) {
|
||||
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
|
||||
long nMessages = (long) _nActors * (long) _nRounds;
|
||||
double hz = nMessages / delta;
|
||||
turn.quit();
|
||||
turn.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
|
||||
nMessages,
|
||||
delta,
|
||||
hz));
|
||||
}
|
||||
} else if (body instanceof IForwarder.SetPeer) {
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue