Compare commits

...

31 Commits
main ... novy

Author SHA1 Message Date
Tony Garnock-Jones 27ab682bca Repair ancient spelling error 2024-02-28 22:28:17 +01:00
Tony Garnock-Jones 6d94e728af Repair Actor.every 2023-10-29 00:25:17 +02:00
Tony Garnock-Jones 7526caa179 Mark overrides 2023-10-28 23:43:34 +02:00
Tony Garnock-Jones e483a57df9 Avoid allocating a wrapper for a runnable every time an actor is scheduled 2023-10-28 23:16:12 +02:00
Tony Garnock-Jones 90147b4970 Reuse a runnable 2023-10-28 21:00:53 +02:00
Tony Garnock-Jones 9c7b71fa51 Rearrange daemonization 2023-10-28 20:01:39 +02:00
Tony Garnock-Jones 882b47602d Split out _runWorkItems 2023-10-28 19:07:38 +02:00
Tony Garnock-Jones d91bb6dc02 Address idea warnings 2023-10-28 18:57:39 +02:00
Tony Garnock-Jones 9e971de6b0 Async mode helps a little 2023-10-28 18:41:10 +02:00
Tony Garnock-Jones 94646a92de Remove a few small Runnable allocations 2023-10-28 18:22:53 +02:00
Tony Garnock-Jones d1acf60d1b Avoid double-scheduling for a nice win 2023-10-28 17:12:07 +02:00
Tony Garnock-Jones 072119d6c0 Why not 2023-10-28 16:32:23 +02:00
Tony Garnock-Jones 6904ef76df Rearrange to put the common case at the top 2023-10-28 16:27:51 +02:00
Tony Garnock-Jones 636da3f28f Is there *really* a need to allocate fresh Turns every time? I think not 2023-10-28 16:18:42 +02:00
Tony Garnock-Jones 732bb0066d Oops, forgot to remove the allocation 2023-10-28 16:16:16 +02:00
Tony Garnock-Jones 7d9174c363 More data-structure inlining 2023-10-28 16:09:06 +02:00
Tony Garnock-Jones fd8b445482 Try to eke out a little more speed 2023-10-28 16:03:46 +02:00
Tony Garnock-Jones 5401d2d8e9 Reduce allocation slightly further 2023-10-28 15:45:27 +02:00
Tony Garnock-Jones f55fd2c668 Good performance boost by avoiding hashmaps in the single-target case 2023-10-28 15:37:10 +02:00
Tony Garnock-Jones c79724a098 Update .gitignore 2023-10-28 15:31:51 +02:00
Tony Garnock-Jones f08dccaf07 Better Makefile variables; jfr "profiling" 2023-10-28 15:31:28 +02:00
Tony Garnock-Jones e37ec7e642 IDEA seems not to want the XML declaration anymore 2023-10-28 15:31:09 +02:00
Tony Garnock-Jones de6b5dfe18 See, this is why shadowing should be permitted 2023-10-28 14:11:58 +02:00
Tony Garnock-Jones e7d7942e9b Exclusion no longer required 2023-10-28 14:09:06 +02:00
Tony Garnock-Jones 85c106b6ca Guard concurrent access to _outbound 2023-10-28 13:52:27 +02:00
Tony Garnock-Jones cb3fd44529 Start timing after all peer-setting has been done 2023-10-28 13:49:49 +02:00
Tony Garnock-Jones 637f1da5f4 Implement ring benchmark; repair some errors 2023-10-28 13:23:29 +02:00
Tony Garnock-Jones c7718b3ddd Make turns atomic, rather than reply-turn-per-event 2021-05-31 12:05:00 +02:00
Tony Garnock-Jones d6c8a80995 Bump language level to JDK 16 to get records; update example1 2021-05-12 15:47:59 +02:00
Tony Garnock-Jones af611e1b2b First broad step toward novy 2021-05-12 15:47:59 +02:00
Tony Garnock-Jones 2ad5ac75ee Remove ambient authority (static current Actor) 2021-05-12 15:47:59 +02:00
21 changed files with 500 additions and 506 deletions

5
.gitignore vendored
View File

@ -1,2 +1,3 @@
out/
doc/
/out/
/doc/
/profile.jfr

View File

@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_16" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -1,3 +1,6 @@
NACTORS=1000000
NROUNDS=200
all:
ant jar
@ -5,7 +8,10 @@ clean:
ant clean
ziprun: all
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main 1000000 200
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main $(NACTORS) $(NROUNDS)
ziprun1000: all
java -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main 1000000 1000
profile.jfr: all
java -XX:StartFlightRecording=settings=profile,filename=$@ -cp out/lib/org.syndicate_lang.actors.jar:out/test/test org.syndicate_lang.actors.example.example2.Main $(NACTORS) $(NROUNDS)
ziprun1000:
$(MAKE) NROUNDS=1000 ziprun

View File

@ -1,38 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public abstract class AbstractProxy<T> implements InvocationHandler {
protected final Remote<T> _ref;
public AbstractProxy(Remote<T> ref) {
this._ref = ref;
}
public Remote<T> ref() {
return this._ref;
}
abstract boolean isSync();
private static Method toStringMethod;
static {
try {
toStringMethod = Object.class.getMethod("toString");
} catch (NoSuchMethodException e) {
toStringMethod = null;
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
if (method.equals(toStringMethod)) {
return this._ref.toString();
}
return dispatch(method, args);
}
abstract Object dispatch(Method method, Object[] args);
}

View File

@ -1,42 +1,43 @@
package org.syndicate_lang.actors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat.
* I represent the shared execution context for a collection of objects; I am roughly analogous to the E concept of a Vat.
*/
public class Actor implements Executor {
private final static ThreadLocal<Actor> _currentActor = new ThreadLocal<>();
public class Actor extends ForkJoinTask<Void> {
private final static AtomicLong _count = new AtomicLong(0);
private final static AtomicLong _actorId = new AtomicLong(0);
protected final static ExecutorService _executor = Executors.newWorkStealingPool();
protected final static ForkJoinPool _executor = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true
);
protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final String _name;
private final Logger _logger;
final Turn _turn = new Turn(this);
private boolean _alive = true;
private Throwable _exitReason = null;
private boolean _isCounted = true;
private Set<Actor> _links = null;
private Map<Object, Remote<BiConsumer<Actor, Object>>> _monitors = null;
private Consumer<Actor> _exitTrap = null;
private Map<Long, Ref> _outbound;
private final static class WorkItem extends AtomicReference<WorkItem> {
Runnable work;
public final static class WorkItem extends AtomicReference<WorkItem> {
Consumer<Turn> work;
Runnable ifNotAlive;
public WorkItem(Runnable work, Runnable ifNotAlive) {
public WorkItem(Consumer<Turn> work, Runnable ifNotAlive) {
this.work = work;
this.ifNotAlive = ifNotAlive;
}
@ -51,38 +52,31 @@ public class Actor implements Executor {
private final AtomicReference<WorkItem> tail = new AtomicReference<>(head);
private final AtomicLong workItemCount = new AtomicLong(0);
public static Actor current() {
return _currentActor.get();
}
public static Logger log() {
return current().getLogger();
}
public static<T> Remote<T> ref(T o) {
return current().proxyFor(o);
}
public Actor() {
this("" + _actorId.incrementAndGet());
this(null, null);
}
public Actor(String debugName) {
this._name = debugName;
this(debugName, null);
}
public Actor(String debugName, Map<Long, Ref> outbound) {
this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName;
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
this._outbound = outbound == null ? new HashMap<>() : outbound;
_count.incrementAndGet();
}
public static<T> Remote<T> forObject(T o) {
return new Actor().proxyFor(o);
public static Ref forEntity(IEntity o) {
return new Actor().ref(o);
}
public static<T> Promise<Remote<T>> boot(ThrowingSupplier<T> f) {
Promise<Remote<T>> p = new Promise<>();
Actor a = new Actor();
a.execute(
() -> p.resolveCalling(() -> Actor.ref(f.get())),
() -> p.rejectWith(new ActorTerminated(a)));
public static Promise<Ref> boot(Function<Turn, IEntity> f) {
final Promise<Ref> p = new Promise<>();
final Actor a = new Actor();
a.execute(new WorkItem(
t -> p.resolveCalling(() -> a.ref(f.apply(t))),
() -> p.rejectWith(new ActorTerminated(a))));
return p;
}
@ -94,7 +88,7 @@ public class Actor implements Executor {
return _exitReason;
}
public Logger getLogger() {
public Logger log() {
return _logger;
}
@ -102,9 +96,8 @@ public class Actor implements Executor {
return _alive && !_isCounted;
}
public synchronized Actor daemonize() {
this._releaseCount();
return this;
public void daemonize() {
this.scheduleTurn(_t -> this._releaseCount());
}
private void _releaseCount() {
@ -118,53 +111,67 @@ public class Actor implements Executor {
}
}
void _injectOutbound(Long handle, Ref peer) {
synchronized (_outbound) {
_outbound.put(handle, peer);
}
}
Ref _lookupOutbound(Long handle) {
synchronized (_outbound) {
return _outbound.get(handle);
}
}
Ref _extractOutbound(Long handle) {
synchronized (_outbound) {
return _outbound.remove(handle);
}
}
public String toString() {
return super.toString() + "(" + this._name + ")";
}
public<T> Remote<T> proxyFor(T o) {
return new Remote<>(this, o);
public<T> Ref ref(IEntity o) {
return new Ref(this, o);
}
private void _performSync(Runnable work, Runnable ifNotAlive) {
synchronized (this) {
_currentActor.set(this);
try {
_perform(work, ifNotAlive);
} finally {
_currentActor.set(null);
}
public Actor scheduleTurn(Consumer<Turn> f) {
this.execute(new WorkItem(f, null));
return this;
}
void _performTurn(Consumer<Turn> f) {
try {
f.accept(_turn);
_turn.commit();
} catch (Exception e) {
this.stop(false, e);
}
}
private void _perform(Runnable work, Runnable ifNotAlive) {
private void _perform(WorkItem item) {
if (!_alive) {
if (ifNotAlive != null) ifNotAlive.run();
if (item.ifNotAlive != null) item.ifNotAlive.run();
} else {
try {
work.run();
} catch (Throwable exn) {
this._stop(false, exn);
}
this._performTurn(item.work);
}
}
public Promise<?> stop() {
return stop(null);
return stop(true, null);
}
public Promise<?> stop(Throwable reason) {
if (current() == this) {
this._stop(true, reason);
return Promise.resolved();
} else {
Promise<?> p = new Promise<>();
this.execute(() -> {
this._stop(true, reason);
p.resolve();
}, p::resolve);
return p;
}
return stop(true, reason);
}
public Promise<?> stop(boolean normally, Throwable reason) {
Promise<?> p = new Promise<>();
this.execute(new WorkItem(
_f -> { this._stop(normally, reason); p.resolve(); },
p::resolve));
return p;
}
private synchronized void _stop(boolean normally, Throwable reason) {
@ -172,86 +179,71 @@ public class Actor implements Executor {
_alive = false;
_exitReason = reason;
if (normally) {
getLogger().log(Level.FINE, "Actor stopped", reason);
log().log(Level.FINE, "Actor stopped", reason);
} else {
getLogger().log(Level.SEVERE, "Actor terminated with error", reason);
log().log(Level.SEVERE, "Actor terminated with error", reason);
}
Set<Actor> linkedPeers = _links;
if (linkedPeers != null) {
_links = null;
for (var peer : linkedPeers) {
peer.notifyExit(this);
this._performTurn(t -> {
synchronized(_outbound) {
_outbound.forEach(t::_retract_);
}
}
Map<Object, Remote<BiConsumer<Actor, Object>>> monitoringPeers = _monitors;
if (monitoringPeers != null) {
_monitors = null;
for (var entry : monitoringPeers.entrySet()) {
final var ref = entry.getKey();
entry.getValue().async((h) -> h.accept(this, ref));
}
}
});
_releaseCount();
}
}
@Override
public void execute(Runnable work) {
this.execute(work, null);
synchronized void _runWorkItems() {
long batch = workItemCount.get();
while (batch > 0) {
for (int count = 0; count < batch; count++) {
WorkItem i = null;
while (i == null) i = head.get();
head = i;
this._perform(i);
i.clear();
}
batch = workItemCount.addAndGet(-batch);
}
}
public void execute(Runnable work, Runnable ifNotAlive) {
{
WorkItem i = new WorkItem(work, ifNotAlive);
tail.getAndSet(i).set(i);
}
@Override public final Void getRawResult() { return null; }
@Override public final void setRawResult(Void v) { }
@Override public final boolean exec() { this._runWorkItems(); return false; }
public void execute(WorkItem item) {
tail.getAndSet(item).set(item);
if (workItemCount.getAndIncrement() == 0) {
_executor.execute(() -> {
synchronized (this) {
_currentActor.set(this);
try {
long batch = workItemCount.get();
while (batch > 0) {
for (int count = 0; count < batch; count++) {
WorkItem i = null;
while (i == null) i = head.get();
head = i;
this._perform(i.work, i.ifNotAlive);
i.clear();
}
batch = workItemCount.addAndGet(-batch);
}
} finally {
_currentActor.set(null);
}
}
});
_executor.execute(this);
}
}
public void later(long delayMilliseconds, Runnable work) {
this.later(delayMilliseconds, work, null);
}
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
public void later(long delayMilliseconds, WorkItem item) {
if (delayMilliseconds == 0) {
this.execute(work, ifNotAlive);
this.execute(item);
} else {
_scheduledExecutor.schedule(() -> this._performSync(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
_scheduledExecutor.schedule(
() -> this.execute(item),
delayMilliseconds,
TimeUnit.MILLISECONDS);
}
}
public PeriodicTimer every(long periodMilliseconds, Runnable f) {
return every(0, periodMilliseconds, f);
}
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) {
return new PeriodicTimer(
_scheduledExecutor.scheduleAtFixedRate(
() -> this._performSync(f, null),
initialDelayMilliseconds,
periodMilliseconds,
TimeUnit.MILLISECONDS));
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> f) {
final var self = this;
final var callback = new Runnable() {
PeriodicTimer timer;
public void run() {
while (this.timer == null) {} // There's a race. Spin.
self.execute(new WorkItem(f, this.timer::cancel));
}
};
callback.timer = new PeriodicTimer(
_scheduledExecutor.scheduleAtFixedRate(
callback,
initialDelayMilliseconds,
periodMilliseconds,
TimeUnit.MILLISECONDS));
return callback.timer;
}
public static void awaitAll() throws InterruptedException {
@ -273,63 +265,7 @@ public class Actor implements Executor {
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
}
public void link() {
final Actor peer = Actor.current();
this.linkPeer(peer);
peer.linkPeer(this);
}
public void unlink() {
final Actor peer = Actor.current();
this.unlinkPeer(peer);
peer.unlinkPeer(this);
}
private synchronized void linkPeer(Actor peer) {
if (this._alive) {
if (_links == null) _links = new HashSet<>();
_links.add(peer);
} else {
peer.notifyExit(this);
}
}
private void notifyExit(final Actor exitingPeer) {
this.execute(() -> {
this.unlinkPeer(exitingPeer);
if (this._exitTrap != null) {
this._exitTrap.accept(exitingPeer);
} else {
this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer));
}
});
}
private synchronized void unlinkPeer(Actor peer) {
if (_links != null) {
_links.remove(peer);
if (_links.isEmpty()) {
_links = null;
}
}
}
public void trapExits(Consumer<Actor> handler) {
this._exitTrap = handler;
}
public synchronized Object monitor(Consumer<Actor> handler) {
Object ref = new Object();
monitor(ref, (actor, _ref) -> handler.accept(actor));
return ref;
}
public synchronized void monitor(final Object ref, BiConsumer<Actor, Object> handler) {
if (this._alive) {
if (_monitors == null) _monitors = new HashMap<>();
_monitors.put(ref, Actor.ref(handler));
} else {
Actor.ref(handler).async((h) -> h.accept(this, ref));
}
public boolean isAlive() {
return _alive;
}
}

View File

@ -1,35 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class AsyncProxy<T> extends AbstractProxy<T> {
public AsyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) {
if (method.getReturnType().equals(void.class)) {
this._ref.async((v) -> {
try {
method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
});
return null;
} else {
System.err.println(method.getReturnType());
throw new UnsupportedOperationException(
"Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy");
}
}
@Override
boolean isSync() {
return false;
}
}

View File

@ -0,0 +1,20 @@
package org.syndicate_lang.actors;
public class Entity implements IEntity {
@Override
public void assert_(Turn turn, Object assertion, Long handle) {
}
@Override
public void retract_(Turn turn, Long handle) {
}
@Override
public void sync_(Turn turn, Ref peer) {
turn.message_(peer, true);
}
@Override
public void message_(Turn turn, Object body) {
}
}

View File

@ -0,0 +1,8 @@
package org.syndicate_lang.actors;
public interface IEntity {
void assert_(Turn turn, Object assertion, Long handle);
void retract_(Turn turn, Long handle);
void sync_(Turn turn, Ref peer);
void message_(Turn turn, Object body);
}

View File

@ -112,8 +112,8 @@ public class Promise<T> implements Future<T> {
}
}
public<R> Promise<R> andThen(Function<T, R> ok) {
return this.andThen(ok, null);
public<R> Promise<R> andThen(Actor a, Function<T, R> ok) {
return this.andThen(a, ok, null);
}
public void resolveCalling(ThrowingSupplier<T> f) {
@ -124,23 +124,21 @@ public class Promise<T> implements Future<T> {
}
}
public synchronized<R> Promise<R> andThen(final Function<T, R> ok, final Function<Throwable, R> fail) {
Actor a0 = Actor.current();
public synchronized<R> Promise<R> andThen(Actor a0, final Function<T, R> ok, final Function<Throwable, R> fail) {
final Actor a = a0 != null ? a0 : new Actor();
Promise<R> p = new Promise<>();
this.whenFulfilled((t) -> a.execute(
() -> p.resolveCalling(() -> ok.apply(t)),
() -> p.rejectWith(new ActorTerminated(a))));
this.whenRejected((e) -> a.execute(
() -> {
if (fail == null) {
p.rejectWith(e);
} else {
p.resolveCalling(() -> fail.apply(e));
}
},
() -> p.rejectWith(new ActorTerminated(a))));
this.whenFulfilled((t) -> a.execute(new Actor.WorkItem(
_turn -> p.resolveCalling(() -> ok.apply(t)),
() -> p.rejectWith(new ActorTerminated(a)))));
this.whenRejected((e) -> a.execute(new Actor.WorkItem(
_turn -> {
if (fail == null) {
p.rejectWith(e);
} else {
p.resolveCalling(() -> fail.apply(e));
}
},
() -> p.rejectWith(new ActorTerminated(a)))));
return p;
}
@ -166,29 +164,17 @@ public class Promise<T> implements Future<T> {
}
public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException {
Actor a = Actor.current();
if (a == null) {
Semaphore s = new Semaphore(0);
this.whenFulfilled((_t) -> s.release());
this.whenRejected((_e) -> s.release());
this.whenFulfilled((_t) -> { synchronized (this) { this.notifyAll(); } });
this.whenRejected((_e) -> { synchronized (this) { this.notifyAll(); } });
synchronized (this) {
if (delay == -1) {
s.acquire();
while (this.isPending()) this.wait();
} else {
if (!s.tryAcquire(delay, unit)) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
}
} else {
this.whenFulfilled((_t) -> { synchronized (a) { a.notifyAll(); } });
this.whenRejected((_e) -> { synchronized (a) { a.notifyAll(); } });
synchronized (a) {
if (delay == -1) {
while (this.isPending()) a.wait();
} else {
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
while (this.isPending()) {
long now = System.currentTimeMillis();
if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
a.wait(targetTime - now);
}
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
while (this.isPending()) {
long now = System.currentTimeMillis();
if (now >= targetTime) throw TIMEOUT_WAITING_FOR_PROMISE_RESOLUTION;
this.wait(targetTime - now);
}
}
}

View File

@ -1,7 +0,0 @@
package org.syndicate_lang.actors;
public class ProxyFailure extends RuntimeException {
public ProxyFailure(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,24 @@
package org.syndicate_lang.actors;
public class Ref {
private final Actor _actor;
private final IEntity _target;
public Ref(Actor actor, IEntity target) {
this._actor = actor;
this._target = target;
}
public Actor getActor() {
return _actor;
}
IEntity getEntity() {
return _target;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
}
}

View File

@ -1,86 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.Proxy;
import java.util.function.Consumer;
import java.util.function.Function;
public class Remote<T> {
private final Actor _actor;
private final T _target;
public Remote(Actor actor, T target) {
this._actor = actor;
this._target = target;
}
public Actor getActor() {
return _actor;
}
public void async(Consumer<T> f) {
this._actor.execute(() -> f.accept(this._target));
}
public void async(long delayMilliseconds, Consumer<T> f) {
this._actor.later(delayMilliseconds, () -> f.accept(this._target));
}
public Promise<?> syncVoid(Consumer<T> f) {
return this.sync((t) -> {
f.accept(t);
return null;
});
}
public Promise<?> syncVoid(long delayMilliseconds, Consumer<T> f) {
return this.sync(delayMilliseconds, (t) -> {
f.accept(t);
return null;
});
}
public<R> Promise<R> sync(Function<T, R> f) {
Promise<R> p = new Promise<>();
this._actor.execute(
() -> p.resolveWith(f.apply(this._target)),
() -> p.rejectWith(this._actor.getExitReason()));
return p;
}
public<R> Promise<R> sync(long delayMilliseconds, Function<T, R> f) {
Promise<R> p = new Promise<>();
this._actor.later(
delayMilliseconds,
() -> p.resolveWith(f.apply(this._target)),
() -> p.rejectWith(this._actor.getExitReason()));
return p;
}
private<I> void checkTargetInstance(Class<I> c) {
if (!c.isInstance(this._target)) {
throw new IllegalArgumentException("target is not an instance of " + c);
}
}
@SuppressWarnings("unchecked")
public<I> I syncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public<I> I asyncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public static<I, T extends I> Remote<T> from(I proxy) {
return ((AbstractProxy<T>) Proxy.getInvocationHandler(proxy)).ref();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
}
}

View File

@ -1,28 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class SyncProxy<T> extends AbstractProxy<T> {
public SyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) {
return this._ref.sync((v) -> {
try {
return method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
}).await();
}
@Override
boolean isSync() {
return true;
}
}

View File

@ -0,0 +1,166 @@
package org.syndicate_lang.actors;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Logger;
public class Turn {
private final static AtomicLong nextHandle = new AtomicLong(0);
private final Actor _actor;
private Actor _pendingTarget = null;
private Consumer<Turn> _pending0 = null;
private Consumer<Turn> _pending1 = null;
private Consumer<Turn> _pending2 = null;
private Map<Actor, List<Consumer<Turn>>> _pending = null;
Turn(Actor a) {
this._actor = a;
}
public Actor getActor() {
return _actor;
}
public Logger log() {
return _actor.log();
}
void commit() {
if (_pendingTarget != null) {
var ac = _pendingTarget;
Consumer<Turn> q0 = _pending0, q1 = _pending1, q2 = _pending2;
_pendingTarget = null;
_pending0 = _pending1 = _pending2 = null;
ac.scheduleTurn(t -> {
q0.accept(t);
if (q1 != null) q1.accept(t);
if (q2 != null) q2.accept(t);
});
} else if (_pending != null) {
_pending.forEach((ac, q) -> ac.scheduleTurn(t -> q.forEach(f -> f.accept(t))));
_pending = null;
}
}
private void enqueue(Actor target, Consumer<Turn> action) {
if (_pending == null) {
if (_pendingTarget == null) {
_pendingTarget = target;
_pending0 = action;
return;
}
if (_pendingTarget == target) {
if (_pending1 == null) { _pending1 = action; return; }
if (_pending2 == null) { _pending2 = action; return; }
}
_pending = new HashMap<>();
var q = new LinkedList<Consumer<Turn>>();
_pending.put(_pendingTarget, q);
q.add(_pending0);
if (_pending1 != null) q.add(_pending1);
if (_pending2 != null) q.add(_pending2);
_pendingTarget = null;
_pending0 = _pending1 = _pending2 = null;
}
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
}
public Ref ref(IEntity o) {
return _actor.ref(o);
}
public void spawn(Consumer<Turn> bootProc) {
this.spawn(bootProc, new HashSet<>());
}
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
this.enqueue(this._actor, t -> {
Map<Long, Ref> newOutbound = new HashMap<>();
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
Actor newActor = new Actor(null, newOutbound);
newActor.scheduleTurn(bootProc);
});
}
public void quit() {
this._actor.stop();
}
public void crash(Throwable e) {
this._actor.stop(false, e);
}
public Long assert_(Ref target, Object assertion) {
Long h = Turn.nextHandle.getAndIncrement();
_assert_(target, assertion, h);
return h;
}
private void _assert_(Ref target, Object assertion, Long h) {
Object a = assertion; // TODO: runRewrites from target
if (a != null) {
this.enqueue(target.getActor(), t -> {
_actor._injectOutbound(h, target);
target.getEntity().assert_(t, a, h);
});
}
}
public void retract_(Long h) {
if (h == null) return;
Ref peer = this._actor._lookupOutbound(h);
if (peer == null) return;
_retract_(h, peer);
}
public Long replace_(Ref peer, Long h, Object assertion) {
var newHandle = assert_(peer, assertion);
retract_(h);
return newHandle;
}
void _retract_(Long handle, Ref peer) {
this.enqueue(peer.getActor(), t -> {
this._actor._extractOutbound(handle);
peer.getEntity().retract_(t, handle);
});
}
public void sync_(Ref peer, Consumer<Turn> k) {
this._sync_(peer, this.ref(new Entity() {
public void message_(Turn t, Object _message) {
k.accept(t);
}
}));
}
private void _sync_(Ref peer, Ref callback) {
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
}
public void message_(Ref peer, Object body) {
Object a = body; // TODO runRewrites
if (a != null) {
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
}
}
public void later(Consumer<Turn> action) {
this.later(0, action);
}
public void later(long delayMilliseconds, Consumer<Turn> action) {
_actor.later(delayMilliseconds, new Actor.WorkItem(action, null));
}
public PeriodicTimer every(long periodMilliseconds, Consumer<Turn> action) {
return every(0, periodMilliseconds, action);
}
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> action) {
return _actor.every(initialDelayMilliseconds, periodMilliseconds, action);
}
}

View File

@ -1,6 +1,8 @@
package org.syndicate_lang.actors.example.example1;
public interface IValueHolder<T> {
T get();
void set(T newValue);
}
import org.syndicate_lang.actors.Ref;
public interface IValueHolder {
record Get(Ref k) implements IValueHolder {}
record Set<T>(T newValue) implements IValueHolder {}
}

View File

@ -1,40 +1,62 @@
package org.syndicate_lang.actors.example.example1;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;
public class Main {
public class Main extends Entity {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
final var vh = Actor.forObject(new ValueHolder<>("There"));
vh.getActor().daemonize();
final var m = Actor.forObject(new Main());
m.async(10, (m_) -> m_.run(vh));
new Actor().scheduleTurn(t -> {
final var vh = Actor.forEntity(new ValueHolder<>("There"));
vh.getActor().daemonize();
Actor.boot(u -> {
Main main = new Main();
u.later(10, v -> main.run(v, vh));
return main;
});
@SuppressWarnings("unchecked")
IValueHolder<String> vv = vh.syncProxy(IValueHolder.class);
System.out.println("Value: " + vv.get());
vv.set("Second");
System.out.println("Value: " + vv.get());
System.out.println("Underlying: " + Remote.from(vv));
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
public void message_(Turn t0, Object v0) {
System.out.println("Value: " + v0);
t0.message_(vh, new IValueHolder.Set<>("Second"));
t0.message_(vh, new IValueHolder.Get(t0.ref(new Entity() {
public void message_(Turn t1, Object v1) {
System.out.println("Value: " + v1);
System.out.println("Cell: " + vh);
}
})));
}
})));
}).daemonize();
Actor.awaitAll();
System.out.println("Overall main returning");
}
private Actor me;
private int greetingCounter = 0;
public void run(Remote<ValueHolder<String>> vh) {
this.greet((String) vh.syncProxy(IValueHolder.class).get());
vh.syncVoid((v) -> v.set("World"));
Actor.current().every(1000, () -> {
if (greetingCounter >= 3) Actor.current().stop();
this.greet(vh.sync(ValueHolder::get).await());
});
public void run(Turn t, Ref vh) {
me = t.getActor();
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
public void message_(Turn t1, Object v1) {
Main.this.greet((String) v1);
t1.message_(vh, new IValueHolder.Set<>("World"));
t1.every(1000, t -> {
if (greetingCounter >= 3) t.quit();
t.message_(vh, new IValueHolder.Get(t.ref(new Entity() {
public void message_(Turn t2, Object v2) {
Main.this.greet((String) v2);
}
})));
});
}
})));
}
public void greet(String who) {
Actor.log().info((greetingCounter++) + ": Hi " + who);
me.log().info((greetingCounter++) + ": Hi " + who);
}
}

View File

@ -1,17 +1,23 @@
package org.syndicate_lang.actors.example.example1;
public class ValueHolder<T> implements IValueHolder<T> {
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Turn;
public class ValueHolder<T> extends Entity {
private T value;
public ValueHolder(T initialValue) {
this.value = initialValue;
}
public T get() {
return this.value;
}
public void set(T newValue) {
this.value = newValue;
@Override
public void message_(Turn turn, Object body) {
if (body instanceof IValueHolder.Get op) {
turn.message_(op.k(), this.value);
} else if (body instanceof IValueHolder.Set) {
@SuppressWarnings("unchecked")
IValueHolder.Set<T> op = (IValueHolder.Set<T>) body;
this.value = op.newValue();
}
}
}

View File

@ -1,25 +1,31 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;
public class Forwarder implements IForwarder {
private final Remote<IForwarder> _main;
public class Forwarder extends Entity {
private final Ref _main;
private final int _nRounds;
private Remote<IForwarder> _peer = null;
private Ref _peer = null;
public Forwarder(Remote<IForwarder> main, int nRounds) {
public Forwarder(Ref main, int nRounds) {
this._main = main;
this._nRounds = nRounds;
}
@Override
public void setPeer(Remote<IForwarder> peer) {
this._peer = peer;
public void retract_(Turn turn, Long handle) {
turn.quit();
}
@Override
public void handleMessage(final int hopCount) {
Remote<IForwarder> target = hopCount >= this._nRounds - 1 ? _main : _peer;
target.async(f -> f.handleMessage(hopCount + 1));
public void message_(Turn turn, Object body) {
if (body instanceof IForwarder.HandleMessage op) {
Ref target = op.hopCount() >= this._nRounds - 1 ? _main : _peer;
turn.message_(target, new IForwarder.HandleMessage(op.hopCount() + 1));
} else if (body instanceof IForwarder.SetPeer op) {
this._peer = op.peer();
}
}
}

View File

@ -1,8 +1,8 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Ref;
public interface IForwarder {
void setPeer(Remote<IForwarder> peer);
void handleMessage(int hopCount);
record SetPeer(Ref peer) implements IForwarder {}
record HandleMessage(int hopCount) implements IForwarder {}
}

View File

@ -0,0 +1,3 @@
package org.syndicate_lang.actors.example.example2;
public record ILink() {}

View File

@ -1,18 +1,21 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Entity;
import org.syndicate_lang.actors.Ref;
import org.syndicate_lang.actors.Turn;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
public class Main implements IForwarder {
public class Main extends Entity {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
new Actor()
.scheduleTurn(t -> new Main(parseInt(args[0]), parseInt(args[1])).boot(t))
.daemonize();
Actor.awaitAll();
}
@ -27,44 +30,44 @@ public class Main implements IForwarder {
this._remainingToReceive = nActors;
}
public void boot() {
Actor.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
final List<Remote<IForwarder>> _actors = new ArrayList<>();
final Remote<IForwarder> me = Actor.ref(this);
Remote<IForwarder> previous = null;
public void boot(Turn t) {
t.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
final List<Ref> _forwarders = new ArrayList<>();
final Ref me = t.ref(this);
Ref previous = null;
for (int i = 0; i < _nActors; i++) {
Remote<IForwarder> current = Actor.forObject(new Forwarder(me, this._nRounds));
current.getActor().link();
_actors.add(current);
Ref current = Actor.forEntity(new Forwarder(me, this._nRounds));
t.assert_(current, new ILink());
_forwarders.add(current);
if (previous != null) {
final var p = previous;
current.async(f -> f.setPeer(p));
t.message_(current, new IForwarder.SetPeer(previous));
}
previous = current;
}
_actors.get(0).async(f -> f.setPeer(_actors.get(_nActors - 1)));
Actor.log().info("Start");
this._startTime = System.currentTimeMillis();
_actors.forEach(a -> a.async(f -> f.handleMessage(0)));
t.message_(_forwarders.get(0), new IForwarder.SetPeer(_forwarders.get(_nActors - 1)));
t.later(t0 -> {
t0.log().info("Start");
this._startTime = System.currentTimeMillis();
_forwarders.forEach(a -> t0.message_(a, new IForwarder.HandleMessage(0)));
});
}
@Override
public void setPeer(Remote<IForwarder> peer) {
// Do nothing.
}
@Override
public void handleMessage(int hopCount) {
this._remainingToReceive--;
if (this._remainingToReceive == 0) {
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
long nMessages = (long) _nActors * (long) _nRounds;
double hz = nMessages / delta;
Actor.current().stop();
Actor.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
nMessages,
delta,
hz));
public void message_(Turn turn, Object body) {
if (body instanceof IForwarder.HandleMessage) {
this._remainingToReceive--;
if (this._remainingToReceive == 0) {
double delta = (System.currentTimeMillis() - this._startTime) / 1000.0;
long nMessages = (long) _nActors * (long) _nRounds;
double hz = nMessages / delta;
turn.quit();
turn.log().info(String.format("Stop after %d messages; %.1f seconds, %.1f Hz",
nMessages,
delta,
hz));
}
} else if (body instanceof IForwarder.SetPeer) {
// Do nothing.
}
}
}