JDK9响应式流使用详解( 三 )

代码后是运行结果如下:
订阅者:S1,最大消费数据: 2 。推送数据:1 。休眠 3 秒 。订阅者:S3,最大消费数据: 6 。订阅者:S2,最大消费数据: 4 。订阅者:S2 接收到数据:1.订阅者:S3 接收到数据:1.订阅者:S1 接收到数据:1.订阅者:S4,最大消费数据: 10 。推送数据:2 。休眠 3 秒 。订阅者:S2 接收到数据:2.订阅者:S3 接收到数据:2.订阅者:S1 接收到数据:2.订阅者:S4 接收到数据:2.准备取消订阅者: S1 。已处理数据个数:2 。推送数据:3 。休眠 3 秒 。订阅者:S4 接收到数据:3.订阅者:S2 接收到数据:3.订阅者:S3 接收到数据:3.推送数据:4 。休眠 3 秒 。订阅者:S4 接收到数据:4.订阅者:S3 接收到数据:4.订阅者:S2 接收到数据:4.准备取消订阅者: S2 。已处理数据个数:4 。推送数据:5 。休眠 3 秒 。订阅者:S3 接收到数据:5.订阅者:S4 接收到数据:5.订阅者: S3 处理完成 。订阅者: S4 处理完成 。由于是异步执行,所以在“接收数据”部分的顺序可能不同 。
【JDK9响应式流使用详解】我们分析一下程序的执行流程 。

  • 创建一个发布者实例
  • 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10 。
  • 前三个订阅者立即订阅消息 。
  • S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅 。
  • 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据 。
  • 将publish线程join()住等待流程结束 。
执行的日志满足上述流程而针对一些关键点为:
  • S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1" 。
  • 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅 。之后只剩下S2、S3、S4 。
  • 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅 。之后只剩下S3、S4 。
  • 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成 。
需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕 。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的 。
最后本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程 。同时本文没有提供Processor的例子,各位也可以自行学习 。
总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌 。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息 。订阅者可以随时异步追加需要的更多信息 。
JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher<T>作为Publisher<T>接口的简单实现 。




推荐阅读