引入
Stream就是流,流式操作就跟生活中的流水线一样。比如我有一批原材料,经过流水线 挑选–> 清洗 –> 包装。就变成了一个产品了。
Stream流就好比我需要组装一条流水线,当数据就像原料一样源源不断的走上流水线的传送带上,经过各种步骤的处理,知道走完所有的流水线,原料数据就组装完成了,最终变成一个我想要的东西。
响应式编程跟传统的命令行编程的思想有一个重大的差异:
命令行编程:拿到一堆数据之后,需要自己写for循环推进,需要自己控制循环逻辑,比如大于2了怎么办,小于3了又怎么办。
响应式编程:拿到一堆数据之后,他需要经过123456步的流水线,来转化成我需要的数据或者数据集。
StreamAPI介绍
流是什么?流里面有什么?
我们可以把他比喻为一条生产流水线,流水线的传送带上就会有一些东西,这些东西会从原料一步步的加工成为一个完整的产品。
声明式处理集合数据,包括:筛选、转换、组合等,而这些数据,就算流里面的原料。
Stream流的思想:
- 首先得有一个流,流里面可以有一堆数据,或者一个数据,甚至是空流都可以。反正得有一个流给我操作。就相当于,一条流水线,可以没有原材料,但是得有存放原材料的仓库,有仓库,流水线才能有起点,流水线才能进行下去。
- 中间操作,就是流水线的每一个步骤。比如我想挑出来原料中的瑕疵品,那么流水线的第一个步骤就是选品。每一个流水线的步骤,就叫做流的中间操作。
- 新流,在经过一系列的中间操作之后,最终会得到一个新流。比如流水线对原材料进行了选品,加工,分组,包装,就得到了一些新的产品。
- 终止操作,得到了新流之后也不能直接使用,因为流是动态的。比如原材料遍历完流水线的所有操作了,但是还放在传送带上传送着。这时候,我们需要把产品从传送带上拿下来,统计数量或者拿到最大值。这个就叫做终止操作。
- 结果,终止操作中,我们把新流的元素拿出来了或者统计数量了之类的,得到某个结果了,那么整个流操作就结束了。
总结:一个流要处理结束,有三大关键步骤:
- 创建流
- 有中间操作
- 有终止操作
例子:
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
我需要找出了list
列表里最大的偶数
方法一:用for循环
// 方法1:用for循环
int max = 0;
for(Integer integer : list){
if(integer % 2 ==0){
// 是偶数,和max进行比较交换
max = integer >= max ? integer : max;
}
}
System.out.println("最大偶数:" + max);
方法二:用StreamAPI
// 方法2:用Stream流
list.stream()
.filter(ele -> {
System.out.println("正在filter:" + ele);
return ele % 2 == 0;
})
.max(Integer::compareTo)
.ifPresent(System.out::println);
流管道组成:
解析例子二:
首先来认识几个名词:
Stream Pipeline:流水线、流管道
Intermediate Operations:中间操作
Terminal Operation:终止操作
Stream所有数据和操作被组合成流管道。
- 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
- 零个或多个中间操作(将一个流变形成另一个流)
- 一个终止操作(产生最终结果)
流是惰性的。只有在启动最终操作时才会对数据源进行计算,而且只有在需要时才会消耗元素。
什么意思呢?让我们来看例子二:
list.stream()
// intermediate operation.中间操作
.filter(ele -> {
System.out.println("正在filter:" + ele);
return ele % 2 == 0;
});
如果是像上面一样写的话,没有后面的.max(Integer::compareTo).ifPresent(System.out::println);
的话,打印语句是不会执行的。
因为还没有到终止操作,所以不会对数据源进行计算。比如一条流水线,传送带都修好了,但是最后的收货环节没做好,相当于原料在流水线上加工完成后,没地方可去,那么这条流水线是不会启动的。
在例子二中,
filter()方法
在源码的文档中有写是intermediate operation
中间操作。
而max()方法
则是terminal operation
终止操作。
max()方法
后,还可以传一个条件判断ifPresent()
,
// 方法2:用Stream流
list.stream()
// intermediate operation.中间操作
.filter(ele -> {
System.out.println("正在filter:" + ele);
return ele % 2 == 0;
})
//terminal operation.终止操作。过滤出我们想要的值,如果断言返回true就算我们要的值,如果断言返回false,就忽略掉
.max(Integer::compareTo)
// 简写:.ifPresent(ele -> System.out.println("最大偶数:" + ele));
.ifPresent(System.out::println); //如果存在这个数,就输出
Stream().filter()
是一个中间操作,用于筛选满足特定条件的元素,并生成一个新的流。它接受一个Predicate函数式接口作为参数,该接口定义了一个用于判断元素是否满足条件的方法。filter()方法会遍历流中的每个元素,对每个元素应用Predicate接口的test()方法,如果返回true,则将该元素包含在新的流中,否则不包含。
相对应的还有
collect()方法
,如上例子还可以这样写:.filter(n -> n % 2 == 0).collect(Collectors.toList());
,使用filter()方法
筛选出符合条件(即能被2整除)的元素,最后使用collect()方法将结果收集到一个新的List中。
当加上了.max()方法
后,流水线就完整了,再次启动,就能输出所有的filter
打印语句
总结:
- 流是惰性的,如,生活中流水线完整了才会启动传送带。
- 流管道组成:一个数据源、N个中间操作(N可以为0)、一个终止操作。
StreamAPI的基本使用
一、创建流:
可以直接创建流:
//1、直接创建流:
//创建了一个包含整数1、2和3的流
Stream<Integer> stream = Stream.of(1, 2, 3);
// 使用Stream.concat()方法将两个流连接起来,创建了一个包含整数4、5、6以及之前流中的整数1、2、3的新流
Stream<Integer> concat = Stream.concat(Stream.of(4, 5, 6), stream);
//使用了Stream.builder()创建了一个流构建器,然后添加了两个字符串"11"和"22",最后通过build()方法构建了一个包含这两个字符串的流
Stream<Object> build = Stream.builder().add("11").add("22").build();
也可以集合容器中获取这个流
//2、从集合容器中获取这个流,List、Set、Map
List<Integer> list1 = List.of(1, 2);
Stream<Integer> stream1 = list1.stream();
Set<Integer> integers1 = Set.of(1, 2);
integers1.stream();
Map<Object, Object> map = Map.of();
map.keySet().stream();
map.values().stream();
map有Key有Value,可以分别拿这两个的Stream流。
中间操作:
流量并发还是不并发?和for有什么区别?
要验证这个问题,可以通过代码来测试一下:
System.out.println("主线程: " + Thread.currentThread());
// 流量并发还是不并发?和for有什么区别?
long count = Stream.of(1,2,3,4,5)
.filter(i -> {
System.out.println("filter线程:" + Thread.currentThread());
System.out.println("正在filter:" + i);
return i > 2;
}) // intermediate operation. 中间操作
.count(); //terminal operation.终止操作
运行结果:
可以看到,主线程和filter线程是同一个,说明流也是用for循环挨个处理的。
但是!流有一个很帅的效果:可以并发。.parallel()
方法。
在流的后面调用一下.parallel()
方法,就会并发执行
System.out.println("主线程: " + Thread.currentThread());
// 流量并发还是不并发?和for有什么区别?
long count = Stream.of(1,2,3,4,5)
.parallel()
.filter(i -> {
System.out.println("filter线程:" + Thread.currentThread());
System.out.println("正在filter:" + i);
return i > 2;
}) // intermediate operation. 中间操作
.count(); //terminal operation.终止操作
}
运行结果:
这时候可以看到,只有一个线程的名字是跟主线程是一样的。而且也不是按顺序执行的。说明这是个并发的过程。
流默认是不并发的,但也可以并发。但只要加上了.parallel()
方法,就会变成一个并发流。
并发以后,要自行解决并发安全问题。
什么是并发安全问题呢?看一下下面的例子:
List testList = new ArrayList<>();
// 流量并发还是不并发?和for有什么区别?
long count = Stream.of(1,2,3,4,5)
.parallel()
.filter(i -> {
testList.add(i);
return i > 2;
}) // intermediate operation. 中间操作
.count(); //terminal operation.终止操作
假设线程1执行了之后,testList就会变成了[1]
,
这个时候,线程2和线程3同时执行,那么testList就会分别变成:线程2:[1,2],线程3:[1,3]
。
add了3个数,正确的testList应该是[1,2,3]
的,但是因为并发的问题,分别变成了两个不同的列表。
那么接下来线程4执行的时候,是该拿[1,2]
还是[1,3]
呢?这时候就出问题了。
流的所有操作都是无状态,就是说,流的数据状态仅在本次流有效,不溢出至函数外。
我可以在流中处理完数据后把数据交给别人,但是不能在处理的时候让别人帮我存。
在写流的时候,要把流写成一个整体,外面的容器不能对流里的任何数据有任何的引用、存储、增删改。
上面的问题可以用锁来解决:
System.out.println("主线程: " + Thread.currentThread());
List testList = new ArrayList<>();
// 流量并发还是不并发?和for有什么区别?
long count = Stream.of(1,2,3,4,5)
.parallel()
.filter(i -> {
synchronized (Object.class){
System.out.println("filter线程:" + Thread.currentThread());
System.out.println("正在filter:" + i);
testList.add(i);
System.out.println("testList:" + testList);
return i > 2;
}
}) // intermediate operation. 中间操作
.count(); //terminal operation.终止操作
执行结果:
但是用锁之后就串行了,跟之前的不用.parallel()
方法是类似的执行过程,所以也相当于没有并发。
synchronized可以控制并发。类似一把琐,所有的线程去抢这把锁,谁先拿到锁谁就先执行。
总结:
如果要用并发,就要明确流里面不会产生问题,是安全的。如果可能会出现问题,就不要用并发。
如果用了并发,且出现问题了,可以用锁解决。