Preserve message ordering (!!)

This commit is contained in:
Tony Garnock-Jones 2020-12-09 16:12:58 +01:00
parent 46a129d40f
commit c2c80215d4
3 changed files with 69 additions and 17 deletions

View File

@ -6,9 +6,9 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -32,6 +32,25 @@ public class Actor implements Executor {
private Map<Object, Remote<BiConsumer<Actor, Object>>> _monitors = null; private Map<Object, Remote<BiConsumer<Actor, Object>>> _monitors = null;
private Consumer<Actor> _exitTrap = null; private Consumer<Actor> _exitTrap = null;
private final static class WorkItem extends AtomicReference<WorkItem> {
Runnable work;
Runnable ifNotAlive;
public WorkItem(Runnable work, Runnable ifNotAlive) {
this.work = work;
this.ifNotAlive = ifNotAlive;
}
public final void clear() {
this.work = null;
this.ifNotAlive = null;
}
}
private WorkItem head = new WorkItem(null, null);
private final AtomicReference<WorkItem> tail = new AtomicReference<>(head);
private final AtomicLong workItemCount = new AtomicLong(0);
public static Actor current() { public static Actor current() {
return _currentActor.get(); return _currentActor.get();
} }
@ -107,25 +126,29 @@ public class Actor implements Executor {
return new Remote<>(this, o); return new Remote<>(this, o);
} }
private void _perform(Runnable work, Runnable ifNotAlive) { private void _performSync(Runnable work, Runnable ifNotAlive) {
synchronized (this) { synchronized (this) {
_currentActor.set(this); _currentActor.set(this);
try { try {
if (!_alive) { _perform(work, ifNotAlive);
if (ifNotAlive != null) ifNotAlive.run();
} else {
try {
work.run();
} catch (Throwable exn) {
this._stop(false, exn);
}
}
} finally { } finally {
_currentActor.set(null); _currentActor.set(null);
} }
} }
} }
private void _perform(Runnable work, Runnable ifNotAlive) {
if (!_alive) {
if (ifNotAlive != null) ifNotAlive.run();
} else {
try {
work.run();
} catch (Throwable exn) {
this._stop(false, exn);
}
}
}
public Promise<?> stop() { public Promise<?> stop() {
return stop(null); return stop(null);
} }
@ -187,9 +210,34 @@ public class Actor implements Executor {
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
if (delayMilliseconds == 0) { if (delayMilliseconds == 0) {
_executor.execute(() -> this._perform(work, ifNotAlive)); {
WorkItem i = new WorkItem(work, ifNotAlive);
tail.getAndSet(i).set(i);
}
if (workItemCount.getAndIncrement() == 0) {
_executor.execute(() -> {
synchronized (this) {
_currentActor.set(this);
try {
long batch = workItemCount.get();
while (batch > 0) {
for (int count = 0; count < batch; count++) {
WorkItem i = null;
while (i == null) i = head.get();
head = i;
this._perform(i.work, i.ifNotAlive);
i.clear();
}
batch = workItemCount.addAndGet(-batch);
}
} finally {
_currentActor.set(null);
}
}
});
}
} else { } else {
_scheduledExecutor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); _scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
} }
} }
@ -200,7 +248,7 @@ public class Actor implements Executor {
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) {
return new PeriodicTimer( return new PeriodicTimer(
_scheduledExecutor.scheduleAtFixedRate( _scheduledExecutor.scheduleAtFixedRate(
() -> this._perform(f, null), () -> this._performSync(f, null),
initialDelayMilliseconds, initialDelayMilliseconds,
periodMilliseconds, periodMilliseconds,
TimeUnit.MILLISECONDS)); TimeUnit.MILLISECONDS));
@ -214,7 +262,9 @@ public class Actor implements Executor {
} }
_executor.shutdown(); _executor.shutdown();
_scheduledExecutor.shutdown(); _scheduledExecutor.shutdown();
//noinspection ResultOfMethodCallIgnored
_executor.awaitTermination(5, TimeUnit.MINUTES); _executor.awaitTermination(5, TimeUnit.MINUTES);
//noinspection ResultOfMethodCallIgnored
_scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES); _scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES);
} }

View File

@ -6,18 +6,20 @@ public class Forwarder implements IForwarder {
private final int _index; private final int _index;
private final List<IForwarder> _actors; private final List<IForwarder> _actors;
private final IForwarder _main; private final IForwarder _main;
private final int _nActors;
private final int _nRounds; private final int _nRounds;
public Forwarder(int index, List<IForwarder> actors, IForwarder main, int nRounds) { public Forwarder(int index, List<IForwarder> actors, IForwarder main, int nActors, int nRounds) {
this._index = index; this._index = index;
this._actors = actors; this._actors = actors;
this._main = main; this._main = main;
this._nActors = nActors;
this._nRounds = nRounds; this._nRounds = nRounds;
} }
@Override @Override
public void handleMessage(int hopCount) { public void handleMessage(int hopCount) {
int index = (this._index + 1) % this._actors.size(); int index = (this._index + 1) % this._nActors;
IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index); IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index);
target.handleMessage(hopCount + 1); target.handleMessage(hopCount + 1);
} }

View File

@ -32,7 +32,7 @@ public class Main implements IForwarder {
List<IForwarder> _actors = new ArrayList<>(); List<IForwarder> _actors = new ArrayList<>();
final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); final IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
for (int i = 0; i < _nActors; i++) { for (int i = 0; i < _nActors; i++) {
Remote<IForwarder> a = Actor.forObject(new Forwarder(i, _actors, me, this._nRounds)); Remote<IForwarder> a = Actor.forObject(new Forwarder(i, _actors, me, this._nActors, this._nRounds));
a.getActor().link(); a.getActor().link();
_actors.add(a.asyncProxy(IForwarder.class)); _actors.add(a.asyncProxy(IForwarder.class));
} }