SpringBoot3响应式编程入门

博主在该知识点上不是特别熟练,在之前的工作经历中没有过多的使用场景,仅限于自己学习或者准备面试时突击一下,准备利用好这段闲暇时间记录学习阶段。

Lambda表达式

语法糖

Function函数式接口

  • Supplier<T> get();

  • Consumer<T> accept(T t);

  • Predicate<T> test(T t);

  • Function<T, R> apply(T t);

  • Runnable run();

StreamAPI

流的特性:懒加载

只有存在终止操作时才会启动

创建流

中间操作(intermediate operation)

parallel

开启并发,开启后自行解决多线程安全问题,推荐操作无状态数据

public class Mylambda {
    public static void main(String[] args) {
        var list = new ArrayList<Integer>();
        list.add(1);
        list.add(5);
        list.add(8);
        list.add(11);
        list.add(23);
        list.add(37);

        list.stream()
                .parallel() //开启并发
                .filter(x -> (x & 1) == 0)
                .max(Integer::compareTo)
                .ifPresent(System.out::println);
    }
}

filter

过滤操作

map

一对一映射

flatMap

一对多映射,打散、散列后成为新流

distinct

去重

sorted

排序

peek

偷看——消费者函数式接口

limit

限制——按照排序截取流

skip

跳过——按照排序跳过流

takeWhile

只保留流中第一个不满足条件的元素之前的所有元素

dropWhile

删除第一个不满足条件的元素之前的所有元素

等等

新流

终止操作(terminal operation)

forEach

toArray

reduce

collect

toList

min

max

count

anyMatch

findFirst

iterator

等等

结果

Reactive-Stream

JVM面向流的库的标准和规范

  1. 处理可能无限数量的元素

  2. 有序

  3. 在组件之间异步传递元素

  4. 强制性非阻塞背压模式

Publisher

发布者

Subscriber

订阅者

Subscription

订阅关系

Processor

处理器

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * MyFlow
 *
 * @author <b>孤寂灬无痕</b>
 * <p>
 * 994300880@qq.com
 * @version 1.0
 * @date 2024/2/5 13:17
 */
public class MyFlow {
    //5.定义流中间处理器
    static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println(Thread.currentThread().getName() + "#####中间处理器开始订阅");
            this.subscription.request(1L);
        }

        @Override
        public void onNext(String item) {
            System.out.println(Thread.currentThread().getName() + "#####中间处理器接收到订阅消息" + item);
            item = "{[加工]-" + item + "}";
            submit(item);
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(Thread.currentThread().getName() + "#####中间处理器订阅出错消息" + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(Thread.currentThread().getName() + "#####中间处理器订阅完成");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //1.定义一个发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyProcessor myProcessor = new MyProcessor();
        //2.定义一个订阅者
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription temp;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.temp = subscription;
                //开始订阅
                System.out.println(Thread.currentThread().getName() + "开始订阅");
                this.temp.request(1L);
            }

            @Override
            public void onNext(String item) {
                //接收到订阅消息
                System.out.println(Thread.currentThread().getName() + "接收到订阅消息" + item);
                this.temp.request(1L);
            }

            @Override
            public void onError(Throwable throwable) {
                //订阅出错消息
                System.out.println(Thread.currentThread().getName() + "订阅出错消息" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                //订阅完成
                System.out.println(Thread.currentThread().getName() + "订阅完成");
            }
        };
        //3.绑定订阅关系
        publisher.subscribe(myProcessor);
        myProcessor.subscribe(subscriber);
        //4.发布者生产消息
        for (int i = 0;i < 10; i++) {
            publisher.submit("(p-" + i + ")");
        }

        Thread.sleep(3000L);
        System.out.println("关闭发布者");
        myProcessor.close();
        publisher.close();
    }
}

响应式编程

  • 底层:数据缓冲队列+消息驱动模型+异步回调机制

  • 编码:流式编程+链式调用+声明式API

  • 效果:优雅全异步+消息实时处理+高吞吐量+占用少量资源

Reactor

Mono[0|1]

Flux[N]

响应式流:元素(内容) + 信号(完成/异常)

subscribe()

自定义流的信号感知回调

flux.subscribe(
        v-> System.out.println("v = " + v), //流元素消费
        throwable -> System.out.println("throwable = " + throwable), //感知异常结束
        ()-> System.out.println("流结束了...") //感知正常结束
);

自定义消费者

flux.subscribe(new BaseSubscriber<String>() {

            // 生命周期钩子1: 订阅关系绑定的时候触发
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // 流被订阅的时候触发
                System.out.println("绑定了..."+subscription);

                //找发布者要数据
                request(1); //要1个数据
//                requestUnbounded(); //要无限数据
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("数据到达,正在处理:"+value);
                request(1); //要1个数据
            }


            //  hookOnComplete、hookOnError 二选一执行
            @Override
            protected void hookOnComplete() {
                System.out.println("流正常结束...");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("流异常..."+throwable);
            }

            @Override
            protected void hookOnCancel() {
                System.out.println("流被取消...");
            }

            @Override
            protected void hookFinally(SignalType type) {
                System.out.println("最终回调...一定会被执行");
            }
        });