package org.syndicate_lang.actors; import java.util.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.logging.Logger; public class Turn { private final static AtomicLong nextHandle = new AtomicLong(0); private final Actor _actor; private Actor _pendingTarget = null; private Consumer _pending0 = null; private Consumer _pending1 = null; private Consumer _pending2 = null; private Map>> _pending = null; Turn(Actor a) { this._actor = a; } public Actor getActor() { return _actor; } public Logger log() { return _actor.log(); } void commit() { if (_pendingTarget != null) { var ac = _pendingTarget; Consumer q0 = _pending0, q1 = _pending1, q2 = _pending2; _pendingTarget = null; _pending0 = _pending1 = _pending2 = null; ac.scheduleTurn(t -> { q0.accept(t); if (q1 != null) q1.accept(t); if (q2 != null) q2.accept(t); }); } else if (_pending != null) { _pending.forEach((ac, q) -> ac.scheduleTurn(t -> q.forEach(f -> f.accept(t)))); _pending = null; } } private void enqueue(Actor target, Consumer action) { if (_pending == null) { if (_pendingTarget == null) { _pendingTarget = target; _pending0 = action; return; } if (_pendingTarget == target) { if (_pending1 == null) { _pending1 = action; return; } if (_pending2 == null) { _pending2 = action; return; } } _pending = new HashMap<>(); var q = new LinkedList>(); _pending.put(_pendingTarget, q); q.add(_pending0); if (_pending1 != null) q.add(_pending1); if (_pending2 != null) q.add(_pending2); _pendingTarget = null; _pending0 = _pending1 = _pending2 = null; } _pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action); } public Ref ref(IEntity o) { return _actor.ref(o); } public void spawn(Consumer bootProc) { this.spawn(bootProc, new HashSet<>()); } public void spawn(Consumer bootProc, Set initialAssertions) { this.enqueue(this._actor, t -> { Map newOutbound = new HashMap<>(); initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k))); Actor newActor = new Actor(null, newOutbound); newActor.scheduleTurn(bootProc); }); } public void quit() { this._actor.stop(); } public void crash(Throwable e) { this._actor.stop(false, e); } public Long assert_(Ref target, Object assertion) { Long h = Turn.nextHandle.getAndIncrement(); _assert_(target, assertion, h); return h; } private void _assert_(Ref target, Object assertion, Long h) { Object a = assertion; // TODO: runRewrites from target if (a != null) { this.enqueue(target.getActor(), t -> { _actor._injectOutbound(h, target); target.getEntity().assert_(t, a, h); }); } } public void retract_(Long h) { if (h == null) return; Ref peer = this._actor._lookupOutbound(h); if (peer == null) return; _retract_(h, peer); } public Long replace_(Ref peer, Long h, Object assertion) { var newHandle = assert_(peer, assertion); retract_(h); return newHandle; } void _retract_(Long handle, Ref peer) { this.enqueue(peer.getActor(), t -> { this._actor._extractOutbound(handle); peer.getEntity().retract_(t, handle); }); } public void sync_(Ref peer, Consumer k) { this._sync_(peer, this.ref(new Entity() { public void message_(Turn t, Object _message) { k.accept(t); } })); } private void _sync_(Ref peer, Ref callback) { this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback)); } public void message_(Ref peer, Object body) { Object a = body; // TODO runRewrites if (a != null) { this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body)); } } public void later(Consumer action) { this.later(0, action); } public void later(long delayMilliseconds, Consumer action) { _actor.later(delayMilliseconds, new Actor.WorkItem(action, null)); } public PeriodicTimer every(long periodMilliseconds, Consumer action) { return every(0, periodMilliseconds, action); } public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer action) { return _actor.every(initialDelayMilliseconds, periodMilliseconds, action); } }