Spring Webflux的学习

WebFlux是一种响应式编程框架,是一种异步非阻塞式的开发方式,使用传统的Web框架,SpringMVC,基于老版本Servlet不支持非阻塞方式使用,在Servlet3.1之后,可以运行WebFlux。

WebFlux核心基于Reactor相关API实现,但是也支持在Servlet3.1之后的版本运行,也可以在netty等容器中运行。

  • 同步和异步:当调用者发送请求后,等待回应,则被称为异步
  • 阻塞和非阻塞:针对被调用者而言,收到请求后,直接反馈被称为非阻塞,处理结束后进行反馈是非阻塞

优势:

  1. 采用异步框架可以提升吞吐量提高性能,以Reactor为基础实现响应式编程
  2. 函数式编程,使用WebFlux函数式编程实现路由请求

WebFLux和SpringMVC

SpringMVC是命令式编程,一行行进行执行,都可以使用注解方式运行,都可以使用Tomcat、Jetty等容器中。远程父调用中使用WebFlux,在网关中也可以使用WebFlux进行处理。

响应式编程

RP是一种面向数据流和变化传播的编程范式,类似于Excel表格中的sum(a1+b1)而不是一个具体的值。

Java 8中采用了观察者模式进行处理(Observe、和Observable)

观察者模式:会对数据本身进行变化监控,如果数据发生变化,会进行操作。观察者模式和订阅模式反正一直说不清楚,主要我觉得区别就是观察者模式是强耦合的一种模式,而订阅模式被订阅者并不关心自己被订阅了多少人。

package com.example.demowebflux.reactor8;

import java.util.Observable;

public class ObserverDemo extends Observable {
    public static void main(String[] args) {
        ObserverDemo observerDemo = new ObserverDemo();
        observerDemo.addObserver((o, arg) -> {
            System.out.println("数据出现了变化");
        });
        observerDemo.addObserver((o, arg) -> {
            System.out.println("手动被观察者通知,准备变化");
        });
//        数据变化
        observerDemo.setChanged();
//        通知
        observerDemo.notifyObservers();
    }
}

Java 9中采用的是Flow类,这个类被Reactor进行了封装,最终形成了netty,实现了响应式编程,是严格的订阅模式。

Flow.Publisher<String> Publisher=subsriber->{
    subsriber.onNext("1");
    subsriber.onNext("2");
    subsriber.onError(new RuntimeException("error"));
    subsriber.onComplete();
}

Reactor实现

响应式编程操作中,Reactor是满足Reactive规范框架,Reactor中有两个核心类,Mono和Flux,这两个接口中实现了接口Publisher,提供了丰富的操作符,Flux对象实现了发布者,返回了N个元素,Mono实现了发布者,只能返回0或者一个元素。

FLux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号(元素值、错误信号(终止)、完成信号(终止))

flux发生信号

mono信号

  1. 引入依赖

```xml

io.projectreactor reactor-core 3.4.8 ```

  1. 使用just或者是其他方式发送信号

```java package com.example.demowebflux.reactor8;

import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;

import java.util.Arrays; import java.util.List; import java.util.stream.Stream;

public class testReactor { public static void main(String[] args) { // reactor中的just方法,可以直接发送元素 Flux.just(1, 2, 3, 4); Mono.just(1); // 其他方法进行数据的传入 Integer[] array = {1, 2, 3, 4}; Flux.fromArray(array);

       List<Integer> list = Arrays.asList(array);
       Flux.fromIterable(list);

       Stream<Integer> stream = list.stream();
       Flux.fromStream(stream);

   }

} ```

  1. 三种信号

错误信号和完成信号都是终止信号不能共存

如果没有发送任何元素值,而是直接发送错误或者完成信号,则是空数据流

没有终止信号,表示是无限数据流

  1. 调用订阅方法

java Flux.just(1, 2, 3, 4).subscribe(System.out::println); Mono.just(1).subscribe(System.out::println);

  1. 操作符

  2. map

    要将元素映射为新的元素

    map处理

  3. flatMap

    要将元素映射为流,最终返回一整个数据流

    flatMap

SpringWebflux执行流程和核心API

SpringWebflux基于Reactor,默认使用的是Netty框架,Netty是高性能NIO框架(异步非阻塞)。

阻塞方式(BIO)

BIO操作

非阻塞(NIO)通过注册实现多路复用的方式

NIO

SpringWebflux执行过程和SpringMVC类似

  • 在SpringWebflux核心控制器DispatchHandler中,实现接口WebHandler
  • 首先修改starter依赖

xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>

具体执行过程

  1. SpringWebflux里面的DIspatcherHandler负责请求的处理

  2. HandlerMapping:请求查询到的处理方法

  3. HandlerAdapter:真正负责请求处理
  4. HandlerResultHandler:返回响应结果

  5. SpringWebflux实现函数式编程,需要实现两个接口RouterFunction(路由处理)和HandlerFunction(函数业务处理)

  6. SpringWebflux注解编程模型

  7. 创建Springboot引入依赖

    ```xml org.springframework.boot spring-boot-starter-webflux

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
      </dependencies>
    

    ```

  8. 配置项目(这里使用的是yaml)

    yaml server: port: 8082

  9. 创建接口和相关的类采用Springboot写法,分包entity、service、controller

    首先编写entity,没有区别

    ```java package com.example.demowebflux1.entity;

    import lombok.Data;

    @Data public class User { private String name; private String gender; private Integer age;

      public User(String name, String gender, Integer age) {
          this.name = name;
          this.gender = gender;
          this.age = age;
      }
    

    }

    ```

    编写服务接口和服务类,在服务实现的时候,需要注意使用Mono和Flux,这里使用一个成员变量Map来对数据进行临时记录,这个成员变量是单例的,所以多用户时需要注意。

    ```java package com.example.demowebflux1.service;

    import com.example.demowebflux1.entity.User; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;

    // Webflux中用户的操作接口 public interface UserService { Mono getUserById(int id);

      //    返回所有用户时返回的不止一个参数
      Flux<User> getAllUser();
    
      //    添加用户
      Mono<Void> saveUserInfo(Mono<User> user);
    

    }


    package com.example.demowebflux1.service.impl;

    import com.example.demowebflux1.entity.User; import com.example.demowebflux1.service.UserService; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;

    import java.util.HashMap; import java.util.Map;

    @Service public class UserServiceImpl implements UserService {

      //    创建一个Map数据存储器临时存储数据
      private final Map<Integer, User> users = new HashMap<>();
    
      //    默认构造方法中增加数据信息
      public UserServiceImpl() {
          this.users.put(1, new User("lucy", "na", 20));
          this.users.put(2, new User("lucy2", "na", 30));
      }
    
      @Override
      public Mono<User> getUserById(int id) {
          return Mono.justOrEmpty(this.users.get(id));
      }
    
      @Override
      public Flux<User> getAllUser() {
          return Flux.fromIterable(this.users.values());
      }
    
      @Override
      public Mono<Void> saveUserInfo(Mono<User> userMono) {
          return userMono.doOnNext(user -> {
    

    // 向map放值 Integer id = users.size() + 1; users.put(id, user); }).thenEmpty(Mono.empty()); } }

    ```

    编写Controller,Springboot会自动返回JSON的字符串结果

    ```java package com.example.demowebflux1.controller;

    import com.example.demowebflux1.entity.User; import com.example.demowebflux1.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;

    @RestController public class UserController { @Autowired private UserService userService;

      @RequestMapping("/get-user-id")
      public Mono<User> getUserById(Integer id) {
          return userService.getUserById(id);
      }
    
      @RequestMapping("/get-all-user")
      public Flux<User> getAllUser() {
          return userService.getAllUser();
      }
    
      @RequestMapping("/saveUser")
      public Mono<Void> saveNewUser(User user) {
          Mono<User> userMono = Mono.just(user);
          return userService.saveUserInfo(userMono);
      }
    

    }

    ```

    结果

  10. SpringWebflux基于函数式编程模型

  11. 在函数式编程模型进行操作时,需要自己初始化服务器

  12. 使用函数式编程模型时,有两个核心接口(RouterFunction路由功能,转发请求给对应的Handler)和(HandlerFunction处理请求的响应函数)核心任务就是定义两个函数式接口的实现并启动需要的服务器。

  13. SpringWebflux请求和响应不再是ServletRequest和ServletResponse,而是ServerRequest和ServerResponse

  14. 在新版本的Springboot中,对于函数式编程依旧不需要自己初始化服务器就可以实现了

    ```xml org.springframework.boot spring-boot-starter-webflux

    ```

    不需要更改Mono和Flux返回的service类,只需要删除Controller并且增加一个路由配置类。

    ```java e com.example.demowebflux1;

    import com.example.demowebflux1.Handler.UserHandler; import com.example.demowebflux1.service.UserService; import com.example.demowebflux1.service.impl.UserServiceImpl; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse;

    import javax.xml.ws.spi.http.HttpHandler; @Configuration public class Server {

     //    1. 创建Router路由
     @Bean
     public RouterFunction<ServerResponse> routerFunction() {
         UserService userService = new UserServiceImpl();
         UserHandler userHandler = new UserHandler(userService);
    
         return RouterFunctions.route().GET("/users/{id}", userHandler::getUserById).
                 GET("/users", userHandler::getAllUser).POST("/saveUser", userHandler::saveUsers).build();
     }
    

    } ```

    修改application入口main函数

    ```java package com.example.demowebflux1;

    import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder;

    @SpringBootApplication public class Demowebflux1Application {

     public static void main(String[] args) {
         SpringApplication springApplication = new SpringApplicationBuilder(Demowebflux1Application.class).build(args);
         springApplication.setWebApplicationType(WebApplicationType.REACTIVE);
         springApplication.run();
     }
    

    }

    ```

    使用Spring自带的webClient进行测试。

    ```java package com.example.demowebflux1;

    import com.example.demowebflux1.entity.User; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient;

    public class Client { public static void main(String[] args) { WebClient webClient = WebClient.create("http://localhost:8082");

         String id = "2";
         User user = webClient.get().uri("/users/" + id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();
         System.out.println(user.getName());
    
     }
    

    }

    ```

    运行结果

链接