【Java函数式编程】补充终结算子和问题解决

发布于:2025-08-07 ⋅ 阅读:(15) ⋅ 点赞:(0)

这里实际上和spark的算子有重复的,但还是两者相辅相成,spark中的算子主要是从parallelize和textFile开始的,因此spark更加注重List接口和Tuple元素,并且有很强的读写倾向和探究实际工程的倾向。Java的函数式编程则更注重流的转换,从文档理解和抓方法应用,方法实际操作等角度着墨更多,两者实际上是一回事儿,底层并没有改变。

        


目录

终结算子

collect

anyMatch

allMatch

noneMatch

findFirst

reduce

ifPresent

问题答疑

2. Arrays工具类没有子类的原因

3. 输出文件末尾换行问题

4. 集合框架设计

5. 类名冲突优先级

6. DAG与迭代计算

7. Lambda嵌套问题

8. findAny行为解析


         

这里接着补充剩下的终结算子的作用/特点,应用 ,并将之前几天的问题一起解决一下

终结算子

collect

作用:

1.用于结合最后的foreach进行输出
2.利用其中的Collectors接口并且重写其中的两个Function方法

代码实现1
        

        sampleAuthors.stream()
                .map(Author::getName)
                .collect(Collectors.toList())
                .forEach(System.out::println);

数据输出类型:
这里使用Collectors的toList方法和toSet方法,这两个方法都返回的是Collector接口类型,只不过接口的具体泛型一个是List<>,一个是Set<>。并且这里的collect起到的作用是收集器,收集map的输出内容并以XX形式返回。

注意,如果这里最后接一个toString方法那么就是以列表的形式打印出来

如果是按照上述foreach打印的话就是一个一个打印出来




代码实现2:复杂
        

        List<Author> sampleAuthors = AuthorFactory.createSampleAuthors();
        Map<String, List<Author.BookList>> map = sampleAuthors.stream()
                .distinct()
                .collect(Collectors.toMap(new Function<Author, String>() {
                    @Override
                    public String apply(Author author) {
                        return author.getName();
                    }
                }, new Function<Author, List<Author.BookList>>() {
                    @Override
                    public List<Author.BookList> apply(Author author) {
                        return author.getBooklist();
                    }
                }));


        System.out.println(map);

输出数据类型




实际生产场景:数据聚合与转换
案例:电商订单分析

// 分析每个客户的订单总额
Map<Long, Double> customerTotalSpending = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getCustomerId,
        Collectors.summingDouble(Order::getAmount)
    ));

// 生成CSV报告
String report = products.stream()
    .filter(p -> p.getStock() < 10)
    .sorted(Comparator.comparing(Product::getStock))
    .map(p -> p.getName() + "," + p.getStock())
    .collect(Collectors.joining("\n"));

适用场景

  • 将流数据转换为 Map/List/Set 等集合

  • 生成汇总报表

  • 构建缓存数据结构

  • 数据分组统计(如按地区、类别分组)

       


anyMatch

作用:只要有一个符合要求则返回boolean类型的结果

代码实现
 

        boolean b = sampleAuthors.stream()
                .anyMatch(author -> author.getBooklist().isEmpty());
        System.out.println(b);

数据输出类型
true  or  false



实际生产场景:快速存在性检查

案例:风控系统
                

// 检查交易是否存在高风险特征
boolean isHighRisk = transactions.stream()
    .anyMatch(tx -> 
        tx.getAmount() > 100000 || 
        RISK_COUNTRIES.contains(tx.getCountry())
    );

// 系统健康检查
boolean isServiceDown = endpoints.stream()
    .anyMatch(ep -> ep.getResponseTime() > TIMEOUT_THRESHOLD);

                        


allMatch

作用:所有的流数据都要符合要求,返回一个boolean类型结果

代码实现

        boolean b1 = sampleAuthors.stream()
                .allMatch(author -> author.getBooklist().isEmpty());
        System.out.println(b1);

数据输出类型
true  or  false
        
        
实际生产场景:
完整性验证案例:订单处理系统


         

// 验证订单所有商品都有库存
boolean canFulfillOrder = order.getItems().stream()
    .allMatch(item -> 
        inventoryService.getStock(item.getSku()) >= item.getQuantity()
    );

// 配置校验
boolean validConfig = configParams.stream()
    .allMatch(param -> !param.isBlank());

                            


noneMatch

作用:判断流中元素都不符合匹配条件,都不符合为true,反之则为false


代码实现
 

        boolean b2 = sampleAuthors.stream()
                .noneMatch(author -> author.getBooklist().isEmpty());
        System.out.println(b2);

数据输出类型
true  or  false


实际生产场景:排他性检查

案例:用户注册系统

// 检查用户名未被使用
boolean usernameAvailable = existingUsers.stream()
    .noneMatch(user -> user.getUsername().equals(newUsername));

// 安全扫描
boolean isClean = systemFiles.stream()
    .noneMatch(file -> MALWARE_SIGNATURES.contains(file.getHash()));

实际生产场景:获取首个可用资源
案例:服务发现
 

// 获取第一个可用的支付网关
paymentGateways.stream()
    .filter(PaymentGateway::isAvailable)
    .sorted(Comparator.comparing(PaymentGateway::getPriority))
    .findFirst()
    .ifPresent(gateway -> processPayment(order, gateway));

// 故障转移处理
backupServers.stream()
    .filter(Server::isHealthy)
    .findFirst()
    .orElseThrow(() -> new ServiceUnavailableException());

        


findFirst

作用:获取流当中的第一个元素

代码实现:

        sampleAuthors.stream()
                .findFirst().ifPresent(n -> System.out.println(n.getName()));

数据输出类型
findFirst方法会返回一个Optional开头的Author的json串


ifPresent方法传入Lambda表达式,可以消除Optional前缀



实际生产场景:获取首个可用资源
案例:服务发现

// 获取第一个可用的支付网关
paymentGateways.stream()
    .filter(PaymentGateway::isAvailable)
    .sorted(Comparator.comparing(PaymentGateway::getPriority))
    .findFirst()
    .ifPresent(gateway -> processPayment(order, gateway));

// 故障转移处理
backupServers.stream()
    .filter(Server::isHealthy)
    .findFirst()
    .orElseThrow(() -> new ServiceUnavailableException());

        


reduce

作用:将结果按照你设定的逻辑进行聚合

和map搭配使用

第一种模式:只传入Fuction类型

        sampleAuthors.stream()
                .map(author -> author.getAge())
                .reduce(new BinaryOperator<Integer>() {
                    @Override
                    public Integer apply(Integer a, Integer b3) {
                        return Integer.sum(a, b3);
                    }
                }).ifPresent(System.out::println);

第二种模式:传递两个参数,第一个是Integer类型,第二个是Fuction类型

        Integer reduce = sampleAuthors.stream()
                .map(author -> author.getAge())
                .reduce(Integer.MIN_VALUE, new BinaryOperator<Integer>() {
                    @Override
                    public Integer apply(Integer a, Integer b3) {
                        return Integer.max(a, b3);
                    }
                });
        System.out.println(reduce);


应用:使用reduce求最大最小值

        

        sampleAuthors.stream()
                .map(author -> author.getAge())
                .reduce(new BinaryOperator<Integer>() {
                    @Override
                    public Integer apply(Integer a, Integer b3) {
                        return Integer.max(a, b3);
                    }
                }).ifPresent(System.out::println);

实际生产场景:聚合计算
案例:金融分析系统
 

// 计算投资组合总价值
BigDecimal portfolioValue = holdings.stream()
    .map(holding -> 
        holding.getShares().multiply(marketData.getPrice(holding.getSymbol()))
    )
    .reduce(BigDecimal.ZERO, BigDecimal::add);

// 构建复杂对象
ShippingLabel label = order.getItems().stream()
    .map(this::createLabelFragment)
    .reduce(new ShippingLabel(), 
            ShippingLabel::merge, 
            ShippingLabel::combine);

        


ifPresent

作用:接在optional对象的后面

ifPresent 是 Optional 类的方法,用于在值存在时执行特定操作。它在生产环境中非常实用,特别是在处理可能为 null 的值时,可以避免空指针异常并使代码更加优雅。

代码实现
 

            sampleAuthors.stream()
                    .findFirst().ifPresent(n -> System.out.println(n.getName()));

实际生产场景

用户操作确认

场景:用户提交表单后的结果处理
        

// 用户注册服务
userService.register(newUser)
    .ifPresent(userId -> {
        // 仅当注册成功时发送欢迎邮件
        emailService.sendWelcomeEmail(userId);
        logger.info("New user registered: {}", userId);
    });

        

作者类和作者工厂类

作者工厂类
 

public class AuthorFactory {

    public static List<Author> createSampleAuthors() {
        List<String> books = Arrays.asList("Java", "Python", "C++", "JavaScript", "C#", "PHP","html","CSS");

        return Arrays.asList(
                new Author(35, "John Doe", new Author.BookList(books)),
                new Author(28, "Jane Smith", new Author.BookList(books.subList(0, 3))),
                new Author(42, "Robert Johnson", new Author.BookList(books.subList(2, 5))),
                new Author(42, "Robert Johnson", new Author.BookList(books.subList(2, 5))),
                new Author(45, "Bruce Venn", new Author.BookList(books.subList(6, 7)))
        );
    }
}

作者类
 

public class Author implements Comparable<Author> {
    private int age;
    private String name;
    private BookList booklist;

    // 构造函数
    public Author(int age, String name, BookList booklist) {
        this.age = age;
        this.name = name;
        this.booklist = booklist;
    }

    // 书籍列表内部类
    public static class BookList {
        private List<String> books;  // 更合理的字段名

        public BookList(List<String> books) {
            this.books = books;
        }

        public List<String> getBooks() {
            return books;
        }

        @Override
        public String toString() {
            return books.toString();
        }
    }

    // 获取书籍流
    public Stream<String> getBookStream() {
        return booklist.getBooks().stream();
    }

    // 其他访问方法
    public int getAge() {
        return age;
    }

    public String getName() {
        return name;
    }

    public BookList getBooklist() {
        return booklist;
    }

    // 重写方法
    @Override
    public String toString() {
        return "Author{" + name + " (" + age + "), books: " + booklist + "}";
    }

    @Override
    public int compareTo(Author o) {
        return Integer.compare(o.age, this.age);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Author)) return false;
        Author author = (Author) o;
        return age == author.age && name.equals(author.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(age, name);
    }
}

问题答疑

1. RDD分区对齐问题

为什么需要对齐?

  • 数据本地性优化:分区对齐确保相关数据位于同一节点,减少网络传输

  • 计算效率提升:避免Shuffle操作,降低I/O开销

  • 资源利用率优化:任务分配更均衡,避免数据倾斜

分区不对齐的后果

  1. 性能严重下降:触发Shuffle操作,增加网络传输和磁盘I/O

  2. 数据倾斜风险:某些分区负载过重,导致任务执行时间不均衡

  3. 内存溢出风险:单个Executor处理过大分区可能导致OOM

  4. 结果不可预测:在聚合操作中可能导致数据重复或丢失

2. Arrays工具类没有子类的原因

  • 设计目的:提供静态工具方法,无需实例化

  • final修饰:Java核心库中java.util.Arrays被声明为final

  • 防止误用:避免开发者通过继承改变数组操作行为

  • 性能考虑:保持简单轻量,没有对象创建开销

  • 完整方法集:已提供所有必要的数组操作方法

3. 输出文件末尾换行问题

根本原因

java

// Spark保存文本的源码逻辑
def saveAsTextFile(path: String): Unit = {
  this.map(_.toString).saveAsHadoopFile(path, 
    classOf[TextOutputFormat[_, _]])
}

// Hadoop TextOutputFormat 实现
public void write(K key, V value) throws IOException {
  out.write(key.toString());
  out.write('\t'); 
  out.write(value.toString());
  out.write('\n');  // 这里添加换行符
}

解决方案

scala

// 1. 使用自定义OutputFormat
rdd.saveAsHadoopFile(path, classOf[NullWritable], 
  classOf[Text], classOf[CustomOutputFormat])

// 2. 保存前移除换行
rdd.map(_.stripLineEnd).saveAsTextFile(path)

// 3. 使用二进制格式
rdd.saveAsSequenceFile(path)  // 不添加换行

4. 集合框架设计

  • 单列集合Collection

    • List:有序可重复 ArrayListLinkedList

    • Set:无序唯一 HashSetTreeSet

    • Queue:队列 PriorityQueueArrayDeque

  • 双列集合Map

    • 键值对存储 HashMapTreeMap

    • 特殊实现 ConcurrentHashMapLinkedHashMap

5. 类名冲突优先级

IDEA类加载顺序

  1. 当前项目源代码(src目录)

  2. 项目依赖库(按pom/gradle声明顺序)

  3. JDK核心库

解决方案

java

// 明确指定全限定名
java.util.List<String> sysList = new ArrayList<>();
com.mypackage.List<String> myList = new MyList<>();

6. DAG与迭代计算

DAG是Spark迭代计算的基础

  • 优化迭代:将多次迭代融合为单一DAG执行计划

  • 内存持久化:支持将中间RDD缓存到内存

scala

val data = sc.parallelize(1 to 1000000).cache() // 缓存基础数据

// 迭代计算示例
for (i <- 1 to 10) {
  data.map(x => complexCalculation(x, i)).reduce(_ + _)
}

  • 执行计划优化:Catalyst优化器重组DAG

  • 容错机制:基于DAG的血缘关系实现快速恢复

7. Lambda嵌套问题

可以嵌套但需谨慎

java

// 嵌套Lambda示例
list.stream()
    .filter(outer -> 
        outer.getItems().stream()
             .anyMatch(inner -> inner.getValue() > 10)
    )
    .collect(Collectors.toList());

最佳实践

  1. 深度限制:嵌套不超过3层

  2. 提取方法:将内层Lambda转为方法引用

java

list.stream()
    .filter(this::hasHighValueItems)
    .collect(Collectors.toList());

private boolean hasHighValueItems(Outer outer) {
    return outer.getItems().stream()
                .anyMatch(inner -> inner.getValue() > 10);
}

  1. 性能注意:嵌套Lambda增加闭包创建开销

8. findAny行为解析

看似返回第一个元素的原因

  1. 顺序流特性:在顺序流中,findAny()默认返回首个元素

  2. 实现优化:JDK内部短路操作从流起始处查找

  3. 并行流差异:在并行流中真正返回任意元素

验证并行流

java

List<Integer> nums = IntStream.range(0, 1000).boxed().collect(Collectors.toList());

// 顺序流 - 总是0
System.out.println(nums.stream().findAny().get()); 

// 并行流 - 随机结果
System.out.println(nums.parallelStream().findAny().get());

正确使用场景

  • 当仅需确认存在性时使用findAny()

  • 当需要首个元素时使用findFirst()


网站公告

今日签到

点亮在社区的每一天
去签到