First broad step toward novy

This commit is contained in:
Tony Garnock-Jones 2021-04-14 21:42:50 +02:00
parent 2ad5ac75ee
commit af611e1b2b
13 changed files with 242 additions and 292 deletions

View File

@ -1,38 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public abstract class AbstractProxy<T> implements InvocationHandler {
protected final Remote<T> _ref;
public AbstractProxy(Remote<T> ref) {
this._ref = ref;
}
public Remote<T> ref() {
return this._ref;
}
abstract boolean isSync();
private static Method toStringMethod;
static {
try {
toStringMethod = Object.class.getMethod("toString");
} catch (NoSuchMethodException e) {
toStringMethod = null;
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
if (method.equals(toStringMethod)) {
return this._ref.toString();
}
return dispatch(method, args);
}
abstract Object dispatch(Method method, Object[] args);
}

View File

@ -1,11 +1,10 @@
package org.syndicate_lang.actors;
import java.util.HashSet;
import java.util.Set;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -24,9 +23,7 @@ public class Actor implements Executor {
private boolean _alive = true;
private Throwable _exitReason = null;
private boolean _isCounted = true;
private Set<Actor> _links = null;
private Set<Remote<Consumer<Actor>>> _monitors = null;
private Consumer<Actor> _exitTrap = null;
private Map<Long, Ref> _outbound;
private final static class WorkItem extends AtomicReference<WorkItem> {
Runnable work;
@ -48,21 +45,26 @@ public class Actor implements Executor {
private final AtomicLong workItemCount = new AtomicLong(0);
public Actor() {
this("" + _actorId.incrementAndGet());
this(null, null);
}
public Actor(String debugName) {
this._name = debugName;
this(debugName, null);
}
public Actor(String debugName, Map<Long, Ref> outbound) {
this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName;
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
this._outbound = outbound == null ? new HashMap<>() : outbound;
_count.incrementAndGet();
}
public static<T> Remote<T> forObject(T o) {
public static Ref forEntity(IEntity o) {
return new Actor().ref(o);
}
public static<T> Promise<Remote<T>> boot(ThrowingSupplier<T> f) {
final Promise<Remote<T>> p = new Promise<>();
public static Promise<Ref> boot(ThrowingSupplier<IEntity> f) {
final Promise<Ref> p = new Promise<>();
final Actor a = new Actor();
a.execute(
() -> p.resolveCalling(() -> a.ref(f.get())),
@ -102,12 +104,24 @@ public class Actor implements Executor {
}
}
public void _injectOutbound(Long handle, Ref peer) {
_outbound.put(handle, peer);
}
public Ref _lookupOutbound(Long handle) {
return _outbound.get(handle);
}
public Ref _extractOutbound(Long handle) {
return _outbound.remove(handle);
}
public String toString() {
return super.toString() + "(" + this._name + ")";
}
public<T> Remote<T> ref(T o) {
return new Remote<>(this, o);
public<T> Ref ref(IEntity o) {
return new Ref(this, o);
}
private void _performSync(Runnable work, Runnable ifNotAlive) {
@ -129,13 +143,16 @@ public class Actor implements Executor {
}
public Promise<?> stop() {
return stop(null);
return stop(true, null);
}
public Promise<?> stop(Throwable reason) {
return stop(true, reason);
}
public Promise<?> stop(boolean normally, Throwable reason) {
Promise<?> p = new Promise<>();
this.execute(() -> {
this._stop(true, reason);
this._stop(normally, reason);
p.resolve();
}, p::resolve);
return p;
@ -150,20 +167,7 @@ public class Actor implements Executor {
} else {
log().log(Level.SEVERE, "Actor terminated with error", reason);
}
Set<Actor> linkedPeers = _links;
if (linkedPeers != null) {
_links = null;
for (var peer : linkedPeers) {
peer.notifyExit(this);
}
}
Set<Remote<Consumer<Actor>>> monitoringPeers = _monitors;
if (monitoringPeers != null) {
_monitors = null;
for (var handler : monitoringPeers) {
handler.async(Consumer::accept);
}
}
Turn.forActor(this, t -> _outbound.forEach(t::_retract_), true);
_releaseCount();
}
}
@ -240,60 +244,4 @@ public class Actor implements Executor {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
}
public void link(Actor peer) {
this.linkPeer(peer);
peer.linkPeer(this);
}
public void unlink(Actor peer) {
this.unlinkPeer(peer);
peer.unlinkPeer(this);
}
private synchronized void linkPeer(Actor peer) {
if (this._alive) {
if (_links == null) _links = new HashSet<>();
_links.add(peer);
} else {
peer.notifyExit(this);
}
}
private void notifyExit(final Actor exitingPeer) {
this.execute(() -> {
this.unlinkPeer(exitingPeer);
if (this._exitTrap != null) {
this._exitTrap.accept(exitingPeer);
} else {
this._stop(exitingPeer.getExitReason() == null, new ActorTerminated(exitingPeer));
}
});
}
private synchronized void unlinkPeer(Actor peer) {
if (_links != null) {
_links.remove(peer);
if (_links.isEmpty()) {
_links = null;
}
}
}
public void trapExits(Consumer<Actor> handler) {
this._exitTrap = handler;
}
public void monitor(Actor peer, Consumer<Actor> handler) {
peer.installMonitor(this.ref(handler));
}
private synchronized void installMonitor(Remote<Consumer<Actor>> handler) {
if (this._alive) {
if (_monitors == null) _monitors = new HashSet<>();
_monitors.add(handler);
} else {
handler.async(Consumer::accept);
}
}
}

View File

@ -1,35 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class AsyncProxy<T> extends AbstractProxy<T> {
public AsyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) {
if (method.getReturnType().equals(void.class)) {
this._ref.async((v, _ac) -> {
try {
method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
});
return null;
} else {
System.err.println(method.getReturnType());
throw new UnsupportedOperationException(
"Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy");
}
}
@Override
boolean isSync() {
return false;
}
}

View File

@ -0,0 +1,20 @@
package org.syndicate_lang.actors;
public class Entity implements IEntity {
@Override
public void assert_(Turn turn, Object assertion, Long handle) {
}
@Override
public void retract_(Turn turn, Long handle) {
}
@Override
public void sync_(Turn turn, Ref peer) {
turn.message_(peer, true);
}
@Override
public void message_(Turn turn, Object body) {
}
}

View File

@ -0,0 +1,8 @@
package org.syndicate_lang.actors;
public interface IEntity {
void assert_(Turn turn, Object assertion, Long handle);
void retract_(Turn turn, Long handle);
void sync_(Turn turn, Ref peer);
void message_(Turn turn, Object body);
}

View File

@ -0,0 +1,24 @@
package org.syndicate_lang.actors;
public class Ref {
private final Actor _actor;
private final IEntity _target;
public Ref(Actor actor, IEntity target) {
this._actor = actor;
this._target = target;
}
public Actor getActor() {
return _actor;
}
IEntity getEntity() {
return _target;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
}
}

View File

@ -1,86 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.Proxy;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
public class Remote<T> {
private final Actor _actor;
private final T _target;
public Remote(Actor actor, T target) {
this._actor = actor;
this._target = target;
}
public Actor getActor() {
return _actor;
}
public void async(BiConsumer<T, Actor> f) {
this._actor.execute(() -> f.accept(this._target, this._actor));
}
public void async(long delayMilliseconds, BiConsumer<T, Actor> f) {
this._actor.later(delayMilliseconds, () -> f.accept(this._target, this._actor));
}
public Promise<?> syncVoid(BiConsumer<T, Actor> f) {
return this.sync((t, ac) -> {
f.accept(t, ac);
return null;
});
}
public Promise<?> syncVoid(long delayMilliseconds, BiConsumer<T, Actor> f) {
return this.sync(delayMilliseconds, (t, ac) -> {
f.accept(t, ac);
return null;
});
}
public<R> Promise<R> sync(BiFunction<T, Actor, R> f) {
Promise<R> p = new Promise<>();
this._actor.execute(
() -> p.resolveWith(f.apply(this._target, this._actor)),
() -> p.rejectWith(this._actor.getExitReason()));
return p;
}
public<R> Promise<R> sync(long delayMilliseconds, BiFunction<T, Actor, R> f) {
Promise<R> p = new Promise<>();
this._actor.later(
delayMilliseconds,
() -> p.resolveWith(f.apply(this._target, this._actor)),
() -> p.rejectWith(this._actor.getExitReason()));
return p;
}
private<I> void checkTargetInstance(Class<I> c) {
if (!c.isInstance(this._target)) {
throw new IllegalArgumentException("target is not an instance of " + c);
}
}
@SuppressWarnings("unchecked")
public<I> I syncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public<I> I asyncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public static<I, T extends I> Remote<T> from(I proxy) {
return ((AbstractProxy<T>) Proxy.getInvocationHandler(proxy)).ref();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
}
}

View File

@ -1,28 +0,0 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class SyncProxy<T> extends AbstractProxy<T> {
public SyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) {
return this._ref.sync((v, _ac) -> {
try {
return method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
}).await();
}
@Override
boolean isSync() {
return true;
}
}

View File

@ -0,0 +1,137 @@
package org.syndicate_lang.actors;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class Turn {
private final static AtomicLong nextHandle = new AtomicLong(0);
private final Actor _actor;
private Map<Actor, List<Consumer<Turn>>> _pending = null;
private boolean _complete = false;
public static void forActor(Actor a, Consumer<Turn> f) {
Turn.forActor(a, f, false);
}
public static void forActor(Actor a, Consumer<Turn> f, boolean zombieTurn) {
if ((a.getExitReason() == null) == zombieTurn) return;
Turn t = new Turn(a);
try {
f.accept(t);
t.commit();
} catch (Exception e) {
a.stop(false, e);
}
}
private Turn(Actor a) {
this._actor = a;
}
public Actor getActor() {
return _actor;
}
private void commit() {
if (_pending != null) {
_pending.forEach((ac, q) -> ac.execute(() -> q.forEach(f -> Turn.forActor(ac, f))));
_pending = null;
}
_complete = true;
}
private void enqueue(Actor target, Consumer<Turn> action) {
if (_complete) throw new IllegalStateException("Attempt to reuse a committed Turn");
if (_pending == null) _pending = new HashMap<>();
_pending.computeIfAbsent(target, k -> new LinkedList<>()).add(action);
}
public void freshen(Consumer<Turn> action) {
if (!_complete) throw new IllegalStateException(("Attempt to freshen a non-stale Turn"));
Turn.forActor(this._actor, action);
}
public Ref ref(IEntity o) {
return _actor.ref(o);
}
public void spawn(Consumer<Turn> bootProc) {
this.spawn(bootProc, new HashSet<>());
}
public void spawn(Consumer<Turn> bootProc, Set<Long> initialAssertions) {
this.enqueue(this._actor, t -> {
Map<Long, Ref> newOutbound = new HashMap<>();
initialAssertions.forEach(k -> newOutbound.put(k, this._actor._extractOutbound(k)));
Actor newActor = new Actor(null, newOutbound);
newActor.execute(() -> Turn.forActor(newActor, bootProc));
});
}
public void quit() {
this._actor.stop();
}
public void crash(Throwable e) {
this._actor.stop(false, e);
}
public Long assert_(Ref target, Object assertion) {
Long h = Turn.nextHandle.getAndIncrement();
_assert_(target, assertion, h);
return h;
}
private void _assert_(Ref target, Object assertion, Long h) {
Object a = assertion; // TODO: runRewrites from target
if (a != null) {
this.enqueue(target.getActor(), t -> {
_actor._injectOutbound(h, target);
target.getEntity().assert_(t, a, h);
});
}
}
public void retract_(Long h) {
if (h == null) return;
Ref peer = this._actor._lookupOutbound(h);
if (peer == null) return;
_retract_(h, peer);
}
public Long replace_(Ref peer, Long h, Object assertion) {
var newHandle = assert_(peer, assertion);
retract_(h);
return newHandle;
}
void _retract_(Long handle, Ref peer) {
this.enqueue(peer.getActor(), t -> {
this._actor._extractOutbound(handle);
peer.getEntity().retract_(t, handle);
});
}
public Promise<Turn> sync_(Ref peer) {
Promise<Turn> p = new Promise<>();
this._sync_(peer, this.ref(new Entity() {
public void message_(Turn t, Object _message) {
p.resolveWith(t);
}
}));
return p;
}
private void _sync_(Ref peer, Ref callback) {
this.enqueue(peer.getActor(), t -> peer.getEntity().sync_(t, callback));
}
public void message_(Ref peer, Object body) {
Object a = body; // TODO runRewrites
if (a != null) {
this.enqueue(peer.getActor(), t -> peer.getEntity().message_(t, body));
}
}
}

View File

@ -1,14 +1,14 @@
package org.syndicate_lang.actors.example.example1;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Ref;
public class Main {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
final var vh = Actor.forObject(new ValueHolder<>("There"));
final var vh = Actor.forEntity(new ValueHolder<>("There"));
vh.getActor().daemonize();
final var m = Actor.forObject(new Main());
final var m = Actor.forEntity(new Main());
m.async(10, (m_, ac) -> m_.run(ac, vh));
@SuppressWarnings("unchecked")
@ -17,7 +17,7 @@ public class Main {
System.out.println("Value: " + vv.get());
vv.set("Second");
System.out.println("Value: " + vv.get());
System.out.println("Underlying: " + Remote.from(vv));
System.out.println("Underlying: " + Ref.from(vv));
Actor.awaitAll();
System.out.println("Overall main returning");
@ -26,7 +26,7 @@ public class Main {
private Actor me;
private int greetingCounter = 0;
public void run(Actor me, Remote<ValueHolder<String>> vh) {
public void run(Actor me, Ref<ValueHolder<String>> vh) {
this.me = me;
this.greet((String) vh.syncProxy(IValueHolder.class).get());
vh.syncVoid((v, _ac) -> v.set("World"));

View File

@ -1,26 +1,26 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Ref;
public class Forwarder implements IForwarder {
private final Remote<IForwarder> _main;
private final Ref<IForwarder> _main;
private final int _nRounds;
private Remote<IForwarder> _peer = null;
private Ref<IForwarder> _peer = null;
public Forwarder(Remote<IForwarder> main, int nRounds) {
public Forwarder(Ref<IForwarder> main, int nRounds) {
this._main = main;
this._nRounds = nRounds;
}
@Override
public void setPeer(Remote<IForwarder> peer) {
public void setPeer(Ref<IForwarder> peer) {
this._peer = peer;
}
@Override
public void handleMessage(Actor _ac, final int hopCount) {
Remote<IForwarder> target = hopCount >= this._nRounds - 1 ? _main : _peer;
Ref<IForwarder> target = hopCount >= this._nRounds - 1 ? _main : _peer;
target.async((f, ac) -> f.handleMessage(ac, hopCount + 1));
}
}

View File

@ -1,9 +1,9 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Ref;
public interface IForwarder {
void setPeer(Remote<IForwarder> peer);
void setPeer(Ref<IForwarder> peer);
void handleMessage(Actor ac, int hopCount);
}

View File

@ -1,7 +1,7 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.Remote;
import org.syndicate_lang.actors.Ref;
import java.util.ArrayList;
import java.util.List;
@ -12,7 +12,7 @@ public class Main implements IForwarder {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
Actor.forObject(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
Actor.forEntity(new Main(parseInt(args[0]), parseInt(args[1]))).syncVoid(Main::boot).await();
Actor.awaitAll();
}
@ -29,11 +29,11 @@ public class Main implements IForwarder {
public void boot(Actor ac) {
ac.log().info("Available processors: " + Runtime.getRuntime().availableProcessors());
final List<Remote<IForwarder>> _actors = new ArrayList<>();
final Remote<IForwarder> me = ac.ref(this);
Remote<IForwarder> previous = null;
final List<Ref<IForwarder>> _actors = new ArrayList<>();
final Ref<IForwarder> me = ac.ref(this);
Ref<IForwarder> previous = null;
for (int i = 0; i < _nActors; i++) {
Remote<IForwarder> current = Actor.forObject(new Forwarder(me, this._nRounds));
Ref<IForwarder> current = Actor.forEntity(new Forwarder(me, this._nRounds));
ac.link(current.getActor());
_actors.add(current);
if (previous != null) {
@ -49,7 +49,7 @@ public class Main implements IForwarder {
}
@Override
public void setPeer(Remote<IForwarder> peer) {
public void setPeer(Ref<IForwarder> peer) {
// Do nothing.
}