From c2c80215d4a4a7b0a686995adb88c1ea11515dac Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 9 Dec 2020 16:12:58 +0100 Subject: [PATCH] Preserve message ordering (!!) --- .../java/org/syndicate_lang/actors/Actor.java | 78 +++++++++++++++---- .../actors/example/example2/Forwarder.java | 6 +- .../actors/example/example2/Main.java | 2 +- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index 03ac4f5..c0c47a2 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -6,9 +6,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,6 +32,25 @@ public class Actor implements Executor { private Map>> _monitors = null; private Consumer _exitTrap = null; + private final static class WorkItem extends AtomicReference { + Runnable work; + Runnable ifNotAlive; + + public WorkItem(Runnable work, Runnable ifNotAlive) { + this.work = work; + this.ifNotAlive = ifNotAlive; + } + + public final void clear() { + this.work = null; + this.ifNotAlive = null; + } + } + + private WorkItem head = new WorkItem(null, null); + private final AtomicReference tail = new AtomicReference<>(head); + private final AtomicLong workItemCount = new AtomicLong(0); + public static Actor current() { return _currentActor.get(); } @@ -107,25 +126,29 @@ public class Actor implements Executor { return new Remote<>(this, o); } - private void _perform(Runnable work, Runnable ifNotAlive) { + private void _performSync(Runnable work, Runnable ifNotAlive) { synchronized (this) { _currentActor.set(this); try { - if (!_alive) { - if (ifNotAlive != null) ifNotAlive.run(); - } else { - try { - work.run(); - } catch (Throwable exn) { - this._stop(false, exn); - } - } + _perform(work, ifNotAlive); } finally { _currentActor.set(null); } } } + private void _perform(Runnable work, Runnable ifNotAlive) { + if (!_alive) { + if (ifNotAlive != null) ifNotAlive.run(); + } else { + try { + work.run(); + } catch (Throwable exn) { + this._stop(false, exn); + } + } + } + public Promise stop() { return stop(null); } @@ -187,9 +210,34 @@ public class Actor implements Executor { public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { if (delayMilliseconds == 0) { - _executor.execute(() -> this._perform(work, ifNotAlive)); + { + WorkItem i = new WorkItem(work, ifNotAlive); + tail.getAndSet(i).set(i); + } + if (workItemCount.getAndIncrement() == 0) { + _executor.execute(() -> { + synchronized (this) { + _currentActor.set(this); + try { + long batch = workItemCount.get(); + while (batch > 0) { + for (int count = 0; count < batch; count++) { + WorkItem i = null; + while (i == null) i = head.get(); + head = i; + this._perform(i.work, i.ifNotAlive); + i.clear(); + } + batch = workItemCount.addAndGet(-batch); + } + } finally { + _currentActor.set(null); + } + } + }); + } } else { - _scheduledExecutor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); + _scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); } } @@ -200,7 +248,7 @@ public class Actor implements Executor { public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { return new PeriodicTimer( _scheduledExecutor.scheduleAtFixedRate( - () -> this._perform(f, null), + () -> this._performSync(f, null), initialDelayMilliseconds, periodMilliseconds, TimeUnit.MILLISECONDS)); @@ -214,7 +262,9 @@ public class Actor implements Executor { } _executor.shutdown(); _scheduledExecutor.shutdown(); + //noinspection ResultOfMethodCallIgnored _executor.awaitTermination(5, TimeUnit.MINUTES); + //noinspection ResultOfMethodCallIgnored _scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES); } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java index 0c53d0a..a9f16bb 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -6,18 +6,20 @@ public class Forwarder implements IForwarder { private final int _index; private final List _actors; private final IForwarder _main; + private final int _nActors; private final int _nRounds; - public Forwarder(int index, List actors, IForwarder main, int nRounds) { + public Forwarder(int index, List actors, IForwarder main, int nActors, int nRounds) { this._index = index; this._actors = actors; this._main = main; + this._nActors = nActors; this._nRounds = nRounds; } @Override public void handleMessage(int hopCount) { - int index = (this._index + 1) % this._actors.size(); + int index = (this._index + 1) % this._nActors; IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index); target.handleMessage(hopCount + 1); } diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java index 0c3c536..b3ad854 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -32,7 +32,7 @@ public class Main implements IForwarder { List _actors = new ArrayList<>(); final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); for (int i = 0; i < _nActors; i++) { - Remote a = Actor.forObject(new Forwarder(i, _actors, me, this._nRounds)); + Remote a = Actor.forObject(new Forwarder(i, _actors, me, this._nActors, this._nRounds)); a.getActor().link(); _actors.add(a.asyncProxy(IForwarder.class)); }