StreamAPI介绍以及基本使用

引入

Stream就是流,流式操作就跟生活中的流水线一样。比如我有一批原材料,经过流水线 挑选–> 清洗 –> 包装。就变成了一个产品了。

Stream流就好比我需要组装一条流水线,当数据就像原料一样源源不断的走上流水线的传送带上,经过各种步骤的处理,知道走完所有的流水线,原料数据就组装完成了,最终变成一个我想要的东西。

响应式编程跟传统的命令行编程的思想有一个重大的差异:

命令行编程:拿到一堆数据之后,需要自己写for循环推进,需要自己控制循环逻辑,比如大于2了怎么办,小于3了又怎么办。
响应式编程:拿到一堆数据之后,他需要经过123456步的流水线,来转化成我需要的数据或者数据集。


StreamAPI介绍

image-20240226100008783

流是什么?流里面有什么?
我们可以把他比喻为一条生产流水线,流水线的传送带上就会有一些东西,这些东西会从原料一步步的加工成为一个完整的产品。
声明式处理集合数据,包括:筛选、转换、组合等,而这些数据,就算流里面的原料。

Stream流的思想:

  1. 首先得有一个流,流里面可以有一堆数据,或者一个数据,甚至是空流都可以。反正得有一个流给我操作。就相当于,一条流水线,可以没有原材料,但是得有存放原材料的仓库,有仓库,流水线才能有起点,流水线才能进行下去。
  2. 中间操作,就是流水线的每一个步骤。比如我想挑出来原料中的瑕疵品,那么流水线的第一个步骤就是选品。每一个流水线的步骤,就叫做流的中间操作。
  3. 新流,在经过一系列的中间操作之后,最终会得到一个新流。比如流水线对原材料进行了选品,加工,分组,包装,就得到了一些新的产品。
  4. 终止操作,得到了新流之后也不能直接使用,因为流是动态的。比如原材料遍历完流水线的所有操作了,但是还放在传送带上传送着。这时候,我们需要把产品从传送带上拿下来,统计数量或者拿到最大值。这个就叫做终止操作
  5. 结果,终止操作中,我们把新流的元素拿出来了或者统计数量了之类的,得到某个结果了,那么整个流操作就结束了。

总结:一个流要处理结束,有三大关键步骤:

  1. 创建流
  2. 有中间操作
  3. 有终止操作

例子:

List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
我需要找出了list列表里最大的偶数

方法一:用for循环

        // 方法1:用for循环
        int max = 0;
        for(Integer integer : list){
            if(integer % 2 ==0){
                // 是偶数,和max进行比较交换
                max = integer >= max ? integer : max;
            }
        }
        System.out.println("最大偶数:" + max);

方法二:用StreamAPI

        // 方法2:用Stream流
        list.stream()
                .filter(ele -> {
                    System.out.println("正在filter:" + ele);
                    return ele % 2 == 0;
                })
                .max(Integer::compareTo)
                .ifPresent(System.out::println);

流管道组成:

解析例子二:

首先来认识几个名词:

Stream Pipeline:流水线、流管道
Intermediate Operations:中间操作
Terminal Operation:终止操作

Stream所有数据和操作被组合成流管道。

  1. 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
  2. 零个或多个中间操作(将一个流变形成另一个流)
  3. 一个终止操作(产生最终结果)

流是惰性的。只有在启动最终操作时才会对数据源进行计算,而且只有在需要时才会消耗元素。
什么意思呢?让我们来看例子二:

        list.stream()
                // intermediate operation.中间操作
                .filter(ele -> {
                    System.out.println("正在filter:" + ele);
                    return ele % 2 == 0;
                });

如果是像上面一样写的话,没有后面的.max(Integer::compareTo).ifPresent(System.out::println);的话,打印语句是不会执行的。

image-20240226172240272

因为还没有到终止操作,所以不会对数据源进行计算。比如一条流水线,传送带都修好了,但是最后的收货环节没做好,相当于原料在流水线上加工完成后,没地方可去,那么这条流水线是不会启动的。

在例子二中,

filter()方法在源码的文档中有写是intermediate operation中间操作。

image-20240226173122888

max()方法则是terminal operation终止操作。

image-20240226173247642

max()方法后,还可以传一个条件判断ifPresent()

image-20240226174559387

        // 方法2:用Stream流
        list.stream()
                // intermediate operation.中间操作
                .filter(ele -> {
                    System.out.println("正在filter:" + ele);
                    return ele % 2 == 0;
                })

                //terminal operation.终止操作。过滤出我们想要的值,如果断言返回true就算我们要的值,如果断言返回false,就忽略掉
                .max(Integer::compareTo)

                // 简写:.ifPresent(ele -> System.out.println("最大偶数:" + ele));
                .ifPresent(System.out::println); //如果存在这个数,就输出

Stream().filter()是一个中间操作,用于筛选满足特定条件的元素,并生成一个新的流。

它接受一个Predicate函数式接口作为参数,该接口定义了一个用于判断元素是否满足条件的方法。filter()方法会遍历流中的每个元素,对每个元素应用Predicate接口的test()方法,如果返回true,则将该元素包含在新的流中,否则不包含。

相对应的还有collect()方法,如上例子还可以这样写:.filter(n -> n % 2 == 0).collect(Collectors.toList());,使用filter()方法筛选出符合条件(即能被2整除)的元素,最后使用collect()方法将结果收集到一个新的List中。

当加上了.max()方法后,流水线就完整了,再次启动,就能输出所有的filter打印语句

image-20240226175434410

总结:

  1. 流是惰性的,如,生活中流水线完整了才会启动传送带。
  2. 流管道组成:一个数据源、N个中间操作(N可以为0)、一个终止操作。

StreamAPI的基本使用

一、创建流:

image-20240227173051049

可以直接创建流:

        //1、直接创建流:
        //创建了一个包含整数1、2和3的流
        Stream<Integer> stream = Stream.of(1, 2, 3);

        // 使用Stream.concat()方法将两个流连接起来,创建了一个包含整数4、5、6以及之前流中的整数1、2、3的新流
        Stream<Integer> concat = Stream.concat(Stream.of(4, 5, 6), stream);

        //使用了Stream.builder()创建了一个流构建器,然后添加了两个字符串"11"和"22",最后通过build()方法构建了一个包含这两个字符串的流
        Stream<Object> build = Stream.builder().add("11").add("22").build();

也可以集合容器中获取这个流

        //2、从集合容器中获取这个流,List、Set、Map
        List<Integer> list1 = List.of(1, 2);
        Stream<Integer> stream1 = list1.stream();

        Set<Integer> integers1 = Set.of(1, 2);
        integers1.stream();

        Map<Object, Object> map = Map.of();
        map.keySet().stream();
        map.values().stream();

map有Key有Value,可以分别拿这两个的Stream流。


中间操作:

image-20240227175647602

流量并发还是不并发?和for有什么区别?

要验证这个问题,可以通过代码来测试一下:

System.out.println("主线程: " + Thread.currentThread());
        // 流量并发还是不并发?和for有什么区别?
        long count = Stream.of(1,2,3,4,5)
                .filter(i -> {
                    System.out.println("filter线程:" + Thread.currentThread());
                    System.out.println("正在filter:" + i);
                    return i > 2;
                }) // intermediate operation. 中间操作
                .count();   //terminal operation.终止操作

运行结果:

image-20240228113939703

可以看到,主线程和filter线程是同一个,说明流也是用for循环挨个处理的

但是!流有一个很帅的效果:可以并发.parallel()方法。
在流的后面调用一下.parallel()方法,就会并发执行

        System.out.println("主线程: " + Thread.currentThread());
        // 流量并发还是不并发?和for有什么区别?
        long count = Stream.of(1,2,3,4,5)
                .parallel()
                .filter(i -> {
                    System.out.println("filter线程:" + Thread.currentThread());
                    System.out.println("正在filter:" + i);
                    return i > 2;
                }) // intermediate operation. 中间操作
                .count();   //terminal operation.终止操作
    }

运行结果:

image-20240228135254429

这时候可以看到,只有一个线程的名字是跟主线程是一样的。而且也不是按顺序执行的。说明这是个并发的过程。

默认是不并发的,但也可以并发。但只要加上了.parallel()方法,就会变成一个并发流。
并发以后,要自行解决并发安全问题。

什么是并发安全问题呢?看一下下面的例子:

        List testList = new ArrayList<>();
        // 流量并发还是不并发?和for有什么区别?
        long count = Stream.of(1,2,3,4,5)
                .parallel()
                .filter(i -> {
                    testList.add(i);
                    return i > 2;
                }) // intermediate operation. 中间操作
                .count();   //terminal operation.终止操作

假设线程1执行了之后,testList就会变成了[1]
这个时候,线程2和线程3同时执行,那么testList就会分别变成:线程2:[1,2],线程3:[1,3]
add了3个数,正确的testList应该是[1,2,3]的,但是因为并发的问题,分别变成了两个不同的列表。
那么接下来线程4执行的时候,是该拿[1,2]还是[1,3]呢?这时候就出问题了。

流的所有操作都是无状态,就是说,流的数据状态仅在本次流有效,不溢出至函数外

我可以在流中处理完数据后把数据交给别人,但是不能在处理的时候让别人帮我存。

在写流的时候,要把写成一个整体外面的容器不能对流里的任何数据有任何的引用、存储、增删改

上面的问题可以用锁来解决:

        System.out.println("主线程: " + Thread.currentThread());

        List testList = new ArrayList<>();
        // 流量并发还是不并发?和for有什么区别?
        long count = Stream.of(1,2,3,4,5)
                .parallel()
                .filter(i -> {
                    synchronized (Object.class){
                        System.out.println("filter线程:" + Thread.currentThread());
                        System.out.println("正在filter:" + i);
                        testList.add(i);
                        System.out.println("testList:" + testList);
                        return i > 2;
                    }
                }) // intermediate operation. 中间操作
                .count();   //terminal operation.终止操作

执行结果:

image-20240228144318704

但是用锁之后就串行了,跟之前的不用.parallel()方法是类似的执行过程,所以也相当于没有并发。

synchronized可以控制并发。类似一把琐,所有的线程去抢这把锁,谁先拿到锁谁就先执行。

总结:

如果要用并发,就要明确流里面不会产生问题,是安全的。如果可能会出现问题,就不要用并发。
如果用了并发,且出现问题了,可以用锁解决

创作不易!转载请注明作者及文章链接或作者博客链接——
- 作者:pidanxia
- 链接:https://pidanxia.ink
(链接可为:**文章链接**或者**作者博客链接**)
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇