Avoid double-scheduling for a nice win
This commit is contained in:
parent
072119d6c0
commit
d1acf60d1b
|
@ -5,6 +5,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
@ -69,7 +70,7 @@ public class Actor implements Executor {
|
|||
final Promise<Ref> p = new Promise<>();
|
||||
final Actor a = new Actor();
|
||||
a.execute(
|
||||
() -> Turn.forActor(a, t -> p.resolveCalling(() -> a.ref(f.apply(t)))),
|
||||
() -> a.scheduleTurn(t -> p.resolveCalling(() -> a.ref(f.apply(t)))),
|
||||
() -> p.rejectWith(new ActorTerminated(a)));
|
||||
return p;
|
||||
}
|
||||
|
@ -132,6 +133,19 @@ public class Actor implements Executor {
|
|||
return new Ref(this, o);
|
||||
}
|
||||
|
||||
public void scheduleTurn(Consumer<Turn> f) {
|
||||
this.execute(() -> this._performTurn(f));
|
||||
}
|
||||
|
||||
void _performTurn(Consumer<Turn> f) {
|
||||
try {
|
||||
f.accept(_turn);
|
||||
_turn.commit();
|
||||
} catch (Exception e) {
|
||||
this.stop(false, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void _performSync(Runnable work, Runnable ifNotAlive) {
|
||||
synchronized (this) {
|
||||
_perform(work, ifNotAlive);
|
||||
|
@ -175,7 +189,7 @@ public class Actor implements Executor {
|
|||
} else {
|
||||
log().log(Level.SEVERE, "Actor terminated with error", reason);
|
||||
}
|
||||
Turn._forActor(this, t -> {
|
||||
this._performTurn(t -> {
|
||||
synchronized(_outbound) {
|
||||
_outbound.forEach(t::_retract_);
|
||||
}
|
||||
|
|
|
@ -16,20 +16,6 @@ public class Turn {
|
|||
private Consumer<Turn> _pending2 = null;
|
||||
private Map<Actor, List<Consumer<Turn>>> _pending = null;
|
||||
|
||||
public static void forActor(Actor a, Consumer<Turn> f) {
|
||||
a.execute(() -> Turn._forActor(a, f));
|
||||
}
|
||||
|
||||
static void _forActor(Actor a, Consumer<Turn> f) {
|
||||
Turn t = a._turn;
|
||||
try {
|
||||
f.accept(t);
|
||||
t.commit();
|
||||
} catch (Exception e) {
|
||||
a.stop(false, e);
|
||||
}
|
||||
}
|
||||
|
||||
Turn(Actor a) {
|
||||
this._actor = a;
|
||||
}
|
||||
|
@ -42,19 +28,19 @@ public class Turn {
|
|||
return _actor.log();
|
||||
}
|
||||
|
||||
private void commit() {
|
||||
void commit() {
|
||||
if (_pendingTarget != null) {
|
||||
var ac = _pendingTarget;
|
||||
Consumer<Turn> q0 = _pending0, q1 = _pending1, q2 = _pending2;
|
||||
_pendingTarget = null;
|
||||
_pending0 = _pending1 = _pending2 = null;
|
||||
ac.execute(() -> Turn.forActor(ac, t -> {
|
||||
ac.scheduleTurn(t -> {
|
||||
q0.accept(t);
|
||||
if (q1 != null) q1.accept(t);
|
||||
if (q2 != null) q2.accept(t);
|
||||
}));
|
||||
});
|
||||
} else if (_pending != null) {
|
||||
_pending.forEach((ac, q) -> ac.execute(() -> Turn.forActor(ac, t -> q.forEach(f -> f.accept(t)))));
|
||||
_pending.forEach((ac, q) -> ac.scheduleTurn(t -> q.forEach(f -> f.accept(t))));
|
||||
_pending = null;
|
||||
}
|
||||
}
|
||||
|
@ -95,7 +81,7 @@ public class Turn {
|
|||
Map<Long, Ref> newOutbound = new HashMap<>();
|
||||
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
|
||||
Actor newActor = new Actor(null, newOutbound);
|
||||
newActor.execute(() -> Turn.forActor(newActor, bootProc));
|
||||
newActor.scheduleTurn(bootProc);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -167,7 +153,7 @@ public class Turn {
|
|||
}
|
||||
|
||||
public void later(long delayMilliseconds, Consumer<Turn> action) {
|
||||
_actor.later(delayMilliseconds, () -> Turn.forActor(_actor, action));
|
||||
_actor.later(delayMilliseconds, () -> _actor.scheduleTurn(action));
|
||||
}
|
||||
|
||||
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
|
||||
|
@ -175,6 +161,6 @@ public class Turn {
|
|||
}
|
||||
|
||||
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
|
||||
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> Turn.forActor(_actor, action));
|
||||
return _actor.every(initialDelayMilliseconds, periodMilliseconds, () -> _actor.scheduleTurn(action));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.syndicate_lang.actors.Turn;
|
|||
public class Main extends Entity {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
Actor.convenientLogging();
|
||||
Turn.forActor(new Actor().daemonize(), t -> {
|
||||
new Actor().daemonize().scheduleTurn(t -> {
|
||||
final var vh = Actor.forEntity(new ValueHolder<>("There"));
|
||||
vh.getActor().daemonize();
|
||||
final var m = Actor.boot(u -> {
|
||||
|
|
|
@ -13,7 +13,7 @@ import static java.lang.Integer.parseInt;
|
|||
public class Main extends Entity {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
Actor.convenientLogging();
|
||||
Turn.forActor(new Actor().daemonize(), t -> {
|
||||
new Actor().daemonize().scheduleTurn(t -> {
|
||||
new Main(parseInt(args[0]), parseInt(args[1])).boot(t);
|
||||
});
|
||||
Actor.awaitAll();
|
||||
|
|
Loading…
Reference in New Issue