代码后是运行结果如下:
订阅者:S1,最大消费数据: 2 。推送数据:1 。休眠 3 秒 。订阅者:S3,最大消费数据: 6 。订阅者:S2,最大消费数据: 4 。订阅者:S2 接收到数据:1.订阅者:S3 接收到数据:1.订阅者:S1 接收到数据:1.订阅者:S4,最大消费数据: 10 。推送数据:2 。休眠 3 秒 。订阅者:S2 接收到数据:2.订阅者:S3 接收到数据:2.订阅者:S1 接收到数据:2.订阅者:S4 接收到数据:2.准备取消订阅者: S1 。已处理数据个数:2 。推送数据:3 。休眠 3 秒 。订阅者:S4 接收到数据:3.订阅者:S2 接收到数据:3.订阅者:S3 接收到数据:3.推送数据:4 。休眠 3 秒 。订阅者:S4 接收到数据:4.订阅者:S3 接收到数据:4.订阅者:S2 接收到数据:4.准备取消订阅者: S2 。已处理数据个数:4 。推送数据:5 。休眠 3 秒 。订阅者:S3 接收到数据:5.订阅者:S4 接收到数据:5.订阅者: S3 处理完成 。订阅者: S4 处理完成 。由于是异步执行,所以在“接收数据”部分的顺序可能不同 。
【JDK9响应式流使用详解】我们分析一下程序的执行流程 。
- 创建一个发布者实例
- 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10 。
- 前三个订阅者立即订阅消息 。
- S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅 。
- 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据 。
- 将publish线程join()住等待流程结束 。
- S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1" 。
- 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅 。之后只剩下S2、S3、S4 。
- 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅 。之后只剩下S3、S4 。
- 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成 。
最后本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程 。同时本文没有提供Processor的例子,各位也可以自行学习 。
总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌 。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息 。订阅者可以随时异步追加需要的更多信息 。
JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher<T>作为Publisher<T>接口的简单实现 。
推荐阅读
- 常见的62种http响应代码整理
- apache-4-请求头和响应头
- CAD安装过程中遇到注册机无响应,怎么办?
- Django内置的响应类
- Linux 应急响应入门:入侵排查应该这样做
- Spring Boot 中如何统一 API 接口响应格式?
- Redis响应严重延迟,如何解决?
- 响应速度与智能化如何平衡,携程酒店搜索实践
- 响应式网页中的高度设计,你认真的吗?
- Spring Boot 使用拦截器优雅打印接口响应时间
