一文搞懂响应式编程( 三 )

我们再举一个generate的例子
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)如上代码所示,generate需要一个Callable参数,而且是supplier (即没有输入值,只有一个输出)
另一个参数是BiFunction (前面我们也介绍过,需要两个输入值,一个输出值) 。BiFunction中的其中一个输入值是SynchronousSink,下面我们给出一个generate创建Flux的示例 。
Flux.generate( () -> 0, //提供一个初始状态值0 (i, sink) -> {sink.next("3*" + i + "=" + 3 * i);//使用初始值去生产一个3的乘法if (i > 9) sink.complete();//设置停止条件return i + 1;//返回一个新的状态值,以便在下一次的生产中使用,除非响应序列终止}).subscribe(System.out::println);下面我们在看一个Flux嵌套处理示例:
需求:将字符串去空格,并去重,然后排序输出 。
String str = "qa ws ed rf tg yh uj i k ol p za sx dc vf bg hn jm k loi yt ";Flux.fromArray(str.split(" "))//通过数组创建Flux.flatMap(i -> Flux.fromArray(i.split(""))).distinct() // 去重.sort() //排序.subscribe(System.out::print);//flatMap与Stream中的flatMap类似,接受Function作为参数,输入一个值,输出一个值,此处输出均为Publisher,以上就是Flux和Mono的一些简单介绍,同时Ractor也支持JDK中的FlowPubliser 和FlowSubscriber与 Reactor中的publisher, subscriber的适配等.
4. WebFluxSpringBoot 2之后支持的Reactive响应式编程 。
关于Reactive技术栈和经典的Servlet技术栈对比,Spring官网的这张图比较清晰 。

一文搞懂响应式编程

文章插图
 
Spring响应式编程主要依赖于Reactor第三方库,即上面讲的Flux和Mono的库 。
WebFlux主要有以下几个要点:
  • 反应式栈web框架
  • 完全异步非阻塞
  • 运行?.NETty,undertow,Servlet3.1 + 容器
  • 核心反应式库 Reactor
  • 返回 Flux 或Mono
  • 支持注解和函数编程两种编程模式
Spring WebFlux示例下面我们给出几个SpringBoot 的响应式web示例 。
可以去https://start.spring.io/ 新建webflux的项目也可以 。
项目中的主要依赖就是spring-boot-starter-webflux
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>基于注解的WebFlux:以下是一个最简单的基于注解的WebFlux
@GetMapping("/hello/mono1")public Mono<String> mono(){return Mono.just("Hello Mono -Java North");}@GetMapping("/hello/flux1")public Flux<String> flux(){return Flux.just("Hello Flux","Hello Java North");}基于函数式编程的WebFlux:创建RouterFunction,将其注入到Spring中即可 。
@Beanpublic RouterFunction<ServerResponse> testRoutes1() {return RouterFunctions.route().GET("/flux/function", new HandlerFunction<ServerResponse>() {@Overridepublic Mono<ServerResponse> handle(ServerRequest request) {return ServerResponse.ok().bodyValue("hello web flux , Hello Java North");}}).build();}//上面的方法使用函数式编程替换之后如下@Beanpublic RouterFunction<ServerResponse> testRoutes() {return RouterFunctions.route().GET("/flux/function",request -> ServerResponse.ok().bodyValue("Hello web flux , Hello Java North")).build();}Flux与Mono的响应式编程延迟示例下面我们编写一段返回Mono的响应式Web服务 。
@GetMapping("/hello/mono")public Mono<String> stringMono(){Mono<String> from = Mono.fromSupplier(() -> {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "Hello, Spring Reactivedate time:"+ LocalDateTime.now();});System.out.println( "thread : " + Thread.currentThread().getName()+ " ===" + LocalDateTime.now() +"==========Mono function complete==========");return from;}使用postman请求如下,5秒钟后返回数据 。后台却在5秒中之前已经处理完整个方法 。
一文搞懂响应式编程

文章插图
 
后台打印日志:
一文搞懂响应式编程

文章插图
 
再看一组Flux
@GetMapping(value = https://www.isolves.com/it/cxkf/bk/2022-07-15/"/hello/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux flux1(){Flux


推荐阅读