Stream API 讲解

发布于:2025-08-31 ⋅ 阅读:(24) ⋅ 点赞:(0)

1. Stream 的基本概念

什么是 Stream?

Stream 不是数据结构,而是对数据源(集合、数组等)的元素序列进行函数式操作的管道。

主要特点:

  • 不存储数据:只是数据的视图

  • 不修改源数据:操作产生新流,不影响原集合

  • 惰性操作:中间操作不会立即执行

  • 只能消费一次:流被终端操作消费后就不能再使用

2. Stream 操作的三种类型

2.1 创建流(Stream Creation)

// 从集合创建
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream1 = list.stream();
Stream<String> parallelStream = list.parallelStream();

// 从数组创建
String[] array = {"a", "b", "c"};
Stream<String> stream2 = Arrays.stream(array);

// 使用 Stream.of
Stream<String> stream3 = Stream.of("a", "b", "c");

// 使用 Stream.generate(无限流)
Stream<Double> randomStream = Stream.generate(Math::random).limit(5);

// 使用 Stream.iterate(无限流)
Stream<Integer> iterateStream = Stream.iterate(0, n -> n + 2).limit(10);

2.2 中间操作(Intermediate Operations)

返回新流,可以链式调用,都是惰性的。

filter - 过滤
List<String> filtered = list.stream()
    .filter(s -> s.startsWith("a"))
    .collect(Collectors.toList());
map - 转换
List<Integer> lengths = list.stream()           // 1. 创建Stream
    .map(String::length)                       // 2. 映射操作
    .collect(Collectors.toList());             // 3. 收集结果

// flatMap - 扁平化
List<List<String>> nestedList = Arrays.asList(
    Arrays.asList("a", "b"),
    Arrays.asList("c", "d")
);
List<String> flatList = nestedList.stream()
    .flatMap(Collection::stream)
    .collect(Collectors.toList());

方法引用语法

User::getName 是以下方法引用的形式:

  • 类名::实例方法名

  • 表示对某个类的实例调用指定的方法

其他常见用法

// 字符串转大写
.map(String::toUpperCase)

// 获取字符串长度
.map(String::length)

// 调用静态方法
.map(Integer::valueOf)

// 构造函数引用
.map(User::new)
distinct - 去重
List<String> distinct = Arrays.asList("a", "b", "a", "c").stream()
    .distinct()
    .collect(Collectors.toList());
sorted - 排序
List<String> sorted = list.stream()
    .sorted()
    .collect(Collectors.toList());

// 自定义排序
List<String> customSorted = list.stream()
    .sorted((s1, s2) -> s2.compareTo(s1))
    .collect(Collectors.toList());
limit 和 skip
List<String> limited = list.stream()           // 1. 创建Stream
    .limit(2)                                // 2. 限制数量
    .collect(Collectors.toList());           // 3. 收集结果

List<String> skipped = list.stream()           // 1. 创建Stream
    .skip(1)                                 // 2. 跳过元素
    .collect(Collectors.toList());           // 3. 收集结果
peek - 查看元素(调试用)
List<String> result = list.stream()
    .peek(System.out::println)
    .map(String::toUpperCase)
    .peek(System.out::println)
    .collect(Collectors.toList());

2.3 终端操作(Terminal Operations)

触发流的处理,返回非流结果。

forEach - 遍历
list.stream().forEach(System.out::println);
collect - 收集结果
// 转换为List
List<String> listResult = stream.collect(Collectors.toList());

// 转换为Set
Set<String> setResult = stream.collect(Collectors.toSet());

// 转换为Map
Map<String, Integer> mapResult = list.stream()
    .collect(Collectors.toMap(
        s -> s,           // key mapper
        String::length    // value mapper
    ));

// 分组
Map<Integer, List<String>> grouped = list.stream()
    .collect(Collectors.groupingBy(String::length));

// 分区
Map<Boolean, List<String>> partitioned = list.stream()
    .collect(Collectors.partitioningBy(s -> s.length() > 3));
reduce - 归约
// 求和
Optional<Integer> sum = Arrays.asList(1, 2, 3).stream()
    .reduce(Integer::sum);

// 带初始值的reduce
Integer sumWithIdentity = Arrays.asList(1, 2, 3).stream()
    .reduce(0, Integer::sum);

// 复杂的reduce
String concatenated = Arrays.asList("a", "b", "c").stream()
    .reduce("", (partial, element) -> partial + element);
匹配操作
boolean anyMatch = list.stream().anyMatch(s -> s.startsWith("a"));
boolean allMatch = list.stream().allMatch(s -> s.length() > 2);
boolean noneMatch = list.stream().noneMatch(s -> s.isEmpty());
查找操作
Optional<String> first = list.stream().findFirst();
Optional<String> any = list.stream().findAny();
统计操作
 List<Integer> numbers = Arrays.asList(10, 20, 30, 40, 50);
        
 // 基本统计
long count = numbers.stream().count();
Optional<Integer> max = numbers.stream().max(Integer::compare);
Optional<Integer> min = numbers.stream().min(Integer::compare);
        
System.out.println("Count: " + count);       // 输出: Count: 5
System.out.println("Max: " + max.orElse(0)); // 输出: Max: 50
System.out.println("Min: " + min.orElse(0)); // 输出: Min: 10
        
// 数值流统计
IntSummaryStatistics stats = numbers.stream()
    .mapToInt(Integer::intValue)
    .summaryStatistics();
        
System.out.println("=== 综合统计 ===");
System.out.println("Max: " + stats.getMax());        // 50
System.out.println("Min: " + stats.getMin());        // 10
System.out.println("Average: " + stats.getAverage());// 30.0
System.out.println("Sum: " + stats.getSum());        // 150
System.out.println("Count: " + stats.getCount());    // 5

3. 数值流特化

处理基本类型时更高效:

// IntStream
IntStream intStream = IntStream.range(1, 10); // 1-9
IntStream closedStream = IntStream.rangeClosed(1, 10); // 1-10

// LongStream
LongStream longStream = LongStream.range(1, 100);

// DoubleStream
DoubleStream doubleStream = DoubleStream.of(1.0, 2.0, 3.0);

// 对象流转换为数值流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
IntStream intStreamFromObjects = numbers.stream().mapToInt(Integer::intValue);

4. 并行流

并行流是 Java 8 中引入的一种能够自动将数据处理任务分配到多个线程并行执行的流。它基于 Fork/Join 框架,能够充分利用多核处理器的优势。

核心概念

1. 并行 vs 并发
  • 并发:多个任务交替执行(看起来同时)

  • 并行:多个任务真正同时执行(多核处理器)

2. 工作窃取(Work Stealing)

并行流使用 Fork/Join 框架,采用工作窃取算法:

  • 每个线程有自己的任务队列

  • 空闲线程可以从其他线程的队列末尾"偷取"任务

  • 提高CPU利用率,减少线程空闲时间

创建并行流的三种方式

方式1:直接创建并行流
List<String> list = Arrays.asList("a", "b", "c", "d", "e");

// 使用 parallelStream() 直接创建并行流
List<String> parallelResult = list.parallelStream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
方式2:将顺序流转换为并行流
List<String> result = list.stream()        // 先创建顺序流
    .parallel()                           // 转换为并行流
    .map(String::toUpperCase)
    .collect(Collectors.toList());
方式3:将并行流转换回顺序流
List<String> sequentialResult = list.parallelStream()  // 创建并行流
    .sequential()                                     // 转换回顺序流
    .map(String::toUpperCase)
    .collect(Collectors.toList());

并行流的工作原理

数据分割(Splitting)
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// 并行流会自动将数据分割成多个块
int sum = numbers.parallelStream()
    .mapToInt(Integer::intValue)
    .sum();

// 相当于:
// 线程1处理: [1, 2, 3, 4, 5] → sum=15
// 线程2处理: [6, 7, 8, 9, 10] → sum=40
// 最终结果: 15 + 40 = 55
执行流程
  1. 分割:将数据源分割成多个子任务

  2. 处理:多个线程并行处理各自的数据块

  3. 合并:将各个线程的结果合并为最终结果

并行流的适用场景

✅ 适合使用并行流的场景

1. 数据量大的计算密集型任务

// 计算1到1千万的和
long sum = LongStream.rangeClosed(1, 10_000_000)
    .parallel()  // 使用并行流大幅提升性能
    .sum();

2. 独立的元素处理

List<String> words = Arrays.asList("apple", "banana", "cherry", "date");

// 每个字符串的处理是独立的
List<String> upperCaseWords = words.parallelStream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());

3. 过滤操作

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// 并行过滤偶数
List<Integer> evenNumbers = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .collect(Collectors.toList());
❌ 不适合使用并行流的场景

1. 有状态的操作(可能产生错误结果)

// 错误的用法:skip() 和 limit() 在并行流中行为不确定
List<Integer> result = numbers.parallelStream()
    .skip(2)
    .limit(5)
    .collect(Collectors.toList()); // 结果可能不一致

2. 有顺序依赖的操作

// 错误的用法:findFirst() 在并行流中可能返回任意第一个完成的元素
Optional<Integer> first = numbers.parallelStream()
    .filter(n -> n > 5)
    .findFirst(); // 结果不确定

3. 数据量太小

List<Integer> smallList = Arrays.asList(1, 2, 3);

// 开销大于收益:创建线程的开销可能超过计算收益
long sum = smallList.parallelStream()
    .mapToInt(Integer::intValue)
    .sum();

5. 实战示例

示例1:数据处理管道

List<Person> people = Arrays.asList(
    new Person("Alice", 25, "New York"),
    new Person("Bob", 30, "London"),
    new Person("Charlie", 35, "New York"),
    new Person("David", 28, "Paris")
);

// 复杂的流操作
Map<String, Double> averageAgeByCity = people.stream()
    .filter(p -> p.getAge() > 25)                    // 过滤年龄>25
    .map(p -> {                                      // 转换城市名为大写
        p.setCity(p.getCity().toUpperCase());
        return p;
    })
    .sorted(Comparator.comparing(Person::getAge))    // 按年龄排序
    .collect(Collectors.groupingBy(                  // 按城市分组
        Person::getCity,
        Collectors.averagingInt(Person::getAge)      // 计算平均年龄
    ));

示例2:文件处理

try (Stream<String> lines = Files.lines(Paths.get("data.txt"))) {
    List<String> longWords = lines
        .flatMap(line -> Arrays.stream(line.split("\\s+")))
        .filter(word -> word.length() > 5)
        .distinct()
        .sorted()
        .collect(Collectors.toList());
} catch (IOException e) {
    e.printStackTrace();
}

示例3:数据库式查询

// 模拟SQL查询:SELECT name FROM users WHERE age > 25 ORDER BY name
List<String> result = users.stream()
    .filter(user -> user.getAge() > 25)
    .map(User::getName)
    .sorted()
    .collect(Collectors.toList());

6. 最佳实践和注意事项

性能考虑:

// 不好的写法:多次终端操作
long count = list.stream().filter(s -> s.length() > 3).count();
List<String> result = list.stream().filter(s -> s.length() > 3).collect(Collectors.toList());

// 好的写法:重用流
Stream<String> filteredStream = list.stream().filter(s -> s.length() > 3);
long count = filteredStream.count();
// 注意:流只能消费一次,这里会抛出异常

避免副作用:

// 不好的写法:在流中修改外部状态
List<String> result = new ArrayList<>();
list.stream()
    .filter(s -> s.length() > 3)
    .forEach(result::add);  // 副作用!

// 好的写法:使用collect
List<String> result = list.stream()
    .filter(s -> s.length() > 3)
    .collect(Collectors.toList());

空值处理:

List<String> safeList = Optional.ofNullable(list)
    .orElse(Collections.emptyList())
    .stream()
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

7. 常用的 Collectors 方法

// 连接字符串
String joined = list.stream().collect(Collectors.joining(", "));

// 统计
Double average = list.stream().collect(Collectors.averagingInt(String::length));
Long count = list.stream().collect(Collectors.counting());

// 最大最小值
Optional<String> max = list.stream().collect(Collectors.maxBy(Comparator.naturalOrder()));

// 多级分组
Map<Integer, Map<String, List<Person>>> multiLevel = people.stream()
    .collect(Collectors.groupingBy(
        Person::getAge,
        Collectors.groupingBy(Person::getCity)
    ));


网站公告

今日签到

点亮在社区的每一天
去签到