This commit is contained in:
Tony Garnock-Jones 2020-12-04 23:25:40 +01:00
parent 956a509418
commit a333659741
11 changed files with 246 additions and 36 deletions

View File

@ -0,0 +1,7 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings>
</code_scheme>
</component>

View File

@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@ -0,0 +1,38 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public abstract class AbstractProxy<T> implements InvocationHandler {
protected final Remote<T> _ref;
public AbstractProxy(Remote<T> ref) {
this._ref = ref;
}
public Remote<T> ref() {
return this._ref;
}
abstract boolean isSync();
private static Method toStringMethod;
static {
try {
toStringMethod = Object.class.getMethod("toString");
} catch (NoSuchMethodException e) {
toStringMethod = null;
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.equals(toStringMethod)) {
return this._ref.toString();
}
return dispatch(method, args);
}
abstract Object dispatch(Method method, Object[] args) throws Throwable;
}

View File

@ -11,7 +11,9 @@ import java.util.logging.Logger;
public class Actor implements Executor {
private final static ThreadLocal<Actor> _currentActor = new ThreadLocal<>();
private final static AtomicLong _count = new AtomicLong(0);
protected final static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(4);
private final static AtomicLong _actorId = new AtomicLong(0);
protected final static ExecutorService _executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final String _name;
private final Logger _logger;
@ -33,12 +35,12 @@ public class Actor implements Executor {
}
public Actor() {
this("" + System.currentTimeMillis());
this("" + _actorId.incrementAndGet());
}
public Actor(String debugName) {
this._name = debugName;
this._logger = Logger.getLogger(Actor.class.getCanonicalName() + "(" + this._name + ")");
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
_count.incrementAndGet();
}
@ -128,7 +130,7 @@ public class Actor implements Executor {
_alive = false;
_exitReason = reason;
if (normally) {
getLogger().log(Level.INFO, "Actor stopped", reason);
getLogger().log(Level.FINE, "Actor stopped", reason);
} else {
getLogger().log(Level.SEVERE, "Actor terminated with error", reason);
}
@ -150,7 +152,11 @@ public class Actor implements Executor {
}
public void later(long delayMilliseconds, Runnable work, Runnable ifNotAlive) {
_executor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
if (delayMilliseconds == 0) {
_executor.execute(() -> this._perform(work, ifNotAlive));
} else {
_scheduledExecutor.schedule(() -> this._perform(work, ifNotAlive), delayMilliseconds, TimeUnit.MILLISECONDS);
}
}
public PeriodicTimer every(long periodMilliseconds, Runnable f) {
@ -159,20 +165,27 @@ public class Actor implements Executor {
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Runnable f) {
return new PeriodicTimer(
_executor.scheduleAtFixedRate(
_scheduledExecutor.scheduleAtFixedRate(
() -> this._perform(f, null),
initialDelayMilliseconds,
periodMilliseconds,
TimeUnit.MILLISECONDS));
}
public static boolean awaitAll() throws InterruptedException {
public static void awaitAll() throws InterruptedException {
while (_count.get() > 0) {
synchronized (_count) {
_count.wait(1000);
}
}
_executor.shutdown();
return _executor.awaitTermination(5, TimeUnit.MINUTES);
_scheduledExecutor.shutdown();
_executor.awaitTermination(5, TimeUnit.MINUTES);
_scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES);
}
public static void convenientLogging() {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
}
}

View File

@ -0,0 +1,35 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class AsyncProxy<T> extends AbstractProxy<T> {
public AsyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) throws Throwable {
if (method.getReturnType().equals(void.class)) {
this._ref.async((v) -> {
try {
method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
});
return null;
} else {
System.err.println(method.getReturnType());
throw new UnsupportedOperationException(
"Cannot invoke non-void-returning method '" + method + "' asynchronously via AsyncProxy");
}
}
@Override
boolean isSync() {
return false;
}
}

View File

@ -7,7 +7,7 @@ import java.lang.reflect.Proxy;
import java.util.function.Consumer;
import java.util.function.Function;
public class Remote<T> implements InvocationHandler {
public class Remote<T> {
private final Actor _actor;
private final T _target;
@ -52,34 +52,31 @@ public class Remote<T> implements InvocationHandler {
return p;
}
@SuppressWarnings("unchecked")
public<I> I proxy(Class<I> c) {
private<I> void checkTargetInstance(Class<I> c) {
if (!c.isInstance(this._target)) {
throw new IllegalArgumentException("target is not an instance of " + c);
}
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, this);
}
@SuppressWarnings("unchecked")
public<I> I syncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new SyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public<I> I asyncProxy(Class<I> c) {
checkTargetInstance(c);
return (I) Proxy.newProxyInstance(c.getClassLoader(), new Class[] { c }, new AsyncProxy<>(this));
}
@SuppressWarnings("unchecked")
public static<I, T extends I> Remote<T> from(I proxy) {
return (Remote<T>) Proxy.getInvocationHandler(proxy);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return this.sync((v) -> {
try {
return method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
}).await();
return ((AbstractProxy<T>) Proxy.getInvocationHandler(proxy)).ref();
}
@Override
public String toString() {
return super.toString() + "(" + this._actor.getName() + "::" + this._target + ")";
return this.getClass().getSimpleName() + "(" + this._actor.getName() + "::" + this._target + ")";
}
}

View File

@ -0,0 +1,29 @@
package org.syndicate_lang.actors;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class SyncProxy<T> extends AbstractProxy<T> {
public SyncProxy(Remote<T> ref) {
super(ref);
}
@Override
public Object dispatch(Method method, Object[] args) throws Throwable {
return this._ref.sync((v) -> {
try {
return method.invoke(v, args);
} catch (IllegalAccessException e) {
throw new ProxyFailure(e);
} catch (InvocationTargetException e) {
throw new ProxyFailure(e.getCause());
}
}).await();
}
@Override
boolean isSync() {
return true;
}
}

View File

@ -1,20 +1,18 @@
package org.syndicate_lang.actors.example.example1;
import org.syndicate_lang.actors.Actor;
import org.syndicate_lang.actors.PeriodicTimer;
import org.syndicate_lang.actors.Remote;
import java.util.concurrent.atomic.AtomicReference;
public class Main {
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
final var vh = Actor.forObject(new ValueHolder<>("There"));
vh.getActor().daemonize();
final var m = Actor.forObject(new Main());
m.async(10, (m_) -> m_.run(vh));
@SuppressWarnings("unchecked")
IValueHolder<String> vv = vh.proxy(IValueHolder.class);
IValueHolder<String> vv = vh.syncProxy(IValueHolder.class);
System.out.println("Value: " + vv.get());
vv.set("Second");
@ -28,13 +26,12 @@ public class Main {
private int greetingCounter = 0;
public void run(Remote<ValueHolder<String>> vh) {
this.greet((String) vh.proxy(IValueHolder.class).get());
this.greet((String) vh.syncProxy(IValueHolder.class).get());
vh.syncVoid((v) -> v.set("World"));
AtomicReference<PeriodicTimer> timer = new AtomicReference<>();
timer.set(Actor.current().every(1000, () -> {
Actor.current().every(1000, () -> {
if (greetingCounter >= 3) Actor.current().stop();
this.greet(vh.sync((v) -> v.get()).await());
}));
this.greet(vh.sync(ValueHolder::get).await());
});
}
public void greet(String who) {

View File

@ -0,0 +1,26 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import java.util.List;
public class Forwarder implements IForwarder {
private final int _index;
private final List<IForwarder> _actors;
private final IForwarder _main;
private final int _nRounds;
public Forwarder(int index, List<IForwarder> actors, IForwarder main, int nRounds) {
this._index = index;
this._actors = actors;
this._main = main;
this._nRounds = nRounds;
}
@Override
public void handleMessage(int hopCount) {
int index = (this._index + 1) % this._actors.size();
IForwarder target = hopCount >= this._nRounds - 1 ? _main : this._actors.get(index);
target.handleMessage(hopCount + 1);
}
}

View File

@ -0,0 +1,10 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
public interface IForwarder {
void handleMessage(int hopCount);
default void shutdown() {
Actor.current().stop();
}
}

View File

@ -0,0 +1,53 @@
package org.syndicate_lang.actors.example.example2;
import org.syndicate_lang.actors.Actor;
import java.util.ArrayList;
import java.util.List;
public class Main implements IForwarder {
private List<IForwarder> _actors;
public static void main(String[] args) throws InterruptedException {
Actor.convenientLogging();
Actor.forObject(new Main(1000000, 10)).syncVoid(Main::boot).await();
Actor.awaitAll();
}
public final int _nActors;
public final int _nRounds;
public int _remainingToReceive;
public Main(int nActors, int nRounds) {
this._nActors = nActors;
this._nRounds = nRounds;
this._remainingToReceive = nActors;
}
public void boot() {
this._actors = new ArrayList<>();
IForwarder me = Actor.ref(this).asyncProxy(IForwarder.class);
for (int i = 0; i < _nActors; i++) {
this._actors.add(Actor.forObject(
new Forwarder(i, this._actors, me, this._nRounds))
.asyncProxy(IForwarder.class));
// Actor.log().info(this._actors.get(this._actors.size()-1).toString());
}
Actor.log().info("Start");
this._actors.forEach((a) -> a.handleMessage(0));
}
@Override
public void handleMessage(int hopCount) {
this._remainingToReceive--;
// Actor.log().info(String.format("hopCount: %d, remainingToReceive: %d",
// hopCount,
// this._remainingToReceive));
if (this._remainingToReceive == 0) {
this._actors.forEach(IForwarder::shutdown);
this.shutdown();
Actor.log().info("Stop after " + (_nActors * _nRounds) + " messages");
}
}
}