syndicate-java/src/main/java/org/syndicate_lang/actors/Turn.java

181 lines
5.5 KiB
Java

package org.syndicate_lang.actors;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Logger;
public class Turn {
private final static AtomicLong nextHandle = new AtomicLong(0);
private final Actor _actor;
private Actor _pendingTarget = null;
private Consumer<Turn> _pending0 = null;
private Consumer<Turn> _pending1 = null;
private Consumer<Turn> _pending2 = null;
private Map<Actor, List<Consumer<Turn>>> _pending = null;
public static void forActor(Actor a, Consumer<Turn> f) {
a.execute(() -> Turn._forActor(a, f));
}
static void _forActor(Actor a, Consumer<Turn> f) {
Turn t = a._turn;
try {
f.accept(t);
t.commit();
} catch (Exception e) {
a.stop(false, e);
}
}
Turn(Actor a) {
this._actor = a;
}
public Actor getActor() {
return _actor;
}
public Logger log() {
return _actor.log();
}
private void commit() {
if (_pendingTarget != null) {
var ac = _pendingTarget;
Consumer<Turn> q0 = _pending0, q1 = _pending1, q2 = _pending2;
_pendingTarget = null;
_pending0 = _pending1 = _pending2 = null;
ac.execute(() -> Turn.forActor(ac, t -> {
q0.accept(t);
if (q1 != null) q1.accept(t);
if (q2 != null) q2.accept(t);
}));
} else if (_pending != null) {
_pending.forEach((ac, q) -> ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t)))));
_pending = null;
}
}
private void enqueue(Actor target, Consumer<Turn> action) {
if (_pending == null) {
if (_pendingTarget == null) {
_pendingTarget = target;
_pending0 = action;
return;
}
if (_pendingTarget == target) {
if (_pending1 == null) { _pending1 = action; return; }
if (_pending2 == null) { _pending2 = action; return; }
}
_pending = new HashMap<>();
var q = new LinkedList<Consumer<Turn>>();
_pending.put(_pendingTarget, q);
q.add(_pending0);
if (_pending1 != null) q.add(_pending1);
if (_pending2 != null) q.add(_pending2);
_pendingTarget = null;
_pending0 = _pending1 = _pending2 = null;
}
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
}
public Ref ref(IEntity o) {
return _actor.ref(o);
}
public void spawn(Consumer<Turn> bootProc) {
this.spawn(bootProc, new HashSet<>());
}
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
this.enqueue(this._actor, t -> {
Map<Long, Ref> newOutbound = new HashMap<>();
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
Actor newActor = new Actor(null, newOutbound);
newActor.execute(() -> Turn.forActor(newActor, bootProc));
});
}
public void quit() {
this._actor.stop();
}
public void crash(Throwable e) {
this._actor.stop(false, e);
}
public Long assert_(Ref target, Object assertion) {
Long h = Turn.nextHandle.getAndIncrement();
_assert_(target, assertion, h);
return h;
}
private void _assert_(Ref target, Object assertion, Long h) {
Object a = assertion; // TODO: runRewrites from target
if (a != null) {
this.enqueue(target.getActor(), t -> {
_actor._injectOutbound(h, target);
target.getEntity().assert_(t, a, h);
});
}
}
public void retract_(Long h) {
if (h == null) return;
Ref peer = this._actor._lookupOutbound(h);
if (peer == null) return;
_retract_(h, peer);
}
public Long replace_(Ref peer, Long h, Object assertion) {
var newHandle = assert_(peer, assertion);
retract_(h);
return newHandle;
}
void _retract_(Long handle, Ref peer) {
this.enqueue(peer.getActor(), t -> {
this._actor._extractOutbound(handle);
peer.getEntity().retract_(t, handle);
});
}
public void sync_(Ref peer, Consumer<Turn> k) {
this._sync_(peer, this.ref(new Entity() {
public void message_(Turn t, Object _message) {
k.accept(t);
}
}));
}
private void _sync_(Ref peer, Ref callback) {
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
}
public void message_(Ref peer, Object body) {
Object a = body; // TODO runRewrites
if (a != null) {
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
}
}
public void later(Consumer<Turn> action) {
this.later(0, action);
}
public void later(long delayMilliseconds, Consumer<Turn> action) {
_actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
}
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
return every(0, periodMilliseconds, action);
}
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> Turn.forActor(_actor, action));
}
}