Links and monitors

This commit is contained in:
Tony Garnock-Jones 2020-12-05 23:50:41 +01:00
parent bb3d822988
commit f8a7d95e55
6 changed files with 184 additions and 69 deletions

View File

@ -1,7 +1,13 @@
package org.syndicate_lang.actors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -21,6 +27,9 @@ public class Actor implements Executor {
private boolean _alive = true;
private Throwable _exitReason = null;
private boolean _isCounted = true;
private Set<Actor> _links = null;
private Map<Object, Remote<IMonitorHandler>> _monitors = null;
private Consumer<Actor> _exitTrap = null;
public static Actor current() {
return _currentActor.get();
@ -48,6 +57,15 @@ public class Actor implements Executor {
return new Actor().proxyFor(o);
}
public static<T> Promise<Remote<T>> boot(Supplier<T> f) {
Promise<Remote<T>> p = new Promise<>();
Actor a = new Actor();
a.execute(
() -> p.resolveCalling(() -> Actor.ref(f.get())),
() -> p.rejectWith(new ActorTerminated(a)));
return p;
}
public String getName() {
return _name;
}
@ -134,6 +152,21 @@ public class Actor implements Executor {
} else {
getLogger().log(Level.SEVERE, "Actor terminated with error", reason);
}
Set<Actor> linkedPeers = _links;
if (linkedPeers != null) {
_links = null;
for (var peer : linkedPeers) {
peer.notifyExit(this);
}
}
Map<Object, Remote<IMonitorHandler>> monitoringPeers = _monitors;
if (monitoringPeers != null) {
_monitors = null;
for (var entry : monitoringPeers.entrySet()) {
final var ref = entry.getKey();
entry.getValue().async((h) -> h.handleMonitor(this, ref));
}
}
_releaseCount();
}
}
@ -188,4 +221,64 @@ public class Actor implements Executor {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
}
public void link() {
final Actor peer = Actor.current();
this.linkPeer(peer);
peer.linkPeer(this);
}
public void unlink() {
final Actor peer = Actor.current();
this.unlinkPeer(peer);
peer.unlinkPeer(this);
}
private synchronized void linkPeer(Actor peer) {
if (this._alive) {
if (_links == null) _links = new HashSet<>();
_links.add(peer);
} else {
peer.notifyExit(this);
}
}
private void notifyExit(final Actor exitingPeer) {
this.execute(() -> {
this.unlinkPeer(exitingPeer);
if (this._exitTrap != null) {
this._exitTrap.accept(exitingPeer);
} else {
this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer));
}
});
}
private synchronized void unlinkPeer(Actor peer) {
if (_links != null) {
_links.remove(peer);
if (_links.isEmpty()) {
_links = null;
}
}
}
public void trapExits(Consumer<Actor> handler) {
this._exitTrap = handler;
}
public synchronized Object monitor(Consumer<Actor> handler) {
Object ref = new Object();
monitor(ref, (actor, _ref) -> handler.accept(actor));
return ref;
}
public synchronized void monitor(final Object ref, IMonitorHandler handler) {
if (this._alive) {
if (_monitors == null) _monitors = new HashMap<>();
_monitors.put(ref, Actor.ref(handler));
} else {
Actor.ref(handler).async((h) -> h.handleMonitor(this, ref));
}
}
}

View File

@ -0,0 +1,5 @@
package org.syndicate_lang.actors;
public interface IMonitorHandler {
void handleMonitor(Actor exitingPeer, Object ref);
}

View File

@ -5,13 +5,17 @@ import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class Promise<T> implements Future<T> {
public static enum State {
public static final TimeoutException TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION = new TimeoutException("Waiting for promise resolution");
public enum State {
PENDING,
FULFILLED,
REJECTED
};
}
private volatile State _state = State.PENDING;
private T _value = null;
@ -65,71 +69,79 @@ public class Promise<T> implements Future<T> {
this.resolveWith(null);
}
public synchronized void resolveWith(T t) {
public void resolveWith(T t) {
List<Consumer<T>> worklist = null;
synchronized (this) {
if (this.isPending()) {
this._value = t;
this._state = State.FULFILLED;
var worklist = _resolvers;
worklist = _resolvers;
_resolvers = null;
_rejecters = null;
}
}
if (worklist != null) {
for (var callback : worklist) {
callback.accept(t);
}
}
}
}
public void chain(Promise<T> t) {
if (this == t) {
throw new IllegalArgumentException("cannot chain promise immediately to itself");
}
t.whenFulfilled((v) -> this.resolveWith(v));
t.whenRejected((e) -> this.rejectWith(e));
t.whenFulfilled(this::resolveWith);
t.whenRejected(this::rejectWith);
}
public void rejectWith(Throwable e) {
List<Consumer<Throwable>> worklist = null;
synchronized (this) {
if (this.isPending()) {
this._reason = e;
this._state = State.REJECTED;
var worklist = _rejecters;
worklist = _rejecters;
_resolvers = null;
_rejecters = null;
}
}
if (worklist != null) {
for (var callback : worklist) {
callback.accept(e);
}
}
}
}
public<R> Promise<R> andThen(Function<T, R> ok) {
return this.andThen(ok, null);
}
public void resolveCalling(Supplier<T> f) {
try {
this.resolveWith(f.get());
} catch (Throwable e) {
this.rejectWith(e);
}
}
public synchronized<R> Promise<R> andThen(final Function<T, R> ok, final Function<Throwable, R> fail) {
Actor a0 = Actor.current();
final Actor a = a0 != null ? a0 : new Actor();
Promise<R> p = new Promise<>();
this.whenFulfilled((t) -> a.execute(() -> {
try {
p.resolveWith(ok.apply(t));
} catch (Throwable e) {
p.rejectWith(e);
}
}, () -> p.rejectWith(new ActorTerminated(a))));
this.whenRejected((e) -> a.execute(() -> {
try {
this.whenFulfilled((t) -> a.execute(
() -> p.resolveCalling(() -> ok.apply(t)),
() -> p.rejectWith(new ActorTerminated(a))));
this.whenRejected((e) -> a.execute(
() -> {
if (fail == null) {
p.rejectWith(e);
} else {
p.resolveWith(fail.apply(e));
p.resolveCalling(() -> fail.apply(e));
}
} catch (Throwable e2) {
p.rejectWith(e2);
}
}, () -> p.rejectWith(new ActorTerminated(a))));
},
() -> p.rejectWith(new ActorTerminated(a))));
return p;
}
@ -155,14 +167,30 @@ public class Promise<T> implements Future<T> {
}
public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException {
Actor a = Actor.current();
if (a == null) {
Semaphore s = new Semaphore(0);
this.whenFulfilled((_t) -> s.release());
this.whenRejected((_e) -> s.release());
if (delay == -1) {
s.acquire();
} else {
if (!s.tryAcquire(delay, unit)) {
throw new TimeoutException();
if (!s.tryAcquire(delay, unit)) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
}
} else {
this.whenFulfilled((_t) -> { synchronized (a) { a.notifyAll(); } });
this.whenRejected((_e) -> { synchronized (a) { a.notifyAll(); } });
synchronized (a) {
if (delay == -1) {
while (this.isPending()) a.wait();
} else {
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
while (this.isPending()) {
long now = System.currentTimeMillis();
if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
a.wait(targetTime - now);
}
}
}
}
if (this.isFulfilled()) {

View File

@ -1,7 +1,5 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import java.util.List;
public class Forwarder implements IForwarder {

View File

@ -4,7 +4,4 @@ import org.syndicate_lang.actors.Actor;
public interface IForwarder {
void handleMessage(int hopCount);
default void shutdown() {
Actor.current().stop();
}
}

View File

@ -1,6 +1,7 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import java.util.ArrayList;
import java.util.List;
@ -9,8 +10,6 @@ import static java.lang.Integer.parseInt;
public class Main implements IForwarder {
private List<IForwarder> _actors;
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
@ -29,27 +28,22 @@ public class Main implements IForwarder {
public void boot() {
Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
this._actors = new ArrayList<>();
IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
List<IForwarder> _actors = new ArrayList<>();
final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
for (int i = 0; i < _nActors; i++) {
this._actors.add(Actor.forObject(
new Forwarder(i, this._actors, me, this._nRounds))
.asyncProxy(IForwarder.class));
// Actor.log().info(this._actors.get(this._actors.size()-1).toString());
Remote<IForwarder> a = Actor.forObject(new Forwarder(i, _actors, me, this._nRounds));
a.getActor().link();
_actors.add(a.asyncProxy(IForwarder.class));
}
Actor.log().info("Start");
this._actors.forEach((a) -> a.handleMessage(0));
_actors.forEach((a) -> a.handleMessage(0));
}
@Override
public void handleMessage(int hopCount) {
this._remainingToReceive--;
// Actor.log().info(String.format("hopCount: %d, remainingToReceive: %d",
// hopCount,
// this._remainingToReceive));
if (this._remainingToReceive == 0) {
this._actors.forEach(IForwarder::shutdown);
this.shutdown();
Actor.current().stop();
Actor.log().info("Stop after " + (_nActors * _nRounds) + " messages");
}
}