WebFlux是一种响应式编程框架,是一种异步非阻塞式的开发方式,使用传统的Web框架,SpringMVC,基于老版本Servlet不支持非阻塞方式使用,在Servlet3.1之后,可以运行WebFlux。
WebFlux核心基于Reactor相关API实现,但是也支持在Servlet3.1之后的版本运行,也可以在netty等容器中运行。
- 同步和异步:当调用者发送请求后,等待回应,则被称为异步
- 阻塞和非阻塞:针对被调用者而言,收到请求后,直接反馈被称为非阻塞,处理结束后进行反馈是非阻塞
优势:
- 采用异步框架可以提升吞吐量提高性能,以Reactor为基础实现响应式编程
- 函数式编程,使用WebFlux函数式编程实现路由请求
SpringMVC是命令式编程,一行行进行执行,都可以使用注解方式运行,都可以使用Tomcat、Jetty等容器中。远程父调用中使用WebFlux,在网关中也可以使用WebFlux进行处理。
响应式编程
RP是一种面向数据流和变化传播的编程范式,类似于Excel表格中的sum(a1+b1)而不是一个具体的值。
Java 8中采用了观察者模式进行处理(Observe、和Observable)
观察者模式:会对数据本身进行变化监控,如果数据发生变化,会进行操作。观察者模式和订阅模式反正一直说不清楚,主要我觉得区别就是观察者模式是强耦合的一种模式,而订阅模式被订阅者并不关心自己被订阅了多少人。
package com.example.demowebflux.reactor8;
import java.util.Observable;
public class ObserverDemo extends Observable {
public static void main(String[] args) {
ObserverDemo observerDemo = new ObserverDemo();
observerDemo.addObserver((o, arg) -> {
System.out.println("数据出现了变化");
});
observerDemo.addObserver((o, arg) -> {
System.out.println("手动被观察者通知,准备变化");
});
// 数据变化
observerDemo.setChanged();
// 通知
observerDemo.notifyObservers();
}
}
Java 9中采用的是Flow类,这个类被Reactor进行了封装,最终形成了netty,实现了响应式编程,是严格的订阅模式。
Flow.Publisher<String> Publisher=subsriber->{
subsriber.onNext("1");
subsriber.onNext("2");
subsriber.onError(new RuntimeException("error"));
subsriber.onComplete();
}
Reactor实现
响应式编程操作中,Reactor是满足Reactive规范框架,Reactor中有两个核心类,Mono和Flux,这两个接口中实现了接口Publisher,提供了丰富的操作符,Flux对象实现了发布者,返回了N个元素,Mono实现了发布者,只能返回0或者一个元素。
FLux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号(元素值、错误信号(终止)、完成信号(终止))
- 引入依赖
```xml
- 使用just或者是其他方式发送信号
```java package com.example.demowebflux.reactor8;
import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;
import java.util.Arrays; import java.util.List; import java.util.stream.Stream;
public class testReactor { public static void main(String[] args) { // reactor中的just方法,可以直接发送元素 Flux.just(1, 2, 3, 4); Mono.just(1); // 其他方法进行数据的传入 Integer[] array = {1, 2, 3, 4}; Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
}
} ```
- 三种信号
错误信号和完成信号都是终止信号不能共存
如果没有发送任何元素值,而是直接发送错误或者完成信号,则是空数据流
没有终止信号,表示是无限数据流
- 调用订阅方法
java
Flux.just(1, 2, 3, 4).subscribe(System.out::println);
Mono.just(1).subscribe(System.out::println);
-
操作符
-
map
要将元素映射为新的元素
-
flatMap
要将元素映射为流,最终返回一整个数据流
SpringWebflux执行流程和核心API
SpringWebflux基于Reactor,默认使用的是Netty框架,Netty是高性能NIO框架(异步非阻塞)。
阻塞方式(BIO)
非阻塞(NIO)通过注册实现多路复用的方式
SpringWebflux执行过程和SpringMVC类似
- 在SpringWebflux核心控制器DispatchHandler中,实现接口WebHandler
- 首先修改starter依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
-
SpringWebflux里面的DIspatcherHandler负责请求的处理
-
HandlerMapping:请求查询到的处理方法
- HandlerAdapter:真正负责请求处理
-
HandlerResultHandler:返回响应结果
-
SpringWebflux实现函数式编程,需要实现两个接口RouterFunction(路由处理)和HandlerFunction(函数业务处理)
-
SpringWebflux注解编程模型
-
创建Springboot引入依赖
```xml
org.springframework.boot spring-boot-starter-webflux <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
```
-
配置项目(这里使用的是yaml)
yaml server: port: 8082
-
创建接口和相关的类采用Springboot写法,分包entity、service、controller
首先编写entity,没有区别
```java package com.example.demowebflux1.entity;
import lombok.Data;
@Data public class User { private String name; private String gender; private Integer age;
public User(String name, String gender, Integer age) { this.name = name; this.gender = gender; this.age = age; }
}
```
编写服务接口和服务类,在服务实现的时候,需要注意使用Mono和Flux,这里使用一个成员变量Map来对数据进行临时记录,这个成员变量是单例的,所以多用户时需要注意。
```java package com.example.demowebflux1.service;
import com.example.demowebflux1.entity.User; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;
// Webflux中用户的操作接口 public interface UserService { Mono
getUserById(int id); // 返回所有用户时返回的不止一个参数 Flux<User> getAllUser(); // 添加用户 Mono<Void> saveUserInfo(Mono<User> user);
}
package com.example.demowebflux1.service.impl;
import com.example.demowebflux1.entity.User; import com.example.demowebflux1.service.UserService; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;
import java.util.HashMap; import java.util.Map;
@Service public class UserServiceImpl implements UserService {
// 创建一个Map数据存储器临时存储数据 private final Map<Integer, User> users = new HashMap<>(); // 默认构造方法中增加数据信息 public UserServiceImpl() { this.users.put(1, new User("lucy", "na", 20)); this.users.put(2, new User("lucy2", "na", 30)); } @Override public Mono<User> getUserById(int id) { return Mono.justOrEmpty(this.users.get(id)); } @Override public Flux<User> getAllUser() { return Flux.fromIterable(this.users.values()); } @Override public Mono<Void> saveUserInfo(Mono<User> userMono) { return userMono.doOnNext(user -> {
// 向map放值 Integer id = users.size() + 1; users.put(id, user); }).thenEmpty(Mono.empty()); } }
```
编写Controller,Springboot会自动返回JSON的字符串结果
```java package com.example.demowebflux1.controller;
import com.example.demowebflux1.entity.User; import com.example.demowebflux1.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;
@RestController public class UserController { @Autowired private UserService userService;
@RequestMapping("/get-user-id") public Mono<User> getUserById(Integer id) { return userService.getUserById(id); } @RequestMapping("/get-all-user") public Flux<User> getAllUser() { return userService.getAllUser(); } @RequestMapping("/saveUser") public Mono<Void> saveNewUser(User user) { Mono<User> userMono = Mono.just(user); return userService.saveUserInfo(userMono); }
}
```
-
SpringWebflux基于函数式编程模型
-
在函数式编程模型进行操作时,需要自己初始化服务器
-
使用函数式编程模型时,有两个核心接口(RouterFunction路由功能,转发请求给对应的Handler)和(HandlerFunction处理请求的响应函数)核心任务就是定义两个函数式接口的实现并启动需要的服务器。
-
SpringWebflux请求和响应不再是ServletRequest和ServletResponse,而是ServerRequest和ServerResponse
-
在新版本的Springboot中,对于函数式编程依旧不需要自己初始化服务器就可以实现了
```xml
org.springframework.boot spring-boot-starter-webflux ```
不需要更改Mono和Flux返回的service类,只需要删除Controller并且增加一个路由配置类。
```java e com.example.demowebflux1;
import com.example.demowebflux1.Handler.UserHandler; import com.example.demowebflux1.service.UserService; import com.example.demowebflux1.service.impl.UserServiceImpl; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse;
import javax.xml.ws.spi.http.HttpHandler; @Configuration public class Server {
// 1. 创建Router路由 @Bean public RouterFunction<ServerResponse> routerFunction() { UserService userService = new UserServiceImpl(); UserHandler userHandler = new UserHandler(userService); return RouterFunctions.route().GET("/users/{id}", userHandler::getUserById). GET("/users", userHandler::getAllUser).POST("/saveUser", userHandler::saveUsers).build(); }
} ```
修改application入口main函数
```java package com.example.demowebflux1;
import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder;
@SpringBootApplication public class Demowebflux1Application {
public static void main(String[] args) { SpringApplication springApplication = new SpringApplicationBuilder(Demowebflux1Application.class).build(args); springApplication.setWebApplicationType(WebApplicationType.REACTIVE); springApplication.run(); }
}
```
使用Spring自带的webClient进行测试。
```java package com.example.demowebflux1;
import com.example.demowebflux1.entity.User; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient;
public class Client { public static void main(String[] args) { WebClient webClient = WebClient.create("http://localhost:8082");
String id = "2"; User user = webClient.get().uri("/users/" + id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block(); System.out.println(user.getName()); }
}
```