diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..919ce1f --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/src/main/java/org/syndicate_lang/actors/AbstractProxy.java b/src/main/java/org/syndicate_lang/actors/AbstractProxy.java new file mode 100644 index 0000000..ea0f0b3 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/AbstractProxy.java @@ -0,0 +1,38 @@ +package org.syndicate_lang.actors; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; + +public abstract class AbstractProxy implements InvocationHandler { + protected final Remote _ref; + + public AbstractProxy(Remote ref) { + this._ref = ref; + } + + public Remote ref() { + return this._ref; + } + + abstract boolean isSync(); + + private static Method toStringMethod; + + static { + try { + toStringMethod = Object.class.getMethod("toString"); + } catch (NoSuchMethodException e) { + toStringMethod = null; + } + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.equals(toStringMethod)) { + return this._ref.toString(); + } + return dispatch(method, args); + } + + abstract Object dispatch(Method method, Object[] args) throws Throwable; +} diff --git a/src/main/java/org/syndicate_lang/actors/Actor.java b/src/main/java/org/syndicate_lang/actors/Actor.java index 6ae74a7..7996bbb 100644 --- a/src/main/java/org/syndicate_lang/actors/Actor.java +++ b/src/main/java/org/syndicate_lang/actors/Actor.java @@ -11,7 +11,9 @@ import java.util.logging.Logger; public class Actor implements Executor { private final static ThreadLocal _currentActor = new ThreadLocal<>(); private final static AtomicLong _count = new AtomicLong(0); - protected final static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(4); + private final static AtomicLong _actorId = new AtomicLong(0); + protected final static ExecutorService _executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private final String _name; private final Logger _logger; @@ -33,12 +35,12 @@ public class Actor implements Executor { } public Actor() { - this("" + System.currentTimeMillis()); + this("" + _actorId.incrementAndGet()); } public Actor(String debugName) { this._name = debugName; - this._logger = Logger.getLogger(Actor.class.getCanonicalName() + "(" + this._name + ")"); + this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")"); _count.incrementAndGet(); } @@ -128,7 +130,7 @@ public class Actor implements Executor { _alive = false; _exitReason = reason; if (normally) { - getLogger().log(Level.INFO, "Actor stopped", reason); + getLogger().log(Level.FINE, "Actor stopped", reason); } else { getLogger().log(Level.SEVERE, "Actor terminated with error", reason); } @@ -150,7 +152,11 @@ public class Actor implements Executor { } public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) { - _executor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); + if (delayMilliseconds == 0) { + _executor.execute(() -> this._perform(work, ifNotAlive)); + } else { + _scheduledExecutor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS); + } } public PeriodicTimer every(long periodMilliseconds, Runnable f) { @@ -159,20 +165,27 @@ public class Actor implements Executor { public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) { return new PeriodicTimer( - _executor.scheduleAtFixedRate( + _scheduledExecutor.scheduleAtFixedRate( () -> this._perform(f, null), initialDelayMilliseconds, periodMilliseconds, TimeUnit.MILLISECONDS)); } - public static boolean awaitAll() throws InterruptedException { + public static void awaitAll() throws InterruptedException { while (_count.get() > 0) { synchronized (_count) { _count.wait(1000); } } _executor.shutdown(); - return _executor.awaitTermination(5, TimeUnit.MINUTES); + _scheduledExecutor.shutdown(); + _executor.awaitTermination(5, TimeUnit.MINUTES); + _scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES); + } + + public static void convenientLogging() { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n"); } } diff --git a/src/main/java/org/syndicate_lang/actors/AsyncProxy.java b/src/main/java/org/syndicate_lang/actors/AsyncProxy.java new file mode 100644 index 0000000..d8cc780 --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/AsyncProxy.java @@ -0,0 +1,35 @@ +package org.syndicate_lang.actors; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class AsyncProxy extends AbstractProxy { + public AsyncProxy(Remote ref) { + super(ref); + } + + @Override + public Object dispatch(Method method, Object[] args) throws Throwable { + if (method.getReturnType().equals(void.class)) { + this._ref.async((v) -> { + try { + method.invoke(v, args); + } catch (IllegalAccessException e) { + throw new ProxyFailure(e); + } catch (InvocationTargetException e) { + throw new ProxyFailure(e.getCause()); + } + }); + return null; + } else { + System.err.println(method.getReturnType()); + throw new UnsupportedOperationException( + "Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy"); + } + } + + @Override + boolean isSync() { + return false; + } +} diff --git a/src/main/java/org/syndicate_lang/actors/Remote.java b/src/main/java/org/syndicate_lang/actors/Remote.java index c27fc1b..0076aa5 100644 --- a/src/main/java/org/syndicate_lang/actors/Remote.java +++ b/src/main/java/org/syndicate_lang/actors/Remote.java @@ -7,7 +7,7 @@ import java.lang.reflect.Proxy; import java.util.function.Consumer; import java.util.function.Function; -public class Remote implements InvocationHandler { +public class Remote { private final Actor _actor; private final T _target; @@ -52,34 +52,31 @@ public class Remote implements InvocationHandler { return p; } - @SuppressWarnings("unchecked") - public I proxy(Class c) { + private void checkTargetInstance(Class c) { if (!c.isInstance(this._target)) { throw new IllegalArgumentException("target is not an instance of " + c); } - return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, this); + } + + @SuppressWarnings("unchecked") + public I syncProxy(Class c) { + checkTargetInstance(c); + return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this)); + } + + @SuppressWarnings("unchecked") + public I asyncProxy(Class c) { + checkTargetInstance(c); + return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this)); } @SuppressWarnings("unchecked") public static Remote from(I proxy) { - return (Remote) Proxy.getInvocationHandler(proxy); - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return this.sync((v) -> { - try { - return method.invoke(v, args); - } catch (IllegalAccessException e) { - throw new ProxyFailure(e); - } catch (InvocationTargetException e) { - throw new ProxyFailure(e.getCause()); - } - }).await(); + return ((AbstractProxy) Proxy.getInvocationHandler(proxy)).ref(); } @Override public String toString() { - return super.toString() + "(" + this._actor.getName() + "::" + this._target + ")"; + return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")"; } } diff --git a/src/main/java/org/syndicate_lang/actors/SyncProxy.java b/src/main/java/org/syndicate_lang/actors/SyncProxy.java new file mode 100644 index 0000000..f9b1b8e --- /dev/null +++ b/src/main/java/org/syndicate_lang/actors/SyncProxy.java @@ -0,0 +1,29 @@ +package org.syndicate_lang.actors; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class SyncProxy extends AbstractProxy { + public SyncProxy(Remote ref) { + super(ref); + } + + @Override + public Object dispatch(Method method, Object[] args) throws Throwable { + return this._ref.sync((v) -> { + try { + return method.invoke(v, args); + } catch (IllegalAccessException e) { + throw new ProxyFailure(e); + } catch (InvocationTargetException e) { + throw new ProxyFailure(e.getCause()); + } + }).await(); + } + + @Override + boolean isSync() { + return true; + } +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java index 8ce9cc9..869996d 100644 --- a/src/test/java/org/syndicate_lang/actors/example/example1/Main.java +++ b/src/test/java/org/syndicate_lang/actors/example/example1/Main.java @@ -1,20 +1,18 @@ package org.syndicate_lang.actors.example.example1; import org.syndicate_lang.actors.Actor; -import org.syndicate_lang.actors.PeriodicTimer; import org.syndicate_lang.actors.Remote; -import java.util.concurrent.atomic.AtomicReference; - public class Main { public static void main(String[] args) throws InterruptedException { + Actor.convenientLogging(); final var vh = Actor.forObject(new ValueHolder<>("There")); vh.getActor().daemonize(); final var m = Actor.forObject(new Main()); m.async(10, (m_) -> m_.run(vh)); @SuppressWarnings("unchecked") - IValueHolder vv = vh.proxy(IValueHolder.class); + IValueHolder vv = vh.syncProxy(IValueHolder.class); System.out.println("Value: " + vv.get()); vv.set("Second"); @@ -28,13 +26,12 @@ public class Main { private int greetingCounter = 0; public void run(Remote> vh) { - this.greet((String) vh.proxy(IValueHolder.class).get()); + this.greet((String) vh.syncProxy(IValueHolder.class).get()); vh.syncVoid((v) -> v.set("World")); - AtomicReference timer = new AtomicReference<>(); - timer.set(Actor.current().every(1000, () -> { + Actor.current().every(1000, () -> { if (greetingCounter >= 3) Actor.current().stop(); - this.greet(vh.sync((v) -> v.get()).await()); - })); + this.greet(vh.sync(ValueHolder::get).await()); + }); } public void greet(String who) { diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java new file mode 100644 index 0000000..074a4fd --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Forwarder.java @@ -0,0 +1,26 @@ +package org.syndicate_lang.actors.example.example2; + +import org.syndicate_lang.actors.Actor; + +import java.util.List; + +public class Forwarder implements IForwarder { + private final int _index; + private final List _actors; + private final IForwarder _main; + private final int _nRounds; + + public Forwarder(int index, List actors, IForwarder main, int nRounds) { + this._index = index; + this._actors = actors; + this._main = main; + this._nRounds = nRounds; + } + + @Override + public void handleMessage(int hopCount) { + int index = (this._index + 1) % this._actors.size(); + IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index); + target.handleMessage(hopCount + 1); + } +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java new file mode 100644 index 0000000..c3bb01a --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example2/IForwarder.java @@ -0,0 +1,10 @@ +package org.syndicate_lang.actors.example.example2; + +import org.syndicate_lang.actors.Actor; + +public interface IForwarder { + void handleMessage(int hopCount); + default void shutdown() { + Actor.current().stop(); + } +} diff --git a/src/test/java/org/syndicate_lang/actors/example/example2/Main.java b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java new file mode 100644 index 0000000..df82001 --- /dev/null +++ b/src/test/java/org/syndicate_lang/actors/example/example2/Main.java @@ -0,0 +1,53 @@ +package org.syndicate_lang.actors.example.example2; + +import org.syndicate_lang.actors.Actor; + +import java.util.ArrayList; +import java.util.List; + +public class Main implements IForwarder { + + private List _actors; + + public static void main(String[] args) throws InterruptedException { + Actor.convenientLogging(); + Actor.forObject(new Main(1000000, 10)).syncVoid(Main::boot).await(); + Actor.awaitAll(); + } + + public final int _nActors; + public final int _nRounds; + public int _remainingToReceive; + + public Main(int nActors, int nRounds) { + this._nActors = nActors; + this._nRounds = nRounds; + this._remainingToReceive = nActors; + } + + public void boot() { + this._actors = new ArrayList<>(); + IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class); + for (int i = 0; i < _nActors; i++) { + this._actors.add(Actor.forObject( + new Forwarder(i, this._actors, me, this._nRounds)) + .asyncProxy(IForwarder.class)); +// Actor.log().info(this._actors.get(this._actors.size()-1).toString()); + } + Actor.log().info("Start"); + this._actors.forEach((a) -> a.handleMessage(0)); + } + + @Override + public void handleMessage(int hopCount) { + this._remainingToReceive--; +// Actor.log().info(String.format("hopCount: %d, remainingToReceive: %d", +// hopCount, +// this._remainingToReceive)); + if (this._remainingToReceive == 0) { + this._actors.forEach(IForwarder::shutdown); + this.shutdown(); + Actor.log().info("Stop after " + (_nActors * _nRounds) + " messages"); + } + } +}