发布于 2年前

使用JDK 9 Flow API进行响应式编程

什么是响应式编程?

响应式编程是关于处理数据项的异步流,也就是应用程序在数据项发生时对其进行响应。 数据流实质上是指随时间发生的数据项序列。与迭代内存数据相比, 这个模型的内存效率更高,因为数据是以流的形式处理的。

在响应式编程模型中,有一个Publisher和一个Subscriber。 Publisher发布一个数据流,Subscriber异步订阅。

该模型还提供了一种机制来引入更高阶的函数,这样就可以使用Processor在流上操作。 Processor转换数据流不需要更改Publisher或Subscriber。 Processor(或一连串的Processor)位于Publisher和Subcriber之间来将一个数据流转换为另一个。 Publisher和Subcriber独立于发生在数据流中的转换。

为什么要响应式编程?

  • 更简洁的代码,使其更具可读性。
  • 从模板代码中抽离出来,专注于业务逻辑。
  • 从底层的线程,同步和并发问题中抽离出来。
  • 流处理意味着高效的内存
  • 该模型几乎可以应用于任何类型的问题。

JDK 9 Flow API

JDK 9中的Flow API与Reactive Streams Specification相对应,这是一个事实上的标准。 Reactive Streams Specification是标准化响应式编程的举措之一。 已经有几个实现支持Reactive Streams Specification。

Flow API(和Reactive Streams API)在某些方面是迭代器模式和观察者模式的组合。 迭代器模式是一个拉式(pull)的模型,应用程序从数据源拉取项目。 观察者模式是一个推送(push)模型,数据源的项目被推送到应用程序。 使用Flow API,应用程序最初请求(拉)N个项目,然后发布者将最多N个项目推送给订阅者。 所以说它是拉和推编程模型的混合。

过一遍Flow API 接口

@FunctionalInterface
public static interface Flow.Publisher<T> {
    public void  subscribe(Flow.Subscriber<? super T> subscriber);
}
  
public static interface Flow.Subscriber<T> {
    public void  onSubscribe(Flow.Subscription subscription);
    public void  onNext(T item);
    public void  onError(Throwable throwable);
    public void  onComplete();
}
  
public static interface Flow.Subscription {
    public void  request(long n);
    public void  cancel();
}
  
public static interface Flow.Processor<T,R>  extends Flow.Subscriber<T>, Flow.Publisher<R> {
}

订阅者Subscriber

Subscriber订阅Publisher的回调。 除非有请求,数据项目是不会被推送到订阅者,但可能会请求多个项目。 对于给定订阅(Subscription),调用Subscriber的方法是严格按顺序的。 应用程序可以响应订阅者上的以下回调。

onSubscribe

对于给定的订阅,在调用任何其他Subcriber方法之前调用此方法。

onNext

订阅下一个项目调用此方法

onError

在Publisher或Subcriber遇到不可恢复的错误时调用此方法,之后Subscription不会再调用Subscriber其他的方法。

如果Publisher遇到不允许将项目发送给Subscriber的错误,则Subscriber会收到onError消息,然后不会再收到其他消息。

onComplete

当已知不会再额外调用Subscriber的方法,且没有发生有错误而导致终止订阅,调用此方法。之后Subscription不会调用其它Subscriber的方法。

当知道没有更多的消息发送给它时,订阅者收到onComplete。

Subscriber示例

import java.util.concurrent.Flow.*;
...

public class MySubscriber<T> implements Subscriber<T> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
    }

    @Override
    public void onNext(T item) {
        System.out.println("Got : " + item);
        subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
} 

发布者Publisher

发布者将数据流发布给注册的订阅者。 它通常使用Excutor异步发布项目给订阅者。 Publisher确保每个订阅的Subcriber方法严格按顺序调用。

使用JDK的SubmissionPublisher将数据流发布给订阅者的示例

import java.util.concurrent.SubmissionPublisher;
...

//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

//Register Subscriber
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);

//Publish items
System.out.println("Publishing Items...");
String[] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close(); 

订阅Subscription

连接Flow.Publisher和Flow.Subscriber。 Subscriber只有在请求时才会收到项目,并可能随时通过Subscription取消订阅。

方法

  • request:将给定数量的n个项目添加到当前未完成的此订阅需求中。
  • cancel:导致Subscriber(最终)停止接收消息。

处理器Processor

充当Subscriber和Publisher的组件。 处理器位于Publisher和Subscriber之间,它把一个流转换为另一个。 可能有一个或多个链接在一起的处理器,链中最后处理器的结果由Subscriber处理。 JDK没有提供任何具体的处理器,因此需要单独编写任何需要的处理器。

将字符串转换为整数的处理器示例

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
...

public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {

    private Function function;
    private Subscription subscription;

    public MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit((R) function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
}  

使用处理器转换数据流的示例代码

import java.util.concurrent.SubmissionPublisher;
...

//Create Publisher  
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

//Create Processor and Subscriber  
MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));

MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));

MySubscriber<Integer> subscriber = new MySubscriber<>();

//Chain Processor and Subscriber  
publisher.subscribe(filterProcessor);
filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);

System.out.println("Publishing Items...");
String[] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();

背压

当Publisher以比Subscriber所消费数据快得多的速率产生背压。 未处理项目的缓冲区的大小可能受到限制。 Flow API没有提供任何API来发信号或处理背压,但是可以有多种策略由自己来处理背压。 看看RxJava如何处理背压。

总结

将响应式编程的API添加到JDK 9是一个好的开始。 许多其他产品也开始提供响应式编程的API来访问其功能。 尽管Flow API允许程序员开始编写响应式程序,但是生态系统仍然需要发展。

例如,响应式程序可能仍然使用传统的API来访问数据库,也许是因为并不是所有的数据库都支持用于响应式编程的API。 即响应式程序依赖的API可能还不支持响应式编程模型。

©2020 edoou.com   京ICP备16001874号-3