当前位置: 首页 > news >正文

[Java/并发编程] 深度解析:Java 并行流(parallelStream) [JDK8-]

  • 项目中利用了Java 8 的并行流(parallelStream)来优化程序处理性能:
    public static LinkedList<CycleCanSequenceDto> batchParseCloudMessageToCycleSequences(List<byte []> cloudMessageBytesList, CanHeaderConfigDto cloudMessageHeaderConfig) {List<LinkedList<CycleCanSequenceDto>> cycleCanSequenceDtoListList = cloudMessageBytesList.parallelStream().map(cloudMessageBytes -> {//并行处理LinkedList<CycleCanSequenceDto> canSequenceDtos = null;try {canSequenceDtos = parseCloudMessageToCycleSequences(cloudMessageBytes, cloudMessageHeaderConfig);} catch (IOException e) {String errorMessage = "Parse cloud message to cycle sequences fail!cloudMessageBytesHex:" + BytesUtils.bytesToHexString(cloudMessageBytes);log.error( errorMessage );throw new RuntimeException(errorMessage);}return canSequenceDtos;} ).collect(Collectors.toCollection(LinkedList::new));LinkedList<CycleCanSequenceDto> cycleCanSequenceDtoList = cycleCanSequenceDtoListList.parallelStream().flatMap(cycleCanSequenceDtoListElement -> {//并行处理return cycleCanSequenceDtoListElement.stream();}).collect(Collectors.toCollection(LinkedList::new));return cycleCanSequenceDtoList;}
@Setter
@Getter
public class CycleCanSequenceDto extends CycleMessageSequenceDto {/*** 获取 MessagePayloadDto 的总个数* @param cycleCanSequences* @return*/public static Long getMessagePayloadSize(List<CycleCanSequenceDto> cycleCanSequences){AtomicLong messagePayloadSize = new AtomicLong(0);if(cycleCanSequences==null) {return -1L;}cycleCanSequences.parallelStream().forEach(cycleCanSequenceDto -> {Integer currentCycleCanSequenceDtoMessagePayloadSize = cycleCanSequenceDto.getContent().size();messagePayloadSize.addAndGet( currentCycleCanSequenceDtoMessagePayloadSize );});return messagePayloadSize.get();}
}

概述:Java 并行流(parallelStream)[JDK8 - ]

  • 并行流(parallelStream)是Java 8引入的强大特性,它能够自动将流操作【并行化】,以利用多核处理器的优势

java.util.Collection#parallelStream()
Java 8引入了流的概念去对数据进行复杂的操作,而且使用并行流Parallel Steams)支持并发,大大加快了运行效率。

  • 与【并行流】对应的是【顺序流】
//顺序流
list.stream().filter(i -> i > 10).collect( Collectors.toList() );//并行流
list.parallelStream().filter(i -> i > 10).collect( Collectors.toList() );

下面我们将全面探讨parallelStream的使用方法、原理和最佳实践。

并行流基础

创建并行流

// 从集合创建并行流
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();// 将顺序流转为并行流
Stream<String> parallelStream2 = Stream.of("a", "b", "c").parallel();

基本使用示例

List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 并行计算平方和
long sum = numbers.parallelStream().mapToLong(i -> i * i).sum();

并行流工作原理

底层机制

  • 并行流使用Fork/Join框架实现:
  • 将任务分割为多个子任务(fork)
  • 并行执行这些子任务
  • 合并结果(join)

算法思想: 分治

  • 案例讲解: 以代码list.parallelStream().filter(...).collect(...)为例
  • Stage链构建:通过Head节点(Stage0)和中间操作(如filter、sorted)形成双向链表,每个阶段(Stage)封装操作逻辑。
  • 任务拆分:Spliterator将数据分割为多个子任务,分发到ForkJoinPool的线程队列。
  • 并行执行:各线程独立处理子任务,通过opWrapSink方法将操作链应用到数据流。
  • 结果合并:终端操作(如collect)调用combiner合并子任务结果。
// 示例:ArrayList的Spliterator实现
public Spliterator<E> spliterator() {return new ArrayListSpliterator<>(this, 0, -1, 0); // 初始范围[0, size)
}

底层框架:Fork/Join 框架

  • 并行流基于Java 7引入的Fork/Join框架实现,其核心是ForkJoinPool线程池,采用工作窃取算法Work-Stealing优化任务分配

每个线程维护一个双端队列,优先处理自己的任务,空闲时窃取其他线程队列尾部的任务最大化CPU利用率

  • 关键类分析:
  • ForkJoinTask:任务基类,子类包括RecursiveTask(有返回值)和RecursiveAction(无返回值)。
  • Spliterator:数据拆分器,负责将数据源分割为可并行处理的子块。

例如: ArrayListSpliterator支持高效随机访问分割。

源码级关键机制解析

1) 数据拆分与合并

  • Spliterator特性:通过characteristics()方法返回特性值(如ORDERED、SIZED),影响拆分策略

例如: ArrayList支持高效平均分割,而LinkedList拆分成本高。

  • 任务链构造:中间操作(如filter、map)通过StatelessOpStatefulOp节点构建操作链StatefulOp(如sorted)需缓存中间数据。

2) 并行流线程模型

  • 默认线程池:使用ForkJoinPool.commonPool() (JVM内共享的公共线程池, 被【整个应用程序】所使用)
  • 默认的线程数为: Runtime.getRuntime().availableProcessors() - 1 即: CPU核心数-1。

-1是因为还有 JVM 的主线程需要占用1个线程

  • 可自定义系统属性: java.util.concurrent.ForkJoinPool.common.parallelism

最佳实践: 由于主线程也会参与任务抢占CPU,所以 ForkJoinPool.commonPool 的线程数尽量设置为 (CPU核心数*N - 1)

// 设置全局并行度
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
  • 自定义线程池:可通过自定义ForkJoinPool提交任务,但需注意避免资源竞争。

支持通过 ForkJoinPool 定义私有线程池:

ForkJoinPool forkJoinPool = new ForkJoinPool(8);
List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map( e -> {return e + 1;
}).collect(Collectors.toList())).get();

适用场景

适合使用并行流的场景

  • 数据量大:通常超过10,000个元素
  • 计算密集型操作(CPU):如复杂的数学运算
  • 无状态操作:如map、filter、flatMap等
  • 独立操作:元素处理不依赖其他元素

不适合的场景

  • 顺序依赖操作:如limit、findFirst等
  • 有状态操作:如sorted、distinct
  • I/O密集型操作:可能导致线程阻塞 (补充意见:但也不绝对不适合,有些情况下顺序执行,反而更慢)
  • 小数据集:并行开销可能超过收益

性能优化技巧

正确测量性能

long start = System.nanoTime();
result = list.parallelStream().[...].collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("耗时: " + duration + " ms");

选择合适的并行度

// 自定义线程池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {list.parallelStream().[...].collect(Collectors.toList());
}).get();

避免共享可变状态

// 错误示例 - 存在竞态条件
List<String> result = new ArrayList<>();
list.parallelStream().forEach(s -> result.add(s.toUpperCase()));  // 可能抛出异常// 正确做法
List<String> safeResult = list.parallelStream().map(String::toUpperCase).collect(Collectors.toList());

高级应用

自定义Spliterator

class CustomSpliterator<T> implements Spliterator<T> {// 实现方法...
}Spliterator<String> spliterator = new CustomSpliterator<>(data);
Stream<String> parallelStream = StreamSupport.stream(spliterator, true);

并行收集器

// 使用线程安全的收集器
Map<String, List<Student>> studentsByClass = students.parallelStream().collect(Collectors.groupingByConcurrent(Student::getClassName));

FAQ: 并行流的常见陷阱与解决方案

Q:并行流与顺序流的性能对比?

  • 测试示例
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000).boxed().collect(Collectors.toList());// 顺序流
long seqTime = measureTime(() -> numbers.stream().reduce(0, Integer::sum));// 并行流
long parTime = measureTime(() -> numbers.parallelStream().reduce(0, Integer::sum));System.out.println("顺序流: " + seqTime + "ms");
System.out.println("并行流: " + parTime + "ms");
  • 对比结果
操作 数据量 顺序流耗时 并行流耗时
求和 100万 15ms 8ms
过滤 1000万 120ms 45ms
排序 100万 650ms 750ms

Q:线程安全问题

  • 问题
int[] counter = new int[1];
list.parallelStream().forEach(e -> counter[0]++);  // 竞态条件
  • 解决
// 使用原子类
AtomicInteger counter = new AtomicInteger();
list.parallelStream().forEach(e -> counter.incrementAndGet());// 或使用归约操作
int sum = list.parallelStream().mapToInt(e -> 1).sum();

Q:顺序敏感操作

  • 问题
// 并行流中findFirst可能不如预期
Optional<Integer> first = list.parallelStream().filter(i -> i > 10).findFirst();
  • 解决
// 如需顺序保证,使用【顺序流】,而非并行流
Optional<Integer> first = list.stream().filter(i -> i > 10).findFirst();

Q:性能层面的考量:是否需要单独构建线程池?

  • ✅ 建议单独构建线程池的场景
    | 场景 | 原因 |
    | ------------- | ------------------------------------------------------------------ |
    | I/O 密集型任务 | 默认线程数较少(CPU-1),不适合阻塞操作(如 DB、HTTP),容易拖慢整个 commonPool(),影响其他并行任务 。 |
    | 任务隔离需求 | 避免与其他模块共享线程池,防止任务间资源竞争、死锁或阻塞 。 |
    | 需要精确控制并发度 | 自定义线程池可设置合适的线程数,避免过度切换或资源浪费 。 |

  • ❌ 可不单独构建线程池的场景

场景 原因
CPU 密集型任务 默认 commonPool() 的线程数已接近 CPU 核心数,适合计算密集型任务 。
简单一次性任务 代码简洁、无需复杂控制,使用默认线程池即可 。

Q:最佳实践经验

  • 先测试后优化:不要假设并行一定更快,实际测量性能

  • 避免副作用:确保lambda表达式没有副作用

  • 考虑顺序性:需要顺序保证时使用顺序流

  • 合理设置并行度:根据CPU核心数和任务特性调整

  • 注意数据结构:ArrayList比LinkedList更适合并行处理

  • 避免自动装箱:使用原始类型流(IntStream等)提升性能

  • 是否需要单独创建线程池来执行并行流?

  • CPU 密集型任务:可直接使用 parallelStream(),无需额外线程池。
  • I/O 密集型或关键业务:建议如下方式使用自定义 ForkJoinPool:

若任务为 I/O 密集型或对隔离性、并发度有要求,有必要单独构建线程池以提升性能与稳定性 。

ForkJoinPool customPool = new ForkJoinPool(20); // 自定义线程数
customPool.submit(() -> list.parallelStream().forEach(item -> doSomething(item))
).get();
customPool.shutdown();

并行流是强大的工具,但需要谨慎使用。正确使用时可以显著提升性能,错误使用则可能导致潜在问题。理解其工作原理和适用场景是有效使用并行流的关键。

X 参考文献

  • Java并行流(parallelStream)深度解析 - CSDN
  • Java8并行流parallelStream原理深度解析 - CSDN
  • 如何自定义ForkJoinPool提升并行流 ParallelStream执行速度 - 亿速云/大数据
  • java8中修改parallelStream默认并发数 - CSDN
http://www.aitangshan.cn/news/737.html

相关文章:

  • 实用指南:vue3对比vue2的性能优化和提升 :Vue 3 vs Vue 2
  • 最大流模板大全
  • cut命令
  • 重组蛋白表达系统|原核大肠杆菌|酵母|昆虫杆状病毒|哺乳动物表达系统
  • sort命令
  • Rocky10 编译安装 Asp.net Core_9 Nginx_1.28.0 Mariadb_11.8.3 Redis_8.2.0 (实测 笔记)
  • 8.13
  • STM32 Study Note
  • seq命令
  • UWA发布 | Unity手游性能年度蓝皮书
  • WPF优秀项目推荐:Stylet 一个非常轻量但强大的 ViewModel-First MVVM 框架
  • GNOME安装扩展配置工具及常用扩展
  • AtCoder Beginner Contest 410 (A - F)
  • 反向代理,重定向,forward
  • 内网DNS-dnsmasq服务详解
  • 【自学嵌入式:stm32单片机】TIM定时中断
  • 手艺融合赋能文旅元宇宙:虚实共生重构产业新生态
  • C语言数据结构《顺序表》教案
  • 数据库获得当前日期和时间
  • 【大二病也要学离散!】第三章 函数
  • QOJ5459 Goose, goose, DUCK? 题解 [ 蓝 ] [ 扫描线 ] [ 线段树 ]
  • 【日记】谈判失败(2273 字)
  • LSB隐写原理解析
  • 利用Active Directory进行攻击防御 - 实战技术与工具解析
  • 数据结构《课程导入 绪论》教案
  • Windows11正式版如何修改开机音乐的问题
  • 深度技术win10专业版电脑出现假死的问题
  • Spring boot SseEmitter 推送数据客户端乱码
  • Apache SeaTunnel 新定位!迈向多模态数据集成的统一工具
  • [完结22章]LLM应用全流程开发 全新技术+多案例实战+私有化部署