syndicate-java/src/main/java/org/syndicate_lang/actors/Turn.java

148 lines
4.4 KiB
Java

package org.syndicate_lang.actors;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class Turn {
private final static AtomicLong nextHandle = new AtomicLong(0);
private final Actor _actor;
private Map<Actor, List<Consumer<Turn>>> _pending = null;
private boolean _complete = false;
public static void forActor(Actor a, Consumer<Turn> f) {
Turn.forActor(a, f, false);
}
public static void forActor(Actor a, Consumer<Turn> f, boolean zombieTurn) {
if ((a.getExitReason() == null) == zombieTurn) return;
Turn t = new Turn(a);
try {
f.accept(t);
t.commit();
} catch (Exception e) {
a.stop(false, e);
}
}
private Turn(Actor a) {
this._actor = a;
}
public Actor getActor() {
return _actor;
}
private void commit() {
if (_pending != null) {
_pending.forEach((ac, q) -> ac.execute(() -> q.forEach(f -> Turn.forActor(ac, f))));
_pending = null;
}
_complete = true;
}
private void enqueue(Actor target, Consumer<Turn> action) {
if (_complete) throw new IllegalStateException("Attempt to reuse a committed Turn");
if (_pending == null) _pending = new HashMap<>();
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
}
public void freshen(Consumer<Turn> action) {
if (!_complete) throw new IllegalStateException(("Attempt to freshen a non-stale Turn"));
Turn.forActor(this._actor, action);
}
public Ref ref(IEntity o) {
return _actor.ref(o);
}
public void spawn(Consumer<Turn> bootProc) {
this.spawn(bootProc, new HashSet<>());
}
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
this.enqueue(this._actor, t -> {
Map<Long, Ref> newOutbound = new HashMap<>();
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
Actor newActor = new Actor(null, newOutbound);
newActor.execute(() -> Turn.forActor(newActor, bootProc));
});
}
public void quit() {
this._actor.stop();
}
public void crash(Throwable e) {
this._actor.stop(false, e);
}
public Long assert_(Ref target, Object assertion) {
Long h = Turn.nextHandle.getAndIncrement();
_assert_(target, assertion, h);
return h;
}
private void _assert_(Ref target, Object assertion, Long h) {
Object a = assertion; // TODO: runRewrites from target
if (a != null) {
this.enqueue(target.getActor(), t -> {
_actor._injectOutbound(h, target);
target.getEntity().assert_(t, a, h);
});
}
}
public void retract_(Long h) {
if (h == null) return;
Ref peer = this._actor._lookupOutbound(h);
if (peer == null) return;
_retract_(h, peer);
}
public Long replace_(Ref peer, Long h, Object assertion) {
var newHandle = assert_(peer, assertion);
retract_(h);
return newHandle;
}
void _retract_(Long handle, Ref peer) {
this.enqueue(peer.getActor(), t -> {
this._actor._extractOutbound(handle);
peer.getEntity().retract_(t, handle);
});
}
public void sync_(Ref peer, Consumer<Turn> k) {
this._sync_(peer, this.ref(new Entity() {
public void message_(Turn t, Object _message) {
k.accept(t);
}
}));
}
private void _sync_(Ref peer, Ref callback) {
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
}
public void message_(Ref peer, Object body) {
Object a = body; // TODO runRewrites
if (a != null) {
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
}
}
public void later(long delayMilliseconds, Consumer<Turn> action) {
_actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
}
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
return every(0, periodMilliseconds, action);
}
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> Turn.forActor(_actor, action));
}
}