diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index 246fa84..671df38 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -13,7 +13,7 @@ import java.util.logging.Logger; /** * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. */ -public class Actor implements Executor { +public class Actor { private final static AtomicLong _count = new AtomicLong(0); private final static AtomicLong _actorId = new AtomicLong(0); protected final static ExecutorService _executor = Executors.newWorkStealingPool(); @@ -28,11 +28,11 @@ public class Actor implements Executor { private boolean _isCounted = true; private Map _outbound; - private final static class WorkItem extends AtomicReference { - Runnable work; + public final static class WorkItem extends AtomicReference { + Consumer work; Runnable ifNotAlive; - public WorkItem(Runnable work, Runnable ifNotAlive) { + public WorkItem(Consumer work, Runnable ifNotAlive) { this.work = work; this.ifNotAlive = ifNotAlive; } @@ -69,9 +69,9 @@ public class Actor implements Executor { public static Promise boot(Function f) { final Promise p = new Promise<>(); final Actor a = new Actor(); - a.execute( - () -> a.scheduleTurn(t -> p.resolveCalling(() -> a.ref(f.apply(t)))), - () -> p.rejectWith(new ActorTerminated(a))); + a.execute(new WorkItem( + t -> p.resolveCalling(() -> a.ref(f.apply(t))), + () -> p.rejectWith(new ActorTerminated(a)))); return p; } @@ -134,7 +134,7 @@ public class Actor implements Executor { } public void scheduleTurn(Consumer f) { - this.execute(() -> this._performTurn(f)); + this.execute(new WorkItem(f, null)); } void _performTurn(Consumer f) { @@ -146,21 +146,17 @@ public class Actor implements Executor { } } - private void _performSync(Runnable work, Runnable ifNotAlive) { + private void _performSync(WorkItem item) { synchronized (this) { - _perform(work, ifNotAlive); + _perform(item); } } - private void _perform(Runnable work, Runnable ifNotAlive) { + private void _perform(WorkItem item) { if (!_alive) { - if (ifNotAlive != null) ifNotAlive.run(); + if (item.ifNotAlive != null) item.ifNotAlive.run(); } else { - try { - work.run(); - } catch (Throwable exn) { - this._stop(false, exn); - } + this._performTurn(item.work); } } @@ -173,10 +169,9 @@ public class Actor implements Executor { } public Promise stop(boolean normally, Throwable reason) { Promise p = new Promise<>(); - this.execute(() -> { - this._stop(normally, reason); - p.resolve(); - }, p::resolve); + this.execute(new WorkItem( + _f -> { this._stop(normally, reason); p.resolve(); }, + p::resolve)); return p; } @@ -198,16 +193,8 @@ public class Actor implements Executor { } } - @Override - public void execute(Runnable work) { - this.execute(work, null); - } - - public void execute(Runnable work, Runnable ifNotAlive) { - { - WorkItem i = new WorkItem(work, ifNotAlive); - tail.getAndSet(i).set(i); - } + public void execute(WorkItem item) { + tail.getAndSet(item).set(item); if (workItemCount.getAndIncrement() == 0) { _executor.execute(() -> { synchronized (this) { @@ -217,7 +204,7 @@ public class Actor implements Executor { WorkItem i = null; while (i == null) i = head.get(); head = i; - this._perform(i.work, i.ifNotAlive); + this._perform(i); i.clear(); } batch = workItemCount.addAndGet(-batch); @@ -227,29 +214,32 @@ public class Actor implements Executor { } } - public void later(long delayMilliseconds, Runnable work) { - this.later(delayMilliseconds, work, null); - } - - public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { + public void later(long delayMilliseconds, WorkItem item) { if (delayMilliseconds == 0) { - this.execute(work, ifNotAlive); + this.execute(item); } else { - _scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); + _scheduledExecutor.schedule( + () -> this._performSync(item), + delayMilliseconds, + TimeUnit.MILLISECONDS); } } - public PeriodicTimer every(long periodMilliseconds, Runnable f) { - return every(0, periodMilliseconds, f); - } - - public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { - return new PeriodicTimer( - _scheduledExecutor.scheduleAtFixedRate( - () -> this._performSync(f, null), - initialDelayMilliseconds, - periodMilliseconds, - TimeUnit.MILLISECONDS)); + public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer f) { + final var self = this; + final var callback = new Runnable() { + PeriodicTimer timer; + public void run() { + self._performSync(new WorkItem(f, this.timer::cancel)); + } + }; + callback.timer = new PeriodicTimer( + _scheduledExecutor.scheduleAtFixedRate( + callback, + initialDelayMilliseconds, + periodMilliseconds, + TimeUnit.MILLISECONDS)); + return callback.timer; } public static void awaitAll() throws InterruptedException { diff --git a/src/main/java/org/syndicate_lang/actors/Promise.java b/src/main/java/org/syndicate_lang/actors/Promise.java index 59b512a..a3657ec 100644 --- a/src/main/java/org/syndicate_lang/actors/Promise.java +++ b/src/main/java/org/syndicate_lang/actors/Promise.java @@ -127,18 +127,18 @@ public class Promise implements Future { public synchronized Promise andThen(Actor a0, final Function ok, final Function fail) { final Actor a = a0 != null ? a0 : new Actor(); Promise p = new Promise<>(); - this.whenFulfilled((t) -> a.execute( - () -> p.resolveCalling(() -> ok.apply(t)), - () -> p.rejectWith(new ActorTerminated(a)))); - this.whenRejected((e) -> a.execute( - () -> { - if (fail == null) { - p.rejectWith(e); - } else { - p.resolveCalling(() -> fail.apply(e)); - } - }, - () -> p.rejectWith(new ActorTerminated(a)))); + this.whenFulfilled((t) -> a.execute(new Actor.WorkItem( + _turn -> p.resolveCalling(() -> ok.apply(t)), + () -> p.rejectWith(new ActorTerminated(a))))); + this.whenRejected((e) -> a.execute(new Actor.WorkItem( + _turn -> { + if (fail == null) { + p.rejectWith(e); + } else { + p.resolveCalling(() -> fail.apply(e)); + } + }, + () -> p.rejectWith(new ActorTerminated(a))))); return p; } diff --git a/src/main/java/org/syndicate_lang/actors/Turn.java b/src/main/java/org/syndicate_lang/actors/Turn.java index b3b29d4..21a8c5f 100644 --- a/src/main/java/org/syndicate_lang/actors/Turn.java +++ b/src/main/java/org/syndicate_lang/actors/Turn.java @@ -153,7 +153,7 @@ public class Turn { } public void later(long delayMilliseconds, Consumer action) { - _actor.later(delayMilliseconds, () -> _actor.scheduleTurn(action)); + _actor.later(delayMilliseconds, new Actor.WorkItem(action, null)); } public PeriodicTimer every(long periodMilliseconds, Consumer action) { @@ -161,6 +161,6 @@ public class Turn { } public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer action) { - return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> _actor.scheduleTurn(action)); + return _actor.every(initialDelayMilliseconds, periodMilliseconds, action); } }