From 956a509418b95d0d4af60fcf5a734d8854eaa41c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 4 Dec 2020 19:57:47 +0100 Subject: [PATCH] Initial commit --- .gitignore | 1 + .idea/.gitignore | 3 + .idea/misc.xml | 6 + .idea/modules.xml | 9 + .idea/uiDesigner.xml | 124 +++++++++ .idea/vcs.xml | 6 + .../java/org/syndicate_lang/actors/Actor.java | 178 +++++++++++++ .../actors/ActorTerminated.java | 14 + .../syndicate_lang/actors/BrokenPromise.java | 18 ++ .../syndicate_lang/actors/InternalError.java | 7 + .../syndicate_lang/actors/PeriodicTimer.java | 19 ++ .../org/syndicate_lang/actors/Promise.java | 240 ++++++++++++++++++ .../syndicate_lang/actors/ProxyFailure.java | 7 + .../org/syndicate_lang/actors/Remote.java | 85 +++++++ src/main/main.iml | 11 + .../actors/example/example1/IValueHolder.java | 6 + .../actors/example/example1/Main.java | 43 ++++ .../actors/example/example1/ValueHolder.java | 19 ++ src/test/test.iml | 13 + 19 files changed, 809 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/uiDesigner.xml create mode 100644 .idea/vcs.xml create mode 100644 src/main/java/org/syndicate_lang/actors/Actor.java create mode 100644 src/main/java/org/syndicate_lang/actors/ActorTerminated.java create mode 100644 src/main/java/org/syndicate_lang/actors/BrokenPromise.java create mode 100644 src/main/java/org/syndicate_lang/actors/InternalError.java create mode 100644 src/main/java/org/syndicate_lang/actors/PeriodicTimer.java create mode 100644 src/main/java/org/syndicate_lang/actors/Promise.java create mode 100644 src/main/java/org/syndicate_lang/actors/ProxyFailure.java create mode 100644 src/main/java/org/syndicate_lang/actors/Remote.java create mode 100644 src/main/main.iml create mode 100644 src/test/java/org/syndicate_lang/actors/example/example1/IValueHolder.java create mode 100644 src/test/java/org/syndicate_lang/actors/example/example1/Main.java create mode 100644 src/test/java/org/syndicate_lang/actors/example/example1/ValueHolder.java create mode 100644 src/test/test.iml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..89f9ac0 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +out/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..1763e15 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..211fc4d --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java new file mode 100644 index 0000000..6ae74a7 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -0,0 +1,178 @@ +package org.syndicate_lang.actors; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat. + */ +public class Actor implements Executor { + private final static ThreadLocal _currentActor = new ThreadLocal<>(); + private final static AtomicLong _count = new AtomicLong(0); + protected final static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(4); + + private final String _name; + private final Logger _logger; + + private boolean _alive = true; + private Throwable _exitReason = null; + private boolean _isCounted = true; + + public static Actor current() { + return _currentActor.get(); + } + + public static Logger log() { + return current().getLogger(); + } + + public static Remote ref(T o) { + return current().proxyFor(o); + } + + public Actor() { + this("" + System.currentTimeMillis()); + } + + public Actor(String debugName) { + this._name = debugName; + this._logger = Logger.getLogger(Actor.class.getCanonicalName() + "(" + this._name + ")"); + _count.incrementAndGet(); + } + + public static Remote forObject(T o) { + return new Actor().proxyFor(o); + } + + public String getName() { + return _name; + } + + public Throwable getExitReason() { + return _exitReason; + } + + public Logger getLogger() { + return _logger; + } + + public synchronized boolean isDaemon() { + return _alive && !_isCounted; + } + + public synchronized Actor daemonize() { + this._releaseCount(); + return this; + } + + private void _releaseCount() { + if (_isCounted) { + _isCounted = false; + synchronized (_count) { + if (_count.decrementAndGet() == 0) { + _count.notifyAll(); + } + } + } + } + + public String toString() { + return super.toString() + "(" + this._name + ")"; + } + + public Remote proxyFor(T o) { + return new Remote(this, o); + } + + private void _perform(Runnable work, Runnable ifNotAlive) { + synchronized (this) { + _currentActor.set(this); + try { + if (!_alive) { + if (ifNotAlive != null) ifNotAlive.run(); + } else { + try { + work.run(); + } catch (Throwable exn) { + this._stop(false, exn); + } + } + } finally { + _currentActor.set(null); + } + } + } + + public Promise stop() { + return stop(null); + } + + public Promise stop(Throwable reason) { + if (current() == this) { + this._stop(true, reason); + return Promise.resolved(); + } else { + Promise p = new Promise<>(); + this.execute(() -> { + this._stop(true, reason); + p.resolve(); + }, p::resolve); + return p; + } + } + + private synchronized void _stop(boolean normally, Throwable reason) { + if (_alive) { + _alive = false; + _exitReason = reason; + if (normally) { + getLogger().log(Level.INFO, "Actor stopped", reason); + } else { + getLogger().log(Level.SEVERE, "Actor terminated with error", reason); + } + _releaseCount(); + } + } + + @Override + public void execute(Runnable work) { + this.execute(work, null); + } + + public void execute(Runnable work, Runnable ifNotAlive) { + this.later(0, work, ifNotAlive); + } + + public void later(long delayMilliseconds, Runnable work) { + this.later(delayMilliseconds, work, null); + } + + public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { + _executor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); + } + + public PeriodicTimer every(long periodMilliseconds, Runnable f) { + return every(0, periodMilliseconds, f); + } + + public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { + return new PeriodicTimer( + _executor.scheduleAtFixedRate( + () -> this._perform(f, null), + initialDelayMilliseconds, + periodMilliseconds, + TimeUnit.MILLISECONDS)); + } + + public static boolean awaitAll() throws InterruptedException { + while (_count.get() > 0) { + synchronized (_count) { + _count.wait(1000); + } + } + _executor.shutdown(); + return _executor.awaitTermination(5, TimeUnit.MINUTES); + } +} diff --git a/src/main/java/org/syndicate_lang/actors/ActorTerminated.java b/src/main/java/org/syndicate_lang/actors/ActorTerminated.java new file mode 100644 index 0000000..99a7729 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/ActorTerminated.java @@ -0,0 +1,14 @@ +package org.syndicate_lang.actors; + +public class ActorTerminated extends Exception { + private Actor _actor; + + public ActorTerminated(Actor actor) { + super("Actor terminated: " + actor, actor.getExitReason()); + this._actor = actor; + } + + public Actor getActor() { + return _actor; + } +} diff --git a/src/main/java/org/syndicate_lang/actors/BrokenPromise.java b/src/main/java/org/syndicate_lang/actors/BrokenPromise.java new file mode 100644 index 0000000..8a841e4 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/BrokenPromise.java @@ -0,0 +1,18 @@ +package org.syndicate_lang.actors; + +public class BrokenPromise extends RuntimeException { + private Promise _promise; + + public BrokenPromise(Promise promise) { + super(promise.getReason()); + this._promise = promise; + } + + public Promise getPromise() { + return _promise; + } + + public Throwable getReason() { + return _promise.getReason(); + } +} diff --git a/src/main/java/org/syndicate_lang/actors/InternalError.java b/src/main/java/org/syndicate_lang/actors/InternalError.java new file mode 100644 index 0000000..d5776a8 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/InternalError.java @@ -0,0 +1,7 @@ +package org.syndicate_lang.actors; + +public class InternalError extends RuntimeException { + public InternalError(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/syndicate_lang/actors/PeriodicTimer.java b/src/main/java/org/syndicate_lang/actors/PeriodicTimer.java new file mode 100644 index 0000000..0722d16 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/PeriodicTimer.java @@ -0,0 +1,19 @@ +package org.syndicate_lang.actors; + +import java.util.concurrent.ScheduledFuture; + +public class PeriodicTimer { + private ScheduledFuture _future; + + protected PeriodicTimer(ScheduledFuture future) { + this._future = future; + } + + public boolean isCancelled() { + return _future.isCancelled(); + } + + public void cancel() { + _future.cancel(false); + } +} diff --git a/src/main/java/org/syndicate_lang/actors/Promise.java b/src/main/java/org/syndicate_lang/actors/Promise.java new file mode 100644 index 0000000..1d1de0e --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Promise.java @@ -0,0 +1,240 @@ +package org.syndicate_lang.actors; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.function.Consumer; +import java.util.function.Function; + +public class Promise implements Future { + public static enum State { + PENDING, + FULFILLED, + REJECTED + }; + + private volatile State _state = State.PENDING; + private T _value = null; + private Throwable _reason = null; + private List> _resolvers = null; + private List> _rejecters = null; + + public Promise() {} + + public static Promise resolved() { + return resolved(null); + } + + public static Promise resolved(T v) { + var p = new Promise(); + p.resolveWith(v); + return p; + } + + public static Promise rejected(Throwable e) { + var p = new Promise(); + p.rejectWith(e); + return p; + } + + public State getState() { + return _state; + } + + public boolean isPending() { + return _state == State.PENDING; + } + + public boolean isFulfilled() { + return _state == State.FULFILLED; + } + + public boolean isRejected() { + return _state == State.REJECTED; + } + + public T getValue() { + return _value; + } + + public Throwable getReason() { + return _reason; + } + + public void resolve() { + this.resolveWith(null); + } + + public synchronized void resolveWith(T t) { + if (this.isPending()) { + this._value = t; + this._state = State.FULFILLED; + var worklist = _resolvers; + _resolvers = null; + _rejecters = null; + if (worklist != null) { + for (var callback : worklist) { + callback.accept(t); + } + } + } + } + + public void chain(Promise t) { + if (this == t) { + throw new IllegalArgumentException("cannot chain promise immediately to itself"); + } + t.whenFulfilled((v) -> this.resolveWith(v)); + t.whenRejected((e) -> this.rejectWith(e)); + } + + public void rejectWith(Throwable e) { + if (this.isPending()) { + this._reason = e; + this._state = State.REJECTED; + var worklist = _rejecters; + _resolvers = null; + _rejecters = null; + if (worklist != null) { + for (var callback : worklist) { + callback.accept(e); + } + } + } + } + + public Promise andThen(Function ok) { + return this.andThen(ok, null); + } + + public synchronized Promise andThen(final Function ok, final Function fail) { + Actor a0 = Actor.current(); + final Actor a = a0 != null ? a0 : new Actor(); + + Promise p = new Promise<>(); + this.whenFulfilled((t) -> a.execute(() -> { + try { + p.resolveWith(ok.apply(t)); + } catch (Throwable e) { + p.rejectWith(e); + } + }, () -> p.rejectWith(new ActorTerminated(a)))); + this.whenRejected((e) -> a.execute(() -> { + try { + if (fail == null) { + p.rejectWith(e); + } else { + p.resolveWith(fail.apply(e)); + } + } catch (Throwable e2) { + p.rejectWith(e2); + } + }, () -> p.rejectWith(new ActorTerminated(a)))); + return p; + } + + private static T unexpectedTimeout(TimeoutException e) { + throw new InternalError("await() without delay signalled TimeoutException", e); + } + + public T await() { + try { + return this.await(-1); + } catch (TimeoutException e) { + return unexpectedTimeout(e); + } + } + + public T await(long delayMilliseconds) throws TimeoutException { + try { + return _await(delayMilliseconds, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + this.rejectWith(e); + throw new BrokenPromise(this); + } + } + + public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException { + Semaphore s = new Semaphore(0); + this.whenFulfilled((_t) -> s.release()); + this.whenRejected((_e) -> s.release()); + if (delay == -1) { + s.acquire(); + } else { + if (!s.tryAcquire(delay, unit)) { + throw new TimeoutException(); + } + } + if (this.isFulfilled()) { + return this._value; + } else { + throw new BrokenPromise(this); + } + } + + private synchronized void whenFulfilled(Consumer callback) { + switch (this._state) { + case PENDING: + if (_resolvers == null) _resolvers = new ArrayList<>(); + _resolvers.add(callback); + break; + case FULFILLED: + callback.accept(this._value); + break; + case REJECTED: + break; + } + } + + private synchronized void whenRejected(Consumer callback) { + switch (this._state) { + case PENDING: + if (_rejecters == null) _rejecters = new ArrayList<>(); + _rejecters.add(callback); + break; + case FULFILLED: + break; + case REJECTED: + callback.accept(this._reason); + break; + } + } + + // Implementation of Future + + @Override + public boolean cancel(boolean b) { + rejectWith(new CancellationException()); + return isRejected(); + } + + @Override + public boolean isCancelled() { + return isRejected(); + } + + @Override + public boolean isDone() { + return !isPending(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return _await(-1, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + return unexpectedTimeout(e); + } catch (BrokenPromise e) { + throw new ExecutionException(e.getReason()); + } + } + + @Override + public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + try { + return _await(l, timeUnit); + } catch (BrokenPromise e) { + throw new ExecutionException(e.getReason()); + } + } +} diff --git a/src/main/java/org/syndicate_lang/actors/ProxyFailure.java b/src/main/java/org/syndicate_lang/actors/ProxyFailure.java new file mode 100644 index 0000000..af01894 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/ProxyFailure.java @@ -0,0 +1,7 @@ +package org.syndicate_lang.actors; + +public class ProxyFailure extends RuntimeException { + public ProxyFailure(Throwable t) { + super(t); + } +} diff --git a/src/main/java/org/syndicate_lang/actors/Remote.java b/src/main/java/org/syndicate_lang/actors/Remote.java new file mode 100644 index 0000000..c27fc1b --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/Remote.java @@ -0,0 +1,85 @@ +package org.syndicate_lang.actors; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.function.Consumer; +import java.util.function.Function; + +public class Remote implements InvocationHandler { + private final Actor _actor; + private final T _target; + + public Remote(Actor actor, T target) { + this._actor = actor; + this._target = target; + } + + public Actor getActor() { + return _actor; + } + + public void async(Consumer f) { + this.async(0, f); + } + + public void async(long delayMilliseconds, Consumer f) { + this._actor.later(delayMilliseconds, () -> f.accept(this._target)); + } + + public Promise syncVoid(Consumer f) { + return this.syncVoid(0, f); + } + + public Promise syncVoid(long delayMilliseconds, Consumer f) { + return this.sync(delayMilliseconds, (t) -> { + f.accept(t); + return null; + }); + } + + public Promise sync(Function f) { + return this.sync(0, f); + } + + public Promise sync(long delayMilliseconds, Function f) { + Promise p = new Promise<>(); + this._actor.later( + delayMilliseconds, + () -> p.resolveWith(f.apply(this._target)), + () -> p.rejectWith(this._actor.getExitReason())); + return p; + } + + @SuppressWarnings("unchecked") + public I proxy(Class c) { + if (!c.isInstance(this._target)) { + throw new IllegalArgumentException("target is not an instance of " + c); + } + return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, this); + } + + @SuppressWarnings("unchecked") + public static Remote from(I proxy) { + return (Remote) Proxy.getInvocationHandler(proxy); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + return this.sync((v) -> { + try { + return method.invoke(v, args); + } catch (IllegalAccessException e) { + throw new ProxyFailure(e); + } catch (InvocationTargetException e) { + throw new ProxyFailure(e.getCause()); + } + }).await(); + } + + @Override + public String toString() { + return super.toString() + "(" + this._actor.getName() + "::" + this._target + ")"; + } +} diff --git a/src/main/main.iml b/src/main/main.iml new file mode 100644 index 0000000..908ad4f --- /dev/null +++ b/src/main/main.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/IValueHolder.java b/src/test/java/org/syndicate_lang/actors/example/example1/IValueHolder.java new file mode 100644 index 0000000..c43b337 --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example1/IValueHolder.java @@ -0,0 +1,6 @@ +package org.syndicate_lang.actors.example.example1; + +public interface IValueHolder { + public T get(); + public void set(T newValue); +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java new file mode 100644 index 0000000..8ce9cc9 --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java @@ -0,0 +1,43 @@ +package org.syndicate_lang.actors.example.example1; + +import org.syndicate_lang.actors.Actor; +import org.syndicate_lang.actors.PeriodicTimer; +import org.syndicate_lang.actors.Remote; + +import java.util.concurrent.atomic.AtomicReference; + +public class Main { + public static void main(String[] args) throws InterruptedException { + final var vh = Actor.forObject(new ValueHolder<>("There")); + vh.getActor().daemonize(); + final var m = Actor.forObject(new Main()); + m.async(10, (m_) -> m_.run(vh)); + + @SuppressWarnings("unchecked") + IValueHolder vv = vh.proxy(IValueHolder.class); + + System.out.println("Value: " + vv.get()); + vv.set("Second"); + System.out.println("Value: " + vv.get()); + System.out.println("Underlying: " + Remote.from(vv)); + + Actor.awaitAll(); + System.out.println("Overall main returning"); + } + + private int greetingCounter = 0; + + public void run(Remote> vh) { + this.greet((String) vh.proxy(IValueHolder.class).get()); + vh.syncVoid((v) -> v.set("World")); + AtomicReference timer = new AtomicReference<>(); + timer.set(Actor.current().every(1000, () -> { + if (greetingCounter >= 3) Actor.current().stop(); + this.greet(vh.sync((v) -> v.get()).await()); + })); + } + + public void greet(String who) { + Actor.log().info((greetingCounter++) + ": Hi " + who); + } +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/ValueHolder.java b/src/test/java/org/syndicate_lang/actors/example/example1/ValueHolder.java new file mode 100644 index 0000000..a90b349 --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example1/ValueHolder.java @@ -0,0 +1,19 @@ +package org.syndicate_lang.actors.example.example1; + +import java.util.concurrent.Future; + +public class ValueHolder implements IValueHolder { + private T value; + + public ValueHolder(T initialValue) { + this.value = initialValue; + } + + public T get() { + return this.value; + } + + public void set(T newValue) { + this.value = newValue; + } +} diff --git a/src/test/test.iml b/src/test/test.iml new file mode 100644 index 0000000..bb46440 --- /dev/null +++ b/src/test/test.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file