Remove a few small Runnable allocations

This commit is contained in:
Tony Garnock-Jones 2023-10-28 18:22:53 +02:00
parent d1acf60d1b
commit 94646a92de
3 changed files with 54 additions and 64 deletions

View File

@ -13,7 +13,7 @@ import java.util.logging.Logger;
/** /**
* I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat.
*/ */
public class Actor implements Executor { public class Actor {
private final static AtomicLong _count = new AtomicLong(0); private final static AtomicLong _count = new AtomicLong(0);
private final static AtomicLong _actorId = new AtomicLong(0); private final static AtomicLong _actorId = new AtomicLong(0);
protected final static ExecutorService _executor = Executors.newWorkStealingPool(); protected final static ExecutorService _executor = Executors.newWorkStealingPool();
@ -28,11 +28,11 @@ public class Actor implements Executor {
private boolean _isCounted = true; private boolean _isCounted = true;
private Map<Long, Ref> _outbound; private Map<Long, Ref> _outbound;
private final static class WorkItem extends AtomicReference<WorkItem> { public final static class WorkItem extends AtomicReference<WorkItem> {
Runnable work; Consumer<Turn> work;
Runnable ifNotAlive; Runnable ifNotAlive;
public WorkItem(Runnable work, Runnable ifNotAlive) { public WorkItem(Consumer<Turn> work, Runnable ifNotAlive) {
this.work = work; this.work = work;
this.ifNotAlive = ifNotAlive; this.ifNotAlive = ifNotAlive;
} }
@ -69,9 +69,9 @@ public class Actor implements Executor {
public static Promise<Ref> boot(Function<Turn, IEntity> f) { public static Promise<Ref> boot(Function<Turn, IEntity> f) {
final Promise<Ref> p = new Promise<>(); final Promise<Ref> p = new Promise<>();
final Actor a = new Actor(); final Actor a = new Actor();
a.execute( a.execute(new WorkItem(
() -> a.scheduleTurn(t -> p.resolveCalling(() -> a.ref(f.apply(t)))), t -> p.resolveCalling(() -> a.ref(f.apply(t))),
() -> p.rejectWith(new ActorTerminated(a))); () -> p.rejectWith(new ActorTerminated(a))));
return p; return p;
} }
@ -134,7 +134,7 @@ public class Actor implements Executor {
} }
public void scheduleTurn(Consumer<Turn> f) { public void scheduleTurn(Consumer<Turn> f) {
this.execute(() -> this._performTurn(f)); this.execute(new WorkItem(f, null));
} }
void _performTurn(Consumer<Turn> f) { void _performTurn(Consumer<Turn> f) {
@ -146,21 +146,17 @@ public class Actor implements Executor {
} }
} }
private void _performSync(Runnable work, Runnable ifNotAlive) { private void _performSync(WorkItem item) {
synchronized (this) { synchronized (this) {
_perform(work, ifNotAlive); _perform(item);
} }
} }
private void _perform(Runnable work, Runnable ifNotAlive) { private void _perform(WorkItem item) {
if (!_alive) { if (!_alive) {
if (ifNotAlive != null) ifNotAlive.run(); if (item.ifNotAlive != null) item.ifNotAlive.run();
} else { } else {
try { this._performTurn(item.work);
work.run();
} catch (Throwable exn) {
this._stop(false, exn);
}
} }
} }
@ -173,10 +169,9 @@ public class Actor implements Executor {
} }
public Promise<?> stop(boolean normally, Throwable reason) { public Promise<?> stop(boolean normally, Throwable reason) {
Promise<?> p = new Promise<>(); Promise<?> p = new Promise<>();
this.execute(() -> { this.execute(new WorkItem(
this._stop(normally, reason); _f -> { this._stop(normally, reason); p.resolve(); },
p.resolve(); p::resolve));
}, p::resolve);
return p; return p;
} }
@ -198,16 +193,8 @@ public class Actor implements Executor {
} }
} }
@Override public void execute(WorkItem item) {
public void execute(Runnable work) { tail.getAndSet(item).set(item);
this.execute(work, null);
}
public void execute(Runnable work, Runnable ifNotAlive) {
{
WorkItem i = new WorkItem(work, ifNotAlive);
tail.getAndSet(i).set(i);
}
if (workItemCount.getAndIncrement() == 0) { if (workItemCount.getAndIncrement() == 0) {
_executor.execute(() -> { _executor.execute(() -> {
synchronized (this) { synchronized (this) {
@ -217,7 +204,7 @@ public class Actor implements Executor {
WorkItem i = null; WorkItem i = null;
while (i == null) i = head.get(); while (i == null) i = head.get();
head = i; head = i;
this._perform(i.work, i.ifNotAlive); this._perform(i);
i.clear(); i.clear();
} }
batch = workItemCount.addAndGet(-batch); batch = workItemCount.addAndGet(-batch);
@ -227,29 +214,32 @@ public class Actor implements Executor {
} }
} }
public void later(long delayMilliseconds, Runnable work) { public void later(long delayMilliseconds, WorkItem item) {
this.later(delayMilliseconds, work, null);
}
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
if (delayMilliseconds == 0) { if (delayMilliseconds == 0) {
this.execute(work, ifNotAlive); this.execute(item);
} else { } else {
_scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); _scheduledExecutor.schedule(
() -> this._performSync(item),
delayMilliseconds,
TimeUnit.MILLISECONDS);
} }
} }
public PeriodicTimer every(long periodMilliseconds, Runnable f) { public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> f) {
return every(0, periodMilliseconds, f); final var self = this;
} final var callback = new Runnable() {
PeriodicTimer timer;
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { public void run() {
return new PeriodicTimer( self._performSync(new WorkItem(f, this.timer::cancel));
_scheduledExecutor.scheduleAtFixedRate( }
() -> this._performSync(f, null), };
initialDelayMilliseconds, callback.timer = new PeriodicTimer(
periodMilliseconds, _scheduledExecutor.scheduleAtFixedRate(
TimeUnit.MILLISECONDS)); callback,
initialDelayMilliseconds,
periodMilliseconds,
TimeUnit.MILLISECONDS));
return callback.timer;
} }
public static void awaitAll() throws InterruptedException { public static void awaitAll() throws InterruptedException {

View File

@ -127,18 +127,18 @@ public class Promise<T> implements Future<T> {
public synchronized<R> Promise<R> andThen(Actor a0, final Function<T, R> ok, final Function<Throwable, R> fail) { public synchronized<R> Promise<R> andThen(Actor a0, final Function<T, R> ok, final Function<Throwable, R> fail) {
final Actor a = a0 != null ? a0 : new Actor(); final Actor a = a0 != null ? a0 : new Actor();
Promise<R> p = new Promise<>(); Promise<R> p = new Promise<>();
this.whenFulfilled((t) -> a.execute( this.whenFulfilled((t) -> a.execute(new Actor.WorkItem(
() -> p.resolveCalling(() -> ok.apply(t)), _turn -> p.resolveCalling(() -> ok.apply(t)),
() -> p.rejectWith(new ActorTerminated(a)))); () -> p.rejectWith(new ActorTerminated(a)))));
this.whenRejected((e) -> a.execute( this.whenRejected((e) -> a.execute(new Actor.WorkItem(
() -> { _turn -> {
if (fail == null) { if (fail == null) {
p.rejectWith(e); p.rejectWith(e);
} else { } else {
p.resolveCalling(() -> fail.apply(e)); p.resolveCalling(() -> fail.apply(e));
} }
}, },
() -> p.rejectWith(new ActorTerminated(a)))); () -> p.rejectWith(new ActorTerminated(a)))));
return p; return p;
} }

View File

@ -153,7 +153,7 @@ public class Turn {
} }
public void later(long delayMilliseconds, Consumer<Turn> action) { public void later(long delayMilliseconds, Consumer<Turn> action) {
_actor.later(delayMilliseconds, () -> _actor.scheduleTurn(action)); _actor.later(delayMilliseconds, new Actor.WorkItem(action, null));
} }
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) { public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
@ -161,6 +161,6 @@ public class Turn {
} }
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) { public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> _actor.scheduleTurn(action)); return _actor.every(initialDelayMilliseconds, periodMilliseconds, action);
} }
} }