241 lines
6.4 KiB
Java
241 lines
6.4 KiB
Java
|
package org.syndicate_lang.actors;
|
||
|
|
||
|
import java.util.ArrayList;
|
||
|
import java.util.List;
|
||
|
import java.util.concurrent.*;
|
||
|
import java.util.function.Consumer;
|
||
|
import java.util.function.Function;
|
||
|
|
||
|
public class Promise<T> implements Future<T> {
|
||
|
public static enum State {
|
||
|
PENDING,
|
||
|
FULFILLED,
|
||
|
REJECTED
|
||
|
};
|
||
|
|
||
|
private volatile State _state = State.PENDING;
|
||
|
private T _value = null;
|
||
|
private Throwable _reason = null;
|
||
|
private List<Consumer<T>> _resolvers = null;
|
||
|
private List<Consumer<Throwable>> _rejecters = null;
|
||
|
|
||
|
public Promise() {}
|
||
|
|
||
|
public static<T> Promise<T> resolved() {
|
||
|
return resolved(null);
|
||
|
}
|
||
|
|
||
|
public static<T> Promise<T> resolved(T v) {
|
||
|
var p = new Promise<T>();
|
||
|
p.resolveWith(v);
|
||
|
return p;
|
||
|
}
|
||
|
|
||
|
public static<T> Promise<T> rejected(Throwable e) {
|
||
|
var p = new Promise<T>();
|
||
|
p.rejectWith(e);
|
||
|
return p;
|
||
|
}
|
||
|
|
||
|
public State getState() {
|
||
|
return _state;
|
||
|
}
|
||
|
|
||
|
public boolean isPending() {
|
||
|
return _state == State.PENDING;
|
||
|
}
|
||
|
|
||
|
public boolean isFulfilled() {
|
||
|
return _state == State.FULFILLED;
|
||
|
}
|
||
|
|
||
|
public boolean isRejected() {
|
||
|
return _state == State.REJECTED;
|
||
|
}
|
||
|
|
||
|
public T getValue() {
|
||
|
return _value;
|
||
|
}
|
||
|
|
||
|
public Throwable getReason() {
|
||
|
return _reason;
|
||
|
}
|
||
|
|
||
|
public void resolve() {
|
||
|
this.resolveWith(null);
|
||
|
}
|
||
|
|
||
|
public synchronized void resolveWith(T t) {
|
||
|
if (this.isPending()) {
|
||
|
this._value = t;
|
||
|
this._state = State.FULFILLED;
|
||
|
var worklist = _resolvers;
|
||
|
_resolvers = null;
|
||
|
_rejecters = null;
|
||
|
if (worklist != null) {
|
||
|
for (var callback : worklist) {
|
||
|
callback.accept(t);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public void chain(Promise<T> t) {
|
||
|
if (this == t) {
|
||
|
throw new IllegalArgumentException("cannot chain promise immediately to itself");
|
||
|
}
|
||
|
t.whenFulfilled((v) -> this.resolveWith(v));
|
||
|
t.whenRejected((e) -> this.rejectWith(e));
|
||
|
}
|
||
|
|
||
|
public void rejectWith(Throwable e) {
|
||
|
if (this.isPending()) {
|
||
|
this._reason = e;
|
||
|
this._state = State.REJECTED;
|
||
|
var worklist = _rejecters;
|
||
|
_resolvers = null;
|
||
|
_rejecters = null;
|
||
|
if (worklist != null) {
|
||
|
for (var callback : worklist) {
|
||
|
callback.accept(e);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public<R> Promise<R> andThen(Function<T, R> ok) {
|
||
|
return this.andThen(ok, null);
|
||
|
}
|
||
|
|
||
|
public synchronized<R> Promise<R> andThen(final Function<T, R> ok, final Function<Throwable, R> fail) {
|
||
|
Actor a0 = Actor.current();
|
||
|
final Actor a = a0 != null ? a0 : new Actor();
|
||
|
|
||
|
Promise<R> p = new Promise<>();
|
||
|
this.whenFulfilled((t) -> a.execute(() -> {
|
||
|
try {
|
||
|
p.resolveWith(ok.apply(t));
|
||
|
} catch (Throwable e) {
|
||
|
p.rejectWith(e);
|
||
|
}
|
||
|
}, () -> p.rejectWith(new ActorTerminated(a))));
|
||
|
this.whenRejected((e) -> a.execute(() -> {
|
||
|
try {
|
||
|
if (fail == null) {
|
||
|
p.rejectWith(e);
|
||
|
} else {
|
||
|
p.resolveWith(fail.apply(e));
|
||
|
}
|
||
|
} catch (Throwable e2) {
|
||
|
p.rejectWith(e2);
|
||
|
}
|
||
|
}, () -> p.rejectWith(new ActorTerminated(a))));
|
||
|
return p;
|
||
|
}
|
||
|
|
||
|
private static<T> T unexpectedTimeout(TimeoutException e) {
|
||
|
throw new InternalError("await() without delay signalled TimeoutException", e);
|
||
|
}
|
||
|
|
||
|
public T await() {
|
||
|
try {
|
||
|
return this.await(-1);
|
||
|
} catch (TimeoutException e) {
|
||
|
return unexpectedTimeout(e);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public T await(long delayMilliseconds) throws TimeoutException {
|
||
|
try {
|
||
|
return _await(delayMilliseconds, TimeUnit.MILLISECONDS);
|
||
|
} catch (InterruptedException e) {
|
||
|
this.rejectWith(e);
|
||
|
throw new BrokenPromise(this);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public T _await(long delay, TimeUnit unit) throws TimeoutException, InterruptedException {
|
||
|
Semaphore s = new Semaphore(0);
|
||
|
this.whenFulfilled((_t) -> s.release());
|
||
|
this.whenRejected((_e) -> s.release());
|
||
|
if (delay == -1) {
|
||
|
s.acquire();
|
||
|
} else {
|
||
|
if (!s.tryAcquire(delay, unit)) {
|
||
|
throw new TimeoutException();
|
||
|
}
|
||
|
}
|
||
|
if (this.isFulfilled()) {
|
||
|
return this._value;
|
||
|
} else {
|
||
|
throw new BrokenPromise(this);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
private synchronized void whenFulfilled(Consumer<T> callback) {
|
||
|
switch (this._state) {
|
||
|
case PENDING:
|
||
|
if (_resolvers == null) _resolvers = new ArrayList<>();
|
||
|
_resolvers.add(callback);
|
||
|
break;
|
||
|
case FULFILLED:
|
||
|
callback.accept(this._value);
|
||
|
break;
|
||
|
case REJECTED:
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
private synchronized void whenRejected(Consumer<Throwable> callback) {
|
||
|
switch (this._state) {
|
||
|
case PENDING:
|
||
|
if (_rejecters == null) _rejecters = new ArrayList<>();
|
||
|
_rejecters.add(callback);
|
||
|
break;
|
||
|
case FULFILLED:
|
||
|
break;
|
||
|
case REJECTED:
|
||
|
callback.accept(this._reason);
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Implementation of Future<T>
|
||
|
|
||
|
@Override
|
||
|
public boolean cancel(boolean b) {
|
||
|
rejectWith(new CancellationException());
|
||
|
return isRejected();
|
||
|
}
|
||
|
|
||
|
@Override
|
||
|
public boolean isCancelled() {
|
||
|
return isRejected();
|
||
|
}
|
||
|
|
||
|
@Override
|
||
|
public boolean isDone() {
|
||
|
return !isPending();
|
||
|
}
|
||
|
|
||
|
@Override
|
||
|
public T get() throws InterruptedException, ExecutionException {
|
||
|
try {
|
||
|
return _await(-1, TimeUnit.MILLISECONDS);
|
||
|
} catch (TimeoutException e) {
|
||
|
return unexpectedTimeout(e);
|
||
|
} catch (BrokenPromise e) {
|
||
|
throw new ExecutionException(e.getReason());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
@Override
|
||
|
public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
|
||
|
try {
|
||
|
return _await(l, timeUnit);
|
||
|
} catch (BrokenPromise e) {
|
||
|
throw new ExecutionException(e.getReason());
|
||
|
}
|
||
|
}
|
||
|
}
|