2021-04-14 19:42:50 +00:00
|
|
|
package org.syndicate_lang.actors;
|
|
|
|
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import java.util.function.Consumer;
|
2023-10-28 11:23:29 +00:00
|
|
|
import java.util.logging.Logger;
|
2021-04-14 19:42:50 +00:00
|
|
|
|
|
|
|
public class Turn {
|
|
|
|
private final static AtomicLong nextHandle = new AtomicLong(0);
|
|
|
|
|
|
|
|
private final Actor _actor;
|
2023-10-28 14:03:46 +00:00
|
|
|
private boolean _complete = false;
|
|
|
|
|
2023-10-28 13:45:27 +00:00
|
|
|
private Actor _pendingTarget = null;
|
2023-10-28 14:03:46 +00:00
|
|
|
private Consumer<Turn>[] _pendingQ = null;
|
2021-04-14 19:42:50 +00:00
|
|
|
private Map<Actor, List<Consumer<Turn>>> _pending = null;
|
|
|
|
|
|
|
|
public static void forActor(Actor a, Consumer<Turn> f) {
|
2023-10-28 11:23:29 +00:00
|
|
|
a.execute(() -> Turn._forActor(a, f));
|
2021-04-14 19:42:50 +00:00
|
|
|
}
|
|
|
|
|
2023-10-28 11:23:29 +00:00
|
|
|
static void _forActor(Actor a, Consumer<Turn> f) {
|
2021-04-14 19:42:50 +00:00
|
|
|
Turn t = new Turn(a);
|
|
|
|
try {
|
|
|
|
f.accept(t);
|
|
|
|
t.commit();
|
|
|
|
} catch (Exception e) {
|
|
|
|
a.stop(false, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Creates a turn bound to {@code a}. Private: turns are only created by {@code _forActor}.
 *
 * @param a the actor this turn belongs to
 */
private Turn(Actor a) {
    _actor = a;
}
|
|
|
|
|
|
|
|
public Actor getActor() {
|
|
|
|
return _actor;
|
|
|
|
}
|
|
|
|
|
2023-10-28 11:23:29 +00:00
|
|
|
public Logger log() {
|
|
|
|
return _actor.log();
|
|
|
|
}
|
|
|
|
|
2021-04-14 19:42:50 +00:00
|
|
|
private void commit() {
|
|
|
|
if (_pending != null) {
|
2021-05-31 10:05:00 +00:00
|
|
|
_pending.forEach((ac, q) -> ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t)))));
|
2021-04-14 19:42:50 +00:00
|
|
|
_pending = null;
|
2023-10-28 13:45:27 +00:00
|
|
|
} else if (_pendingTarget != null) {
|
|
|
|
var ac = _pendingTarget;
|
|
|
|
var q = _pendingQ;
|
|
|
|
_pendingTarget = null;
|
|
|
|
_pendingQ = null;
|
2023-10-28 14:03:46 +00:00
|
|
|
ac.execute(() -> Turn.forActor(ac, t -> {
|
|
|
|
for (var f : q) if (f != null) f.accept(t);
|
|
|
|
}));
|
2021-04-14 19:42:50 +00:00
|
|
|
}
|
|
|
|
_complete = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void enqueue(Actor target, Consumer<Turn> action) {
|
|
|
|
if (_complete) throw new IllegalStateException("Attempt to reuse a committed Turn");
|
2023-10-28 13:37:10 +00:00
|
|
|
if (_pending == null) {
|
2023-10-28 13:45:27 +00:00
|
|
|
if (_pendingTarget == null) {
|
|
|
|
_pendingTarget = target;
|
2023-10-28 14:03:46 +00:00
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
var q = (Consumer<Turn>[]) new Consumer<?>[3];
|
|
|
|
_pendingQ = q;
|
|
|
|
_pendingQ[0] = action;
|
2023-10-28 13:37:10 +00:00
|
|
|
return;
|
|
|
|
}
|
2023-10-28 13:45:27 +00:00
|
|
|
if (_pendingTarget == target) {
|
2023-10-28 14:03:46 +00:00
|
|
|
if (_pendingQ[1] == null) { _pendingQ[1] = action; return; }
|
|
|
|
if (_pendingQ[2] == null) { _pendingQ[2] = action; return; }
|
2023-10-28 13:37:10 +00:00
|
|
|
}
|
|
|
|
_pending = new HashMap<>();
|
2023-10-28 14:03:46 +00:00
|
|
|
var q = new LinkedList<Consumer<Turn>>();
|
|
|
|
_pending.put(_pendingTarget, q);
|
|
|
|
q.add(_pendingQ[0]);
|
|
|
|
if (_pendingQ[1] != null) q.add(_pendingQ[1]);
|
|
|
|
if (_pendingQ[2] != null) q.add(_pendingQ[2]);
|
2023-10-28 13:45:27 +00:00
|
|
|
_pendingTarget = null;
|
|
|
|
_pendingQ = null;
|
2023-10-28 13:37:10 +00:00
|
|
|
}
|
2021-04-14 19:42:50 +00:00
|
|
|
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void freshen(Consumer<Turn> action) {
|
|
|
|
if (!_complete) throw new IllegalStateException(("Attempt to freshen a non-stale Turn"));
|
|
|
|
Turn.forActor(this._actor, action);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Ref ref(IEntity o) {
|
|
|
|
return _actor.ref(o);
|
|
|
|
}
|
|
|
|
|
|
|
|
public void spawn(Consumer<Turn> bootProc) {
|
|
|
|
this.spawn(bootProc, new HashSet<>());
|
|
|
|
}
|
|
|
|
|
|
|
|
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
|
|
|
|
this.enqueue(this._actor, t -> {
|
|
|
|
Map<Long, Ref> newOutbound = new HashMap<>();
|
|
|
|
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
|
|
|
|
Actor newActor = new Actor(null, newOutbound);
|
|
|
|
newActor.execute(() -> Turn.forActor(newActor, bootProc));
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
public void quit() {
|
|
|
|
this._actor.stop();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void crash(Throwable e) {
|
|
|
|
this._actor.stop(false, e);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Long assert_(Ref target, Object assertion) {
|
|
|
|
Long h = Turn.nextHandle.getAndIncrement();
|
|
|
|
_assert_(target, assertion, h);
|
|
|
|
return h;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void _assert_(Ref target, Object assertion, Long h) {
|
|
|
|
Object a = assertion; // TODO: runRewrites from target
|
|
|
|
if (a != null) {
|
|
|
|
this.enqueue(target.getActor(), t -> {
|
|
|
|
_actor._injectOutbound(h, target);
|
|
|
|
target.getEntity().assert_(t, a, h);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void retract_(Long h) {
|
|
|
|
if (h == null) return;
|
|
|
|
Ref peer = this._actor._lookupOutbound(h);
|
|
|
|
if (peer == null) return;
|
|
|
|
_retract_(h, peer);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Long replace_(Ref peer, Long h, Object assertion) {
|
|
|
|
var newHandle = assert_(peer, assertion);
|
|
|
|
retract_(h);
|
|
|
|
return newHandle;
|
|
|
|
}
|
|
|
|
|
|
|
|
void _retract_(Long handle, Ref peer) {
|
|
|
|
this.enqueue(peer.getActor(), t -> {
|
|
|
|
this._actor._extractOutbound(handle);
|
|
|
|
peer.getEntity().retract_(t, handle);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-04-15 08:49:05 +00:00
|
|
|
public void sync_(Ref peer, Consumer<Turn> k) {
|
2021-04-14 19:42:50 +00:00
|
|
|
this._sync_(peer, this.ref(new Entity() {
|
|
|
|
public void message_(Turn t, Object _message) {
|
2021-04-15 08:49:05 +00:00
|
|
|
k.accept(t);
|
2021-04-14 19:42:50 +00:00
|
|
|
}
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
private void _sync_(Ref peer, Ref callback) {
|
|
|
|
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
|
|
|
|
}
|
|
|
|
|
|
|
|
public void message_(Ref peer, Object body) {
|
|
|
|
Object a = body; // TODO runRewrites
|
|
|
|
if (a != null) {
|
|
|
|
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
|
|
|
|
}
|
|
|
|
}
|
2021-04-15 08:49:05 +00:00
|
|
|
|
2023-10-28 11:23:29 +00:00
|
|
|
public void later(Consumer<Turn> action) {
|
|
|
|
this.later(0, action);
|
|
|
|
}
|
|
|
|
|
2021-04-15 08:49:05 +00:00
|
|
|
public void later(long delayMilliseconds, Consumer<Turn> action) {
|
|
|
|
_actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
|
|
|
|
}
|
|
|
|
|
|
|
|
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
|
|
|
|
return every(0, periodMilliseconds, action);
|
|
|
|
}
|
|
|
|
|
|
|
|
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
|
|
|
|
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> Turn.forActor(_actor, action));
|
|
|
|
}
|
2021-04-14 19:42:50 +00:00
|
|
|
}
|