Java 声明式编程- Stream API 实战

发布于:2025-09-11 ⋅ 阅读:(17) ⋅ 点赞:(0)

Java 8 引入的 Stream API 极大地提升了集合操作的表达力。借助 函数式编程声明式风格,我们能够更简洁、更直观地处理集合数据。本文从 Java Stream API 的核心概念 出发,结合代码示例逐步讲解:流的创建、流的中间操作与终端操作、并行流、多样化收集器、声明式编程风格,帮助读者掌握 Stream 的使用。


一、Stream 的三大核心组成

一个完整的流式处理一般包括三部分:

  1. 数据源:集合、数组、文件、IO 通道等;
  2. 中间操作:返回 Stream 本身,用于加工数据(filtermapflatMapdistinctlimitsorted 等);
  3. 终端操作:触发流执行,返回具体结果(forEachcollectcountmaxmin 等)。

示例:

list.stream()                        // 数据流
    .filter(i -> i % 2 == 0)         // 中间操作:筛选偶数
    .max(Integer::compareTo)         // 终端操作:取最大值
    .ifPresent(System.out::println); // 输出结果

二、流的创建方式

Stream 的数据源非常灵活,常见方式包括:

// 1. 直接创建
Stream<Integer> stream = Stream.of(1, 2, 3);

// 2. 拼接
Stream<Integer> concat = Stream.concat(Stream.of(1, 2), Stream.of(3, 4));

// 3. 使用 builder 构建
Stream<Object> build = Stream.builder().add("11").add("22").build();

// 4. 从集合中获取
List<Integer> list = List.of(1, 2);
Stream<Integer> stream1 = list.stream();

Set<Integer> set = Set.of(1, 2);
Stream<Integer> stream2 = set.stream();

Map<String, Integer> map = Map.of("a", 1, "b", 2);
map.keySet().stream();
map.values().stream();

几乎所有集合类都支持 .stream() 方法,方便快捷。


三、并行流与线程模型

默认情况下,Stream 是顺序执行的。我们也可以使用 并行流 来加速数据处理:

long count = Stream.of(1, 2, 3, 4, 5)
        .parallel()  // 开启并行流
        .filter(i -> {
            System.out.println("当前线程:" + Thread.currentThread());
            return i > 2;
        })
        .count();
System.out.println(count);

并行流原理:底层基于 ForkJoinPool.commonPool,把数据拆分后并行处理,再合并结果。
⚠️ 注意:并行流需要关注 线程安全任务拆分开销,适用于 CPU 密集型场景,不适合所有情况。


四、常见中间操作示例

StreamDemo 中,我们演示了多种流操作(完整代码附在文末):

1. filter:条件过滤

List<Integer> collect = List.of(1, 2, 3, 4, 5, 6)
        .stream()
        .filter(i -> i > 2)
        .collect(Collectors.toList());
System.out.println(collect); // [3, 4, 5, 6]

2. takeWhile:满足条件则保留,否则终止流

List<Integer> collect1 = List.of(1, 2, 3, 4, 5, 6)
        .stream()
        .takeWhile(i -> i > 2)
        .collect(Collectors.toList());
System.out.println(collect1); // []

因为首元素 1 不满足条件,流立刻终止。

3. mapflatMap

personList.stream()
        .map(Person::name)                     // 映射为姓名
        .flatMap(name -> Stream.of(name.split(" "))) // 拆分字符串
        .distinct()                            // 去重
        .forEach(System.out::println);

4. peek:调试/监视

personList.stream()
        .filter(p -> p.age() > 18)
        .peek(p -> System.out.println("filter peek:" + p))
        .map(Person::name)
        .peek(name -> System.out.println("map peek:" + name))
        .forEach(System.out::println);

五、分组与收集

Stream 的强大之处在于内置了丰富的收集器,最常见的就是 Collectors.groupingBy

Map<String, List<Person>> listMap = personList.stream()
        .filter(person -> person.age() > 15)
        .collect(Collectors.groupingBy(Person::gender));

System.out.println(listMap);

输出结果:

{女=[Person[name=李 四, gender=女, age=19], Person[name=王 六, gender=女, age=21]], 男=[Person[name=张三, gender=男, age=18], Person[name=王 五, gender=男, age=20], Person[name=孙 七, gender=男, age=16]]}

六、Stream 与传统迭代的区别

  • 传统 for 循环:命令式,开发者需要逐个控制迭代过程。
  • Stream API:声明式,开发者只需声明“做什么”,至于“怎么做”交给框架。

模型差异:推 vs 拉

  • 迭代器(拉模型):调用者主动拉取数据;
  • Stream(推模型):上游有数据时自动推给下游。

这让 Stream 更适合函数式编程、响应式编程的场景。


七、完整示例代码

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamDemo {
    public static void main(String[] args) {
        // 1. 挑出最大偶数
        List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // StreamAPI:
        // 1). 把数据封装成流;要到数据流;集合类.stream
        // 2). 定义流式操作
        // 3). 获取最终结果
        list.stream()
                .filter(i -> i % 2 == 0)
                .max(Integer::compareTo)
                .ifPresent(System.out::println);

        // 流的三大部分:
        // 1. 数据流 2. N个中间操作 3. 一个最终操作
 
        // 1)、创建流
        Stream<Integer> stream = Stream.of(1, 2, 3);
        Stream<Integer> concat = Stream.concat(stream, Stream.of(4, 5, 6));
        Stream<Object> build = Stream.builder().add("11").add("22").build();

        // 2)、从集合容器中获取这个流,List Set Map
        List<Integer> integers = List.of(1, 2);
        Stream<Integer> stream1 = integers.stream();

        Set<Integer> integers1 = Set.of(1, 2);
        Stream<Integer> stream2 = integers1.stream();

        Map<Object, Object> objectMap = Map.of();
        objectMap.keySet().stream();
        objectMap.values().stream();

        System.out.println("--------------------------------------------------");

        System.out.println("主线程:" + Thread.currentThread());

        // 流是不是并发?和for有什么区别?
        // 流也是逐个处理,默认不并发,也可以并发
        // 并发后,要自行解决多线程安全问题
        // 推荐:流的所有操作都是无状态,数据状态仅在此函数内有效
        long count = Stream.of(1, 2, 3, 4, 5)
                .parallel()  // intermediate operation 并发流
                .filter(i -> {
                    System.out.println("当前线程:" + Thread.currentThread());
                    System.out.println("正在filter:" + i);
                    return i > 2;
                })
                .count();

        System.out.println(count);

        System.out.println("--------------------------------------------------");

        List<Person> personList = List.of(new Person("张三", "男", 18),
                new Person("李 四", "女", 19),
                new Person("王 五", "男", 20),
                new Person("王 六", "女", 21),
                new Person("孙 七", "男", 16));
        // 声明式编程:基于事件机制的回调
        // 定义一系列的回调函数,回调函数并非程序员自己调用,而是发生这个事件的时候由系统进行调用

        // 1. 挑出年龄大于18的人
        // 拿到集合其实就是拿到集合的深拷贝的值,流的所有操作都是流的元素引用,不会改变集合的值
        personList.stream()
                .limit(3)  // 流只会从 personList 里取前 3 个元素参与后续处理。
                .filter(person -> person.age() > 18)
                .peek(person -> System.out.println("filter peek:" + person))
                .map(person -> person.name())
                .peek(name -> System.out.println("map peek:" + name))
                .flatMap(name -> Stream.of(name.split(" ")))
                .distinct()
//                .limit(3)
//                .sorted((o1, o2) -> o1.compareTo(o2))
                .forEach(System.out::println);

        System.out.println("--------------------------------------------------");

        List<Integer> collect = List.of(1, 2, 3, 4, 5, 6)
                .stream()
                .filter(i -> i > 2)  // 无条件遍历流中的每一个元素
                .collect(Collectors.toList());
        System.out.println(collect);

        List<Integer> collect1 = List.of(1, 2, 3, 4, 5, 6)
                .stream()
                .takeWhile(i -> i > 2)  // 当满足条件,拿到这个元素,不满足直接结束流操作
                .collect(Collectors.toList());
        System.out.println(collect1);

        // 数据是自流动的,而不是靠迭代被动流动
        // 推拉模型:
        // 推:流模式,上游有数据,自动推给下游
        // 拉:迭代器,自己遍历,自己拉取
        Map<String, List<Person>> listMap = personList.stream()
                .filter(person -> person.age() > 15)
                .collect(Collectors.groupingBy(Person::gender));
        System.out.println(listMap);

        // 让少量线程一直忙,而不是让大量线程一直切换/等待
    }

    public record Person(String name, String gender, Integer age) {
    }
}

八、总结

  1. Stream API 提供了声明式的数据处理方式,避免繁琐的 for 循环;
  2. 流分为 数据源 → 中间操作 → 终端操作 三部分;
  3. 支持 并行流,利用多核 CPU 提升性能;
  4. Collectors 提供丰富的收集器,能方便地做分组、统计、聚合;
  5. 与传统迭代相比,Stream 更贴近 函数式编程响应式编程 思路。

借助 Stream,我们可以让 Java 代码更简洁、更优雅、更现代化。


网站公告

今日签到

点亮在社区的每一天
去签到