syndicate-java/src/main/java/org/syndicate_lang/actors/Actor.java

277 lines
8.0 KiB
Java
Raw Normal View History

2020-12-04 18:57:47 +00:00
package org.syndicate_lang.actors;
2021-04-14 19:42:50 +00:00
import java.util.HashMap;
import java.util.Map;
2020-12-04 18:57:47 +00:00
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
2020-12-09 15:12:58 +00:00
import java.util.concurrent.atomic.AtomicReference;
2023-10-28 15:12:07 +00:00
import java.util.function.Consumer;
import java.util.function.Function;
2020-12-04 18:57:47 +00:00
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* I represent the shared execution concepts for a collection of objects; I am roughly analogous to the E concept of a Vat.
*/
public class Actor extends ForkJoinTask<Void> {
2020-12-04 18:57:47 +00:00
private final static AtomicLong _count = new AtomicLong(0);
2020-12-04 22:25:40 +00:00
private final static AtomicLong _actorId = new AtomicLong(0);
protected final static ForkJoinPool _executor = new ForkJoinPool(
2023-10-28 16:41:10 +00:00
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true
);
2020-12-04 22:25:40 +00:00
protected final static ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
2020-12-04 18:57:47 +00:00
private final String _name;
private final Logger _logger;
final Turn _turn = new Turn(this);
2020-12-04 18:57:47 +00:00
private boolean _alive = true;
private Throwable _exitReason = null;
private boolean _isCounted = true;
2021-04-14 19:42:50 +00:00
private Map<Long, Ref> _outbound;
2020-12-04 18:57:47 +00:00
public final static class WorkItem extends AtomicReference<WorkItem> {
Consumer<Turn> work;
2020-12-09 15:12:58 +00:00
Runnable ifNotAlive;
public WorkItem(Consumer<Turn> work, Runnable ifNotAlive) {
2020-12-09 15:12:58 +00:00
this.work = work;
this.ifNotAlive = ifNotAlive;
}
public final void clear() {
this.work = null;
this.ifNotAlive = null;
}
}
private WorkItem head = new WorkItem(null, null);
private final AtomicReference<WorkItem> tail = new AtomicReference<>(head);
private final AtomicLong workItemCount = new AtomicLong(0);
2020-12-04 18:57:47 +00:00
public Actor() {
2021-04-14 19:42:50 +00:00
this(null, null);
2020-12-04 18:57:47 +00:00
}
public Actor(String debugName) {
2021-04-14 19:42:50 +00:00
this(debugName, null);
}
public Actor(String debugName, Map<Long, Ref> outbound) {
this._name = debugName == null ? "" + _actorId.incrementAndGet() : debugName;
2020-12-04 22:25:40 +00:00
this._logger = Logger.getLogger(this.getClass().getSimpleName() + "(" + this._name + ")");
2021-04-14 19:42:50 +00:00
this._outbound = outbound == null ? new HashMap<>() : outbound;
2020-12-04 18:57:47 +00:00
_count.incrementAndGet();
}
2021-04-14 19:42:50 +00:00
public static Ref forEntity(IEntity o) {
return new Actor().ref(o);
2020-12-04 18:57:47 +00:00
}
public static Promise<Ref> boot(Function<Turn, IEntity> f) {
2021-04-14 19:42:50 +00:00
final Promise<Ref> p = new Promise<>();
final Actor a = new Actor();
a.execute(new WorkItem(
t -> p.resolveCalling(() -> a.ref(f.apply(t))),
() -> p.rejectWith(new ActorTerminated(a))));
2020-12-05 22:50:41 +00:00
return p;
}
2020-12-04 18:57:47 +00:00
public String getName() {
return _name;
}
public Throwable getExitReason() {
return _exitReason;
}
public Logger log() {
2020-12-04 18:57:47 +00:00
return _logger;
}
public synchronized boolean isDaemon() {
return _alive && !_isCounted;
}
2023-10-28 18:01:39 +00:00
public void daemonize() {
this.scheduleTurn(_t -> this._releaseCount());
2020-12-04 18:57:47 +00:00
}
private void _releaseCount() {
if (_isCounted) {
_isCounted = false;
synchronized (_count) {
if (_count.decrementAndGet() == 0) {
_count.notifyAll();
}
}
}
}
2023-10-28 11:52:27 +00:00
void _injectOutbound(Long handle, Ref peer) {
synchronized (_outbound) {
_outbound.put(handle, peer);
}
2021-04-14 19:42:50 +00:00
}
2023-10-28 11:52:27 +00:00
Ref _lookupOutbound(Long handle) {
synchronized (_outbound) {
return _outbound.get(handle);
}
2021-04-14 19:42:50 +00:00
}
2023-10-28 11:52:27 +00:00
Ref _extractOutbound(Long handle) {
synchronized (_outbound) {
return _outbound.remove(handle);
}
2021-04-14 19:42:50 +00:00
}
2020-12-04 18:57:47 +00:00
public String toString() {
return super.toString() + "(" + this._name + ")";
}
2021-04-14 19:42:50 +00:00
public<T> Ref ref(IEntity o) {
return new Ref(this, o);
2020-12-04 18:57:47 +00:00
}
2023-10-28 18:01:39 +00:00
public Actor scheduleTurn(Consumer<Turn> f) {
this.execute(new WorkItem(f, null));
2023-10-28 18:01:39 +00:00
return this;
2023-10-28 15:12:07 +00:00
}
void _performTurn(Consumer<Turn> f) {
try {
f.accept(_turn);
_turn.commit();
} catch (Exception e) {
this.stop(false, e);
}
}
private void _performSync(WorkItem item) {
2020-12-04 18:57:47 +00:00
synchronized (this) {
_perform(item);
2020-12-04 18:57:47 +00:00
}
}
private void _perform(WorkItem item) {
2020-12-09 15:12:58 +00:00
if (!_alive) {
if (item.ifNotAlive != null) item.ifNotAlive.run();
2020-12-09 15:12:58 +00:00
} else {
this._performTurn(item.work);
2020-12-09 15:12:58 +00:00
}
}
2020-12-04 18:57:47 +00:00
public Promise<?> stop() {
2021-04-14 19:42:50 +00:00
return stop(true, null);
2020-12-04 18:57:47 +00:00
}
public Promise<?> stop(Throwable reason) {
2021-04-14 19:42:50 +00:00
return stop(true, reason);
}
public Promise<?> stop(boolean normally, Throwable reason) {
Promise<?> p = new Promise<>();
this.execute(new WorkItem(
_f -> { this._stop(normally, reason); p.resolve(); },
p::resolve));
return p;
2020-12-04 18:57:47 +00:00
}
private synchronized void _stop(boolean normally, Throwable reason) {
if (_alive) {
_alive = false;
_exitReason = reason;
if (normally) {
log().log(Level.FINE, "Actor stopped", reason);
2020-12-04 18:57:47 +00:00
} else {
log().log(Level.SEVERE, "Actor terminated with error", reason);
2020-12-04 18:57:47 +00:00
}
2023-10-28 15:12:07 +00:00
this._performTurn(t -> {
2023-10-28 11:52:27 +00:00
synchronized(_outbound) {
_outbound.forEach(t::_retract_);
}
});
2020-12-04 18:57:47 +00:00
_releaseCount();
}
}
2023-10-28 17:07:38 +00:00
synchronized void _runWorkItems() {
long batch = workItemCount.get();
while (batch > 0) {
for (int count = 0; count < batch; count++) {
WorkItem i = null;
while (i == null) i = head.get();
head = i;
this._perform(i);
i.clear();
}
batch = workItemCount.addAndGet(-batch);
}
}
2023-10-28 21:43:34 +00:00
@Override public final Void getRawResult() { return null; }
@Override public final void setRawResult(Void v) { }
@Override public final boolean exec() { this._runWorkItems(); return false; }
2023-10-28 19:00:53 +00:00
public void execute(WorkItem item) {
tail.getAndSet(item).set(item);
2020-12-09 20:04:20 +00:00
if (workItemCount.getAndIncrement() == 0) {
_executor.execute(this);
2020-12-09 20:04:20 +00:00
}
2020-12-04 18:57:47 +00:00
}
public void later(long delayMilliseconds, WorkItem item) {
2020-12-04 22:25:40 +00:00
if (delayMilliseconds == 0) {
this.execute(item);
2020-12-04 22:25:40 +00:00
} else {
_scheduledExecutor.schedule(
() -> this._performSync(item),
delayMilliseconds,
TimeUnit.MILLISECONDS);
2020-12-04 22:25:40 +00:00
}
2020-12-04 18:57:47 +00:00
}
public PeriodicTimer every(long initialDelayMilliseconds, long periodMilliseconds, Consumer<Turn> f) {
final var self = this;
final var callback = new Runnable() {
PeriodicTimer timer;
public void run() {
self._performSync(new WorkItem(f, this.timer::cancel));
}
};
callback.timer = new PeriodicTimer(
_scheduledExecutor.scheduleAtFixedRate(
callback,
initialDelayMilliseconds,
periodMilliseconds,
TimeUnit.MILLISECONDS));
return callback.timer;
2020-12-04 18:57:47 +00:00
}
2020-12-04 22:25:40 +00:00
public static void awaitAll() throws InterruptedException {
2020-12-04 18:57:47 +00:00
while (_count.get() > 0) {
synchronized (_count) {
_count.wait(1000);
}
}
_executor.shutdown();
2020-12-04 22:25:40 +00:00
_scheduledExecutor.shutdown();
2020-12-09 15:12:58 +00:00
//noinspection ResultOfMethodCallIgnored
2020-12-04 22:25:40 +00:00
_executor.awaitTermination(5, TimeUnit.MINUTES);
2020-12-09 15:12:58 +00:00
//noinspection ResultOfMethodCallIgnored
2020-12-04 22:25:40 +00:00
_scheduledExecutor.awaitTermination(5, TimeUnit.MINUTES);
}
public static void convenientLogging() {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$s %3$s %5$s%6$s%n");
2020-12-04 18:57:47 +00:00
}
public boolean isAlive() {
return _alive;
}
2020-12-04 18:57:47 +00:00
}