这里实际上和spark的算子有重复的,但还是两者相辅相成,spark中的算子主要是从parallelize和textFile开始的,因此spark更加注重List接口和Tuple元素,并且有很强的读写倾向和探究实际工程的倾向。Java的函数式编程则更注重流的转换,从文档理解和抓方法应用,方法实际操作等角度着墨更多,两者实际上是一回事儿,底层并没有改变。
目录
这里接着补充剩下的终结算子的作用/特点,应用 ,并将之前几天的问题一起解决一下
终结算子
collect
作用:
1.用于结合最后的foreach进行输出
2.利用其中的Collectors接口并且重写其中的两个Function方法
代码实现1
sampleAuthors.stream() .map(Author::getName) .collect(Collectors.toList()) .forEach(System.out::println);
数据输出类型:
这里使用Collectors的toList方法和toSet方法,这两个方法都返回的是Collector接口类型,只不过接口的具体泛型一个是List<>,一个是Set<>。并且这里的collect起到的作用是收集器,收集map的输出内容并以XX形式返回。
注意,如果这里最后接一个toString方法那么就是以列表的形式打印出来
如果是按照上述foreach打印的话就是一个一个打印出来
代码实现2:复杂
List<Author> sampleAuthors = AuthorFactory.createSampleAuthors(); Map<String, List<Author.BookList>> map = sampleAuthors.stream() .distinct() .collect(Collectors.toMap(new Function<Author, String>() { @Override public String apply(Author author) { return author.getName(); } }, new Function<Author, List<Author.BookList>>() { @Override public List<Author.BookList> apply(Author author) { return author.getBooklist(); } })); System.out.println(map);
输出数据类型
实际生产场景:数据聚合与转换
案例:电商订单分析// 分析每个客户的订单总额 Map<Long, Double> customerTotalSpending = orders.stream() .collect(Collectors.groupingBy( Order::getCustomerId, Collectors.summingDouble(Order::getAmount) )); // 生成CSV报告 String report = products.stream() .filter(p -> p.getStock() < 10) .sorted(Comparator.comparing(Product::getStock)) .map(p -> p.getName() + "," + p.getStock()) .collect(Collectors.joining("\n"));
适用场景:
将流数据转换为 Map/List/Set 等集合
生成汇总报表
构建缓存数据结构
数据分组统计(如按地区、类别分组)
anyMatch
作用:只要有一个符合要求则返回boolean类型的结果
代码实现
boolean b = sampleAuthors.stream() .anyMatch(author -> author.getBooklist().isEmpty()); System.out.println(b);
数据输出类型
true or false
实际生产场景:快速存在性检查
案例:风控系统
// 检查交易是否存在高风险特征 boolean isHighRisk = transactions.stream() .anyMatch(tx -> tx.getAmount() > 100000 || RISK_COUNTRIES.contains(tx.getCountry()) ); // 系统健康检查 boolean isServiceDown = endpoints.stream() .anyMatch(ep -> ep.getResponseTime() > TIMEOUT_THRESHOLD);
allMatch
作用:所有的流数据都要符合要求,返回一个boolean类型结果
代码实现boolean b1 = sampleAuthors.stream() .allMatch(author -> author.getBooklist().isEmpty()); System.out.println(b1);
数据输出类型
true or false
实际生产场景:
完整性验证案例:订单处理系统
// 验证订单所有商品都有库存 boolean canFulfillOrder = order.getItems().stream() .allMatch(item -> inventoryService.getStock(item.getSku()) >= item.getQuantity() ); // 配置校验 boolean validConfig = configParams.stream() .allMatch(param -> !param.isBlank());
noneMatch
作用:判断流中元素都不符合匹配条件,都不符合为true,反之则为false
代码实现
boolean b2 = sampleAuthors.stream() .noneMatch(author -> author.getBooklist().isEmpty()); System.out.println(b2);
数据输出类型
true or false
实际生产场景:排他性检查
案例:用户注册系统// 检查用户名未被使用 boolean usernameAvailable = existingUsers.stream() .noneMatch(user -> user.getUsername().equals(newUsername)); // 安全扫描 boolean isClean = systemFiles.stream() .noneMatch(file -> MALWARE_SIGNATURES.contains(file.getHash()));
实际生产场景:获取首个可用资源
案例:服务发现
// 获取第一个可用的支付网关 paymentGateways.stream() .filter(PaymentGateway::isAvailable) .sorted(Comparator.comparing(PaymentGateway::getPriority)) .findFirst() .ifPresent(gateway -> processPayment(order, gateway)); // 故障转移处理 backupServers.stream() .filter(Server::isHealthy) .findFirst() .orElseThrow(() -> new ServiceUnavailableException());
findFirst
作用:获取流当中的第一个元素
代码实现:sampleAuthors.stream() .findFirst().ifPresent(n -> System.out.println(n.getName()));
数据输出类型
findFirst方法会返回一个Optional开头的Author的json串
ifPresent方法传入Lambda表达式,可以消除Optional前缀
实际生产场景:获取首个可用资源
案例:服务发现// 获取第一个可用的支付网关 paymentGateways.stream() .filter(PaymentGateway::isAvailable) .sorted(Comparator.comparing(PaymentGateway::getPriority)) .findFirst() .ifPresent(gateway -> processPayment(order, gateway)); // 故障转移处理 backupServers.stream() .filter(Server::isHealthy) .findFirst() .orElseThrow(() -> new ServiceUnavailableException());
reduce
作用:将结果按照你设定的逻辑进行聚合
和map搭配使用
第一种模式:只传入Fuction类型sampleAuthors.stream() .map(author -> author.getAge()) .reduce(new BinaryOperator<Integer>() { @Override public Integer apply(Integer a, Integer b3) { return Integer.sum(a, b3); } }).ifPresent(System.out::println);
第二种模式:传递两个参数,第一个是Integer类型,第二个是Fuction类型
Integer reduce = sampleAuthors.stream() .map(author -> author.getAge()) .reduce(Integer.MIN_VALUE, new BinaryOperator<Integer>() { @Override public Integer apply(Integer a, Integer b3) { return Integer.max(a, b3); } }); System.out.println(reduce);
应用:使用reduce求最大最小值
sampleAuthors.stream() .map(author -> author.getAge()) .reduce(new BinaryOperator<Integer>() { @Override public Integer apply(Integer a, Integer b3) { return Integer.max(a, b3); } }).ifPresent(System.out::println);
实际生产场景:聚合计算
案例:金融分析系统
// 计算投资组合总价值 BigDecimal portfolioValue = holdings.stream() .map(holding -> holding.getShares().multiply(marketData.getPrice(holding.getSymbol())) ) .reduce(BigDecimal.ZERO, BigDecimal::add); // 构建复杂对象 ShippingLabel label = order.getItems().stream() .map(this::createLabelFragment) .reduce(new ShippingLabel(), ShippingLabel::merge, ShippingLabel::combine);
ifPresent
作用:接在optional对象的后面
ifPresent
是Optional
类的方法,用于在值存在时执行特定操作。它在生产环境中非常实用,特别是在处理可能为 null 的值时,可以避免空指针异常并使代码更加优雅。
代码实现
sampleAuthors.stream() .findFirst().ifPresent(n -> System.out.println(n.getName()));
实际生产场景
用户操作确认
场景:用户提交表单后的结果处理
// 用户注册服务 userService.register(newUser) .ifPresent(userId -> { // 仅当注册成功时发送欢迎邮件 emailService.sendWelcomeEmail(userId); logger.info("New user registered: {}", userId); });
作者类和作者工厂类
作者工厂类
public class AuthorFactory { public static List<Author> createSampleAuthors() { List<String> books = Arrays.asList("Java", "Python", "C++", "JavaScript", "C#", "PHP","html","CSS"); return Arrays.asList( new Author(35, "John Doe", new Author.BookList(books)), new Author(28, "Jane Smith", new Author.BookList(books.subList(0, 3))), new Author(42, "Robert Johnson", new Author.BookList(books.subList(2, 5))), new Author(42, "Robert Johnson", new Author.BookList(books.subList(2, 5))), new Author(45, "Bruce Venn", new Author.BookList(books.subList(6, 7))) ); } }
作者类
public class Author implements Comparable<Author> { private int age; private String name; private BookList booklist; // 构造函数 public Author(int age, String name, BookList booklist) { this.age = age; this.name = name; this.booklist = booklist; } // 书籍列表内部类 public static class BookList { private List<String> books; // 更合理的字段名 public BookList(List<String> books) { this.books = books; } public List<String> getBooks() { return books; } @Override public String toString() { return books.toString(); } } // 获取书籍流 public Stream<String> getBookStream() { return booklist.getBooks().stream(); } // 其他访问方法 public int getAge() { return age; } public String getName() { return name; } public BookList getBooklist() { return booklist; } // 重写方法 @Override public String toString() { return "Author{" + name + " (" + age + "), books: " + booklist + "}"; } @Override public int compareTo(Author o) { return Integer.compare(o.age, this.age); } @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof Author)) return false; Author author = (Author) o; return age == author.age && name.equals(author.name); } @Override public int hashCode() { return Objects.hash(age, name); } }
问题答疑
1. RDD分区对齐问题
为什么需要对齐?
数据本地性优化:分区对齐确保相关数据位于同一节点,减少网络传输
计算效率提升:避免Shuffle操作,降低I/O开销
资源利用率优化:任务分配更均衡,避免数据倾斜
分区不对齐的后果:
性能严重下降:触发Shuffle操作,增加网络传输和磁盘I/O
数据倾斜风险:某些分区负载过重,导致任务执行时间不均衡
内存溢出风险:单个Executor处理过大分区可能导致OOM
结果不可预测:在聚合操作中可能导致数据重复或丢失
2. Arrays工具类没有子类的原因
设计目的:提供静态工具方法,无需实例化
final修饰:Java核心库中
java.util.Arrays
被声明为final
防止误用:避免开发者通过继承改变数组操作行为
性能考虑:保持简单轻量,没有对象创建开销
完整方法集:已提供所有必要的数组操作方法
3. 输出文件末尾换行问题
根本原因:
java
// Spark保存文本的源码逻辑 def saveAsTextFile(path: String): Unit = { this.map(_.toString).saveAsHadoopFile(path, classOf[TextOutputFormat[_, _]]) } // Hadoop TextOutputFormat 实现 public void write(K key, V value) throws IOException { out.write(key.toString()); out.write('\t'); out.write(value.toString()); out.write('\n'); // 这里添加换行符 }
解决方案:
scala
// 1. 使用自定义OutputFormat rdd.saveAsHadoopFile(path, classOf[NullWritable], classOf[Text], classOf[CustomOutputFormat]) // 2. 保存前移除换行 rdd.map(_.stripLineEnd).saveAsTextFile(path) // 3. 使用二进制格式 rdd.saveAsSequenceFile(path) // 不添加换行
4. 集合框架设计
单列集合:
Collection
List:有序可重复
ArrayList
,LinkedList
Set:无序唯一
HashSet
,TreeSet
Queue:队列
PriorityQueue
,ArrayDeque
双列集合:
Map
键值对存储
HashMap
,TreeMap
特殊实现
ConcurrentHashMap
,LinkedHashMap
5. 类名冲突优先级
IDEA类加载顺序:
当前项目源代码(src目录)
项目依赖库(按pom/gradle声明顺序)
JDK核心库
解决方案:
java
// 明确指定全限定名 java.util.List<String> sysList = new ArrayList<>(); com.mypackage.List<String> myList = new MyList<>();
6. DAG与迭代计算
DAG是Spark迭代计算的基础:
优化迭代:将多次迭代融合为单一DAG执行计划
内存持久化:支持将中间RDD缓存到内存
scala
val data = sc.parallelize(1 to 1000000).cache() // 缓存基础数据 // 迭代计算示例 for (i <- 1 to 10) { data.map(x => complexCalculation(x, i)).reduce(_ + _) }
执行计划优化:Catalyst优化器重组DAG
容错机制:基于DAG的血缘关系实现快速恢复
7. Lambda嵌套问题
可以嵌套但需谨慎:
java
// 嵌套Lambda示例 list.stream() .filter(outer -> outer.getItems().stream() .anyMatch(inner -> inner.getValue() > 10) ) .collect(Collectors.toList());
最佳实践:
深度限制:嵌套不超过3层
提取方法:将内层Lambda转为方法引用
java
list.stream() .filter(this::hasHighValueItems) .collect(Collectors.toList()); private boolean hasHighValueItems(Outer outer) { return outer.getItems().stream() .anyMatch(inner -> inner.getValue() > 10); }
性能注意:嵌套Lambda增加闭包创建开销
8. findAny行为解析
看似返回第一个元素的原因:
顺序流特性:在顺序流中,
findAny()
默认返回首个元素实现优化:JDK内部短路操作从流起始处查找
并行流差异:在并行流中真正返回任意元素
验证并行流:
java
List<Integer> nums = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); // 顺序流 - 总是0 System.out.println(nums.stream().findAny().get()); // 并行流 - 随机结果 System.out.println(nums.parallelStream().findAny().get());
正确使用场景:
当仅需确认存在性时使用
findAny()
当需要首个元素时使用
findFirst()