Chain to Publisher and Subscriber
reactive
Pub/Sub Chain
@Slf4j
public class PubSubChain {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub1 = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);
Publisher<Integer> mapPub2 = mapPub(mapPub1, (Function<Integer, Integer>) s -> -s);
mapPub2.subscribe(logSub());
}
// Chain
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static Publisher<Integer> iterPub(List<Integer> iter) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
// onSubscribe 는 무조건 호출되어야 하는 메서드: subscribe 하는 즉시 호출해줘야 함
// Subscription: Publisher, Subscriber 둘 사이의 구독이 한번 일어난다는 의미
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s));
// publisher 가 notify 완료 시 onComplete 또는 onError 를 처리해야 한다.
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
/**
* Subscriber 에서 Subscription 객체의 cancel()을 호출할 수 있다.
* 더이상 데이터를 통지받지 않겠다고 알림
*/
@Override
public void cancel() {
}
});
}
};
return pub;
}
private static Subscriber<Integer> logSub() {
Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// Subscription 의 request 를 요청해야한다.
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer i) {
log.debug("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
return sub;
}
}
Refactoring
- DelegateSubscriber
public class DelegateSubscriber<T> implements Subscriber<Integer> {
private Subscriber sub;
public DelegateSubscriber(Subscriber<? super T> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(T i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
- GenericPubSub
@Slf4j
public class GenericPubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);
mapPub.subscribe(logSub());
}
private static <T> Publisher<T> mapPub(Publisher<T> pub, Function<T, T> f) {
return new Publisher<T>() {
@Override
public void subscribe(Subscriber<? super T> sub) {
pub.subscribe(new DelegateSubscriber<T>(sub) {
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static <T> Subscriber<T> logSub() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T i) {
log.debug("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
private static Publisher<Integer> iterPub(List<Integer> iter) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
return pub;
}
}