使用JDK 9 Flow API进行响应式编程
什么是响应式编程?
响应式编程是关于处理数据项的异步流,也就是应用程序在数据项发生时对其进行响应。 数据流实质上是指随时间发生的数据项序列。与迭代内存数据相比, 这个模型的内存效率更高,因为数据是以流的形式处理的。
在响应式编程模型中,有一个Publisher和一个Subscriber。 Publisher发布一个数据流,Subscriber异步订阅。
该模型还提供了一种机制来引入更高阶的函数,这样就可以使用Processor在流上操作。 Processor转换数据流不需要更改Publisher或Subscriber。 Processor(或一连串的Processor)位于Publisher和Subcriber之间来将一个数据流转换为另一个。 Publisher和Subcriber独立于发生在数据流中的转换。
为什么要响应式编程?
- 更简洁的代码,使其更具可读性。
- 从模板代码中抽离出来,专注于业务逻辑。
- 从底层的线程,同步和并发问题中抽离出来。
- 流处理意味着高效的内存
- 该模型几乎可以应用于任何类型的问题。
JDK 9 Flow API
JDK 9中的Flow API与Reactive Streams Specification相对应,这是一个事实上的标准。 Reactive Streams Specification是标准化响应式编程的举措之一。 已经有几个实现支持Reactive Streams Specification。
Flow API(和Reactive Streams API)在某些方面是迭代器模式和观察者模式的组合。 迭代器模式是一个拉式(pull)的模型,应用程序从数据源拉取项目。 观察者模式是一个推送(push)模型,数据源的项目被推送到应用程序。 使用Flow API,应用程序最初请求(拉)N个项目,然后发布者将最多N个项目推送给订阅者。 所以说它是拉和推编程模型的混合。
过一遍Flow API 接口
@FunctionalInterface
public static interface Flow.Publisher<T> {
public void subscribe(Flow.Subscriber<? super T> subscriber);
}
public static interface Flow.Subscriber<T> {
public void onSubscribe(Flow.Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Flow.Subscription {
public void request(long n);
public void cancel();
}
public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
订阅者Subscriber
Subscriber订阅Publisher的回调。 除非有请求,数据项目是不会被推送到订阅者,但可能会请求多个项目。 对于给定订阅(Subscription),调用Subscriber的方法是严格按顺序的。 应用程序可以响应订阅者上的以下回调。
onSubscribe
对于给定的订阅,在调用任何其他Subcriber方法之前调用此方法。
onNext
订阅下一个项目调用此方法
onError
在Publisher或Subcriber遇到不可恢复的错误时调用此方法,之后Subscription不会再调用Subscriber其他的方法。
如果Publisher遇到不允许将项目发送给Subscriber的错误,则Subscriber会收到onError消息,然后不会再收到其他消息。
onComplete
当已知不会再额外调用Subscriber的方法,且没有发生有错误而导致终止订阅,调用此方法。之后Subscription不会调用其它Subscriber的方法。
当知道没有更多的消息发送给它时,订阅者收到onComplete。
Subscriber示例
import java.util.concurrent.Flow.*;
...
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
发布者Publisher
发布者将数据流发布给注册的订阅者。 它通常使用Excutor异步发布项目给订阅者。 Publisher确保每个订阅的Subcriber方法严格按顺序调用。
使用JDK的SubmissionPublisher将数据流发布给订阅者的示例
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
//Publish items
System.out.println("Publishing Items...");
String[] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
订阅Subscription
连接Flow.Publisher和Flow.Subscriber。 Subscriber只有在请求时才会收到项目,并可能随时通过Subscription取消订阅。
方法
- request:将给定数量的n个项目添加到当前未完成的此订阅需求中。
- cancel:导致Subscriber(最终)停止接收消息。
处理器Processor
充当Subscriber和Publisher的组件。 处理器位于Publisher和Subscriber之间,它把一个流转换为另一个。 可能有一个或多个链接在一起的处理器,链中最后处理器的结果由Subscriber处理。 JDK没有提供任何具体的处理器,因此需要单独编写任何需要的处理器。
将字符串转换为整数的处理器示例
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
...
public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {
private Function function;
private Subscription subscription;
public MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit((R) function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
使用处理器转换数据流的示例代码
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));
MySubscriber<Integer> subscriber = new MySubscriber<>();
//Chain Processor and Subscriber
publisher.subscribe(filterProcessor);
filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
System.out.println("Publishing Items...");
String[] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
背压
当Publisher以比Subscriber所消费数据快得多的速率产生背压。 未处理项目的缓冲区的大小可能受到限制。 Flow API没有提供任何API来发信号或处理背压,但是可以有多种策略由自己来处理背压。 看看RxJava如何处理背压。
总结
将响应式编程的API添加到JDK 9是一个好的开始。 许多其他产品也开始提供响应式编程的API来访问其功能。 尽管Flow API允许程序员开始编写响应式程序,但是生态系统仍然需要发展。
例如,响应式程序可能仍然使用传统的API来访问数据库,也许是因为并不是所有的数据库都支持用于响应式编程的API。 即响应式程序依赖的API可能还不支持响应式编程模型。