package org.syndicate_lang.actors;

import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * A unit of work executed on behalf of a single {@link Actor}.
 *
 * <p>Operations requested during a turn (assert, retract, message, sync, spawn) are not
 * performed immediately. They are buffered per target actor and handed to each target's
 * {@code execute} method when the turn commits. Once committed, a turn is marked complete
 * and rejects further enqueued work; use {@link #freshen(Consumer)} to continue in a new turn.
 */
public class Turn {
    /** Process-wide source of assertion handles. */
    private final static AtomicLong nextHandle = new AtomicLong(0);

    /** The actor this turn runs on behalf of. */
    private final Actor _actor;

    /** Actions buffered during this turn, grouped by target actor; allocated lazily. */
    private Map<Actor, List<Consumer<Turn>>> _pending = null;

    /** Set by {@link #commit()}; afterwards nothing more may be enqueued. */
    private boolean _complete = false;

    /**
     * Runs {@code f} in a fresh, non-zombie turn for {@code a}.
     *
     * @param a the actor to run the turn for
     * @param f the body of the turn
     */
    public static void forActor(Actor a, Consumer<Turn> f) {
        Turn.forActor(a, f, false);
    }

    /**
     * Runs {@code f} in a fresh turn for {@code a} and commits it.
     *
     * <p>The turn is skipped entirely when the actor's state does not match
     * {@code zombieTurn}: a normal turn ({@code false}) runs only while the actor has no
     * exit reason, and a zombie turn ({@code true}) runs only once it has one.
     *
     * <p>If {@code f} or the commit throws an {@link Exception}, the actor is stopped via
     * {@code a.stop(false, e)}.
     *
     * @param a          the actor to run the turn for
     * @param f          the body of the turn
     * @param zombieTurn whether this turn is meant for an actor that already has an exit reason
     */
    public static void forActor(Actor a, Consumer<Turn> f, boolean zombieTurn) {
        if ((a.getExitReason() == null) == zombieTurn) {
            return;
        }
        Turn t = new Turn(a);
        try {
            f.accept(t);
            t.commit();
        } catch (Exception e) {
            a.stop(false, e);
        }
    }

    private Turn(Actor a) {
        this._actor = a;
    }

    /**
     * @return the actor this turn runs on behalf of
     */
    public Actor getActor() {
        return _actor;
    }

    /**
     * Hands every buffered batch of actions to its target actor, to be run there in a new
     * turn, then marks this turn complete.
     */
    private void commit() {
        if (_pending != null) {
            _pending.forEach((ac, q) ->
                    ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t)))));
            _pending = null;
        }
        _complete = true;
    }

    /**
     * Buffers {@code action} to be run in a turn of {@code target} when this turn commits.
     *
     * @throws IllegalStateException if this turn has already been committed
     */
    private void enqueue(Actor target, Consumer<Turn> action) {
        if (_complete) {
            throw new IllegalStateException("Attempt to reuse a committed Turn");
        }
        if (_pending == null) {
            _pending = new HashMap<>();
        }
        _pending.computeIfAbsent(target, k -> new ArrayList<>()).add(action);
    }

    /**
     * Runs {@code action} in a new turn for this turn's actor. Only valid once this turn
     * has been committed.
     *
     * @param action the body of the new turn
     * @throws IllegalStateException if this turn has not been committed yet
     */
    public void freshen(Consumer<Turn> action) {
        if (!_complete) {
            throw new IllegalStateException("Attempt to freshen a non-stale Turn");
        }
        Turn.forActor(this._actor, action);
    }

    /**
     * Delegates to this turn's actor to obtain a {@link Ref} for {@code o}.
     */
    public Ref ref(IEntity o) {
        return _actor.ref(o);
    }

    /**
     * Spawns a new actor with no initial assertions.
     *
     * @param bootProc the body of the new actor's first turn
     */
    public void spawn(Consumer<Turn> bootProc) {
        this.spawn(bootProc, new HashSet<>());
    }

    /**
     * Enqueues the creation of a new actor. When the enqueued action runs, the outbound
     * entries for {@code initialAssertions} are extracted from this turn's actor and given
     * to the new actor, and {@code bootProc} is scheduled as a turn on the new actor.
     *
     * @param bootProc          the body of the new actor's first turn
     * @param initialAssertions handles whose outbound entries are moved to the new actor
     */
    public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
        this.enqueue(this._actor, t -> {
            Map<Long, Ref> newOutbound = new HashMap<>();
            initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
            // NOTE(review): meaning of the null first constructor argument is not visible here.
            Actor newActor = new Actor(null, newOutbound);
            newActor.execute(() -> Turn.forActor(newActor, bootProc));
        });
    }

    /** Stops this turn's actor via {@code stop()}. */
    public void quit() {
        this._actor.stop();
    }

    /**
     * Stops this turn's actor via {@code stop(false, e)}.
     *
     * @param e the cause passed on to the actor
     */
    public void crash(Throwable e) {
        this._actor.stop(false, e);
    }

    /**
     * Allocates a fresh handle and enqueues an assertion of {@code assertion} at
     * {@code target}.
     *
     * @return the newly allocated handle (allocated even when {@code assertion} is null
     *         and nothing is enqueued)
     */
    public Long assert_(Ref target, Object assertion) {
        Long h = Turn.nextHandle.getAndIncrement();
        _assert_(target, assertion, h);
        return h;
    }

    /**
     * Enqueues, on the target's actor, an action that records the outbound handle on this
     * turn's actor and delivers the assertion to the target entity. Does nothing when the
     * assertion is null.
     */
    private void _assert_(Ref target, Object assertion, Long h) {
        Object a = assertion;
        // TODO: runRewrites from target
        if (a != null) {
            this.enqueue(target.getActor(), t -> {
                _actor._injectOutbound(h, target);
                target.getEntity().assert_(t, a, h);
            });
        }
    }

    /**
     * Enqueues a retraction of the assertion identified by {@code h}. Does nothing when
     * {@code h} is null or this turn's actor has no outbound entry for it.
     */
    public void retract_(Long h) {
        if (h == null) {
            return;
        }
        Ref peer = this._actor._lookupOutbound(h);
        if (peer == null) {
            return;
        }
        _retract_(h, peer);
    }

    /**
     * Asserts {@code assertion} at {@code peer} under a new handle, then retracts the old
     * handle {@code h}.
     *
     * @return the handle of the new assertion
     */
    public Long replace_(Ref peer, Long h, Object assertion) {
        var newHandle = assert_(peer, assertion);
        retract_(h);
        return newHandle;
    }

    /**
     * Enqueues, on the peer's actor, an action that removes the outbound entry for
     * {@code handle} from this turn's actor and delivers the retraction to the peer entity.
     */
    void _retract_(Long handle, Ref peer) {
        this.enqueue(peer.getActor(), t -> {
            this._actor._extractOutbound(handle);
            peer.getEntity().retract_(t, handle);
        });
    }

    /**
     * Enqueues a sync request to {@code peer}; {@code k} is invoked with the receiving turn
     * when a message arrives at the callback entity created here.
     */
    public void sync_(Ref peer, Consumer<Turn> k) {
        this._sync_(peer, this.ref(new Entity() {
            @Override
            public void message_(Turn t, Object _message) {
                k.accept(t);
            }
        }));
    }

    private void _sync_(Ref peer, Ref callback) {
        this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
    }

    /**
     * Enqueues delivery of {@code body} as a message to {@code peer}. Does nothing when the
     * body is null.
     */
    public void message_(Ref peer, Object body) {
        Object a = body;
        // TODO runRewrites
        if (a != null) {
            // Send the checked value `a`, not `body`, so a future rewrite step is honoured.
            this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, a));
        }
    }

    /**
     * Asks this turn's actor to run {@code action} in a new turn after a delay.
     *
     * @param delayMilliseconds delay passed to the actor's {@code later}
     * @param action            the body of the delayed turn
     */
    public void later(long delayMilliseconds, Consumer<Turn> action) {
        _actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
    }

    /**
     * Same as {@link #every(long, long, Consumer)} with an initial delay of zero.
     */
    public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
        return every(0, periodMilliseconds, action);
    }

    /**
     * Asks this turn's actor to run {@code action} in a new turn on each firing of a
     * periodic timer.
     *
     * @param initialDelayMilliseconds initial delay passed to the actor's {@code every}
     * @param periodMilliseconds       period passed to the actor's {@code every}
     * @param action                   the body of each periodic turn
     * @return the timer returned by the actor
     */
    public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
        return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> Turn.forActor(_actor, action));
    }
}