Flink in Depth, Made Simple: Day 2
Goals
- Master the common DataStream sources
- Master the common DataStream transformation operations
- Master the common DataStream sink operations
- Get an introductory understanding of the DataSet API operators
📖 1. The DataStream Programming Model
- A DataStream program consists of four parts: Environment, DataSource, Transformation, and Sink
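A minimal skeleton showing how the four parts fit together (the object name and sample elements are just illustrative):
package com.kaikeba.demo1

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object ProgrammingModelSkeleton {
  def main(args: Array[String]): Unit = {
    //1. Environment: the entry point of the streaming job
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //2. DataSource: here a fixed collection, purely for illustration
    val source: DataStream[String] = env.fromElements("hello flink", "hello world")
    //3. Transformation: split the lines into words and count them
    val result: DataStream[(String, Int)] = source
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    //4. Sink: print to standard output
    result.print()
    env.execute("ProgrammingModelSkeleton")
  }
}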
📖 2. Flink DataSources
2.1 File-based
readTextFile(path)
Reads a text file according to the TextInputFormat rules, line by line, returning each line as a String.
Example
package com.kaikeba.demo1

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingSourceFromFile {
  def main(args: Array[String]): Unit = {
    //build the streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //read the data from a text file
    val sourceStream: DataStream[String] = env.readTextFile("d:\\words.txt")
    //import the implicit conversions
    import org.apache.flink.api.scala._
    //process the data
    val result: DataStream[(String, Int)] = sourceStream
      .flatMap(x => x.split(" "))  //split on whitespace
      .map(x => (x, 1))            //count each word as 1
      .keyBy(0)                    //group by the word at index 0
      .sum(1)                      //sum the counts at index 1
    //save the result
    result.writeAsText("d:\\result")
    //start the job
    env.execute("FlinkStream")
  }
}
2.2 Socket-based
socketTextStream
Reads data from a socket; the elements can be split by a delimiter.
Example
package com.kaikeba.demo1

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * Word count over a socket stream
 */
object FlinkStream {
  def main(args: Array[String]): Unit = {
    //build the streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //read data from the socket
    val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)
    //import the implicit conversions
    import org.apache.flink.api.scala._
    //process the data
    val result: DataStream[(String, Int)] = sourceStream
      .flatMap(x => x.split(" "))  //split on whitespace
      .map(x => (x, 1))            //count each word as 1
      .keyBy(0)                    //group by the word at index 0
      .sum(1)                      //sum the counts at index 1
    //print the result
    result.print()
    //start the job
    env.execute("FlinkStream")
  }
}
2.3 Collection-based
fromCollection(Collection)
Creates a data stream from a collection; all elements in the collection must be of the same type.
Example
package com.kaikeba.demo2

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingSourceFromCollection {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //import the implicit conversions
    import org.apache.flink.api.scala._
    //prepare the data source: an array
    val array = Array("hello world","world spark","flink test","spark hive","test")
    val fromArray: DataStream[String] = environment.fromCollection(array)
    //a single element also works:
    //val value: DataStream[String] = environment.fromElements("hello world")
    val resultDataStream: DataStream[(String, Int)] = fromArray
      .flatMap(x => x.split(" "))
      .map(x =>(x,1))
      .keyBy(0)
      .sum(1)
    //print
    resultDataStream.print()
    //start
    environment.execute()
  }
}
2.4 Custom sources
addSource
Lets you read data from third-party systems.
Custom source with parallelism 1
Extend SourceFunction to define a source whose parallelism is fixed at 1.
Code
package com.kaikeba.demo2

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * Custom source with parallelism 1
 */
object MySourceRun {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val getSource: DataStream[Long] = environment.addSource(new MySource).setParallelism(1)
    val resultStream: DataStream[Long] = getSource.filter(x => x % 2 == 0)
    resultStream.setParallelism(1).print()
    environment.execute()
  }
}

//extend SourceFunction to define a source with parallelism 1
class MySource extends SourceFunction[Long] {
  private var number = 1L
  private var isRunning = true
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning){
      number += 1
      sourceContext.collect(number)
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}
Custom source with multiple parallel instances
Extend ParallelSourceFunction to define a source that can run with a parallelism greater than 1.
Code
package com.kaikeba.demo2

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * Source with multiple parallel instances
 */
object MyMultipartSourceRun {
  def main(args: Array[String]): Unit = {
    //build the streaming environment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //add the source
    val getSource: DataStream[Long] = environment.addSource(new MultipartSource).setParallelism(2)
    //process
    val resultStream: DataStream[Long] = getSource.filter(x => x % 2 == 0)
    resultStream.setParallelism(2).print()
    environment.execute()
  }
}

//extend ParallelSourceFunction to define a source with multiple parallel instances
class MultipartSource extends ParallelSourceFunction[Long]{
  private var number = 1L
  private var isRunning = true
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    //loop on the isRunning flag so that cancel() actually stops the source
    while(isRunning){
      number += 1
      sourceContext.collect(number)
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}
In addition, Flink ships with a set of built-in connectors, each providing source and/or sink support:
- Apache Kafka (source/sink): analyzed in detail later
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
📖 3. Flink Sinks
- writeAsText(): writes the elements line by line as strings, obtained by calling toString() on each element (see the note after the connector list below)
- print() / printToErr(): prints the toString() value of each element to standard output or standard error
- Custom sinks via addSink (e.g. Kafka, Redis)
- A sink operator sends the data to an external system such as Kafka, Redis, or HBase. So far we have only printed results with print(); in section 3.1 we implement a custom sink that writes the data to Redis. Flink ships connectors for the following systems:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
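A practical note on writeAsText() from the list above: with a parallelism greater than 1, the target path becomes a directory containing one part file per subtask; pinning the sink's parallelism to 1 yields a single file. A small sketch, where result stands for any DataStream and the path is a placeholder:
//with the default parallelism, "d:\\result" becomes a directory of part files;
//setting the sink's parallelism to 1 produces a single output file instead
result.writeAsText("d:\\result").setParallelism(1)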
3.1 Writing data from Flink to Redis
Import the Flink Redis connector dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Code
package com.kaikeba.demo2

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * A Flink streaming job that saves its results to Redis
 */
object Stream2Redis {
  def main(args: Array[String]): Unit = {
    //get the entry point of the program
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //organize the data
    val streamSource: DataStream[String] = executionEnvironment.fromElements("1 hadoop","2 spark","3 flink")
    //wrap the data into key/value tuples
    val tupleValue: DataStream[(String, String)] = streamSource.map(x =>(x.split(" ")(0),x.split(" ")(1)))
    val builder = new FlinkJedisPoolConfig.Builder
    //configure the redis client
    builder.setHost("node01")
    builder.setPort(6379)
    builder.setTimeout(5000)
    builder.setMaxTotal(50)
    builder.setMaxIdle(10)
    builder.setMinIdle(5)
    val config: FlinkJedisPoolConfig = builder.build()
    //build the redis sink
    val redisSink = new RedisSink[Tuple2[String,String]](config,new MyRedisMapper)
    //use our custom sink
    tupleValue.addSink(redisSink)
    //run the program
    executionEnvironment.execute("redisSink")
  }
}

//define a RedisMapper
class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{
  override def getCommandDescription: RedisCommandDescription = {
    //the redis command used to insert the data
    new RedisCommandDescription(RedisCommand.SET)
  }
  //the key
  override def getKeyFromData(data: (String, String)): String = {
    data._1
  }
  //the value
  override def getValueFromData(data: (String, String)): String = {
    data._2
  }
}
📖 4. DataStream Transformation Operators
Producing new DataStreams from one or more existing DataStreams is called a Transformation. Each kind of operation is defined as a separate Operator, and a Flink program composes multiple Transformations into a dataflow topology.
Official DataStream operator documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html
4.1 map, filter
package com.kaikeba.demo3
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object MapFilter {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val sourceStream: DataStream[Int] = environment.fromElements(1,2,3,4,5,6)
val mapStream: DataStream[Int] = sourceStream.map(x =>x*10)
val resultStream: DataStream[Int] = mapStream.filter(x => x%2 ==0)
resultStream.print()
environment.execute()
}
}
4.2 flatMap, keyBy, sum
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Sliding window:
 * every 1 second, count the occurrences of each word within the last 2 seconds
 */
object FlinkStream {
  def main(args: Array[String]): Unit = {
    //get the entry point of the program
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //read data from the socket
    val resultDataStream: DataStream[String] = environment.socketTextStream("node01",9999)
    //import the implicit conversions
    import org.apache.flink.api.scala._
    //compute over the data
    val resultData: DataStream[(String, Int)] = resultDataStream
      .flatMap(x => x.split(" ")) //split on whitespace
      .map(x => (x, 1)) //count each occurrence as 1
      .keyBy(0) //group by the word at index 0
      .timeWindow(Time.seconds(2), Time.seconds(1)) //every second, count the words seen in the last two seconds
      .sum(1)
    resultData.print()
    //run the program
    environment.execute()
  }
}
4.3 reduce
- Rolls up the input KeyedStream with a user-defined ReduceFunction, aggregating the data incrementally
package com.kaikeba.demo3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object ReduceStream {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val sourceStream: DataStream[(String,Int)] = environment
.fromElements(("a",1),("a",2),("b",2),("a",3),("c",2))
val keyByStream: KeyedStream[(String, Int), Tuple] = sourceStream.keyBy(0)
val resultStream: DataStream[(String, Int)] = keyByStream.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
resultStream.print()
environment.execute()
}
}
4.4 union
- Merges two (or more) streams into one; all input streams must have the same data type
package com.kaikeba.demo3
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
 * The union operator
 */
object UnionStream {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val firstStream: DataStream[String] = environment.fromCollection(Array("hello spark","hello flink"))
val secondStream: DataStream[String] = environment.fromCollection(Array("hadoop spark","hive flink"))
//merging two streams into one requires both streams to carry the same data type
val resultStream: DataStream[String] = firstStream.union(secondStream)
resultStream.print()
environment.execute()
}
}
4.5 connect
- Similar to union, but connects exactly two streams, and the two streams may have different data types
package com.kaikeba.demo3
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
/**
 * Similar to union, but connects exactly two streams whose data types may differ;
 * a different processing function can be applied to each of the two streams
 */
object ConnectStream {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
import org.apache.flink.api.scala._
val firstStream: DataStream[String] = environment.fromCollection(Array("hello world","spark flink"))
val secondStream: DataStream[Int] = environment.fromCollection(Array(1,2,3,4))
//call connect to join the two DataStreams
val connectStream: ConnectedStreams[String, Int] = firstStream.connect(secondStream)
val unionStream: DataStream[Any] = connectStream.map(x => x + "abc",y => y*2)
val coFlatMapStream: DataStream[String] = connectStream.flatMap(new CoFlatMapFunction[String, Int, String] {
//process the elements of the first stream
override def flatMap1(value: String, out: Collector[String]): Unit = {
out.collect(value.toUpperCase())
}
//process the elements of the second stream
override def flatMap2(value: Int, out: Collector[String]): Unit = {
out.collect( value * 2 + "")
}
})
unionStream.print()
coFlatMapStream.print()
environment.execute()
}
}
4.6 split, select
- Splits one data stream into multiple streams according to a rule
package com.kaikeba.demo3
/**
 * Splits one data stream into multiple streams according to a rule.
 * Use case: in practice a source stream often mixes several kinds of data whose
 * processing rules differ; splitting it by rule into multiple streams lets each
 * stream get its own processing logic.
 */
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
object SplitAndSelect {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
import org.apache.flink.api.scala._
//build the DataStream
val firstStream: DataStream[String] = environment.fromCollection(Array("hadoop hive","spark flink"))
val selectStream: SplitStream[String] = firstStream.split(new OutputSelector[String] {
override def select(value: String): lang.Iterable[String] = {
var list = new util.ArrayList[String]()
//if the line contains the string "hadoop"
if (value.contains("hadoop")) {
//route it to the stream named "first"
list.add("first")
}else{
//otherwise route it to the stream named "second"
list.add("second")
}
list
}
})
//select the stream named "first"
selectStream.select("first").print()
environment.execute()
}
}
4.7 Repartitioning operators
- Repartitioning operators let us redistribute the data, for example to mitigate data skew
Random Partitioning
- Random partitioning: distributes elements randomly according to a uniform distribution (similar to random.nextInt(5), where each value from 0 to 4 is equally likely)
- dataStream.shuffle()
Rebalancing
- Round-robin partitioning: elements are distributed cyclically, creating an equal load per partition; useful for performance tuning when the data is skewed
- dataStream.rebalance()
Rescaling
- Similar to rebalance, but not global: the redistribution happens within a single node only, so no data travels across the network
- dataStream.rescale()
Custom partitioning
- Requires implementing the Partitioner interface
- dataStream.partitionCustom(partitioner, "someKey")
- or dataStream.partitionCustom(partitioner, 0)
Broadcasting: broadcast variables, covered in detail later
4.7.1 Repartitioning the data after a filter
package com.kaikeba.demo4
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
 * Repartition the data after a filter
 */
object FlinkPartition {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val dataStream: DataStream[Int] = environment.fromCollection(1 to 100)
val filterStream: DataStream[Int] = dataStream.filter(x => x>10)
//.shuffle //randomly redistributes the upstream data across the downstream partitions
//.rescale
.rebalance //repartitions the data, which involves a shuffle
val resultStream: DataStream[(Int, Int)] = filterStream.map(new RichMapFunction[Int, (Int, Int)] {
override def map(value: Int): (Int, Int) = {
//return the subtask index together with the value
(getRuntimeContext.getIndexOfThisSubtask, value)
}
})
resultStream.print()
environment.execute()
}
}
4.7.2 Custom partitioning strategy
If none of the strategies above fits our needs, we can define our own partitioning strategy.
Requirement
- Custom partitioning: route strings containing "hello" to one partition and everything else to another partition
Define the partitioner
import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner extends Partitioner[String]{
  override def partition(line: String, num: Int): Int = {
    println("number of partitions: " + num)
    if(line.contains("hello")){
      0
    }else{
      1
    }
  }
}
Use the partitioner in a job
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //build the dataStream
    val sourceStream: DataStream[String] = environment.fromElements("hello laowang","spark flink","hello tony","hive hadoop")
    //repartition with the custom partitioner, using the whole line as the key
    val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner, x => x + "")
    //print each element together with the id of the thread that processes it,
    //so the partition assignment becomes visible
    val result: DataStream[String] = rePartition.map(x => {
      println("key: " + x + ", thread id: " + Thread.currentThread().getId)
      x
    })
    result.print()
    environment.execute()
  }
}
📖 5. DataSet Transformation Operators
Official DataSet operator documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/index.html#dataset-transformations
Map
- Takes one element and returns one element; useful for cleaning and transforming data
FlatMap
- Takes one element and returns zero, one, or more elements
MapPartition
- Like map, but processes a whole partition at a time; recommended over map when the processing needs a third-party resource such as a connection, because the resource can then be acquired once per partition (see the sketch after section 5.1)
Filter
- Evaluates a predicate for each element and keeps the elements for which it returns true
Reduce
- Aggregates the data by combining the current element with the previously reduced value, returning a new value
Aggregate
- Built-in aggregations such as sum, max, and min
Distinct
- Returns the deduplicated elements of a data set: data.distinct()
Join
- Inner join
OuterJoin
- Outer join
Cross
- Builds the Cartesian product of two data sets
Union
- Returns the union of two data sets; the data types must match
First-n
- Returns the first N elements of a collection
Sort Partition
- Sorts all partitions of a data set locally; chain sortPartition() calls to sort on multiple fields
5.1 mapPartition
package com.kaikeba.demo5
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import scala.collection.mutable.ArrayBuffer
object MapPartitionDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val arrayBuffer = new ArrayBuffer[String]()
arrayBuffer.+=("hello world1")
arrayBuffer.+=("hello world2")
arrayBuffer.+=("hello world3")
arrayBuffer.+=("hello world4")
val collectionDataSet: DataSet[String] = environment.fromCollection(arrayBuffer)
val resultPartition: DataSet[String] = collectionDataSet.mapPartition(eachPartition => {
eachPartition.map(eachLine => {
val returnValue = eachLine + " result"
returnValue
})
})
resultPartition.print()
}
}
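The example above only transforms each line; the payoff of mapPartition comes when each partition needs an expensive resource. A sketch under that assumption, where Connection and createConnection() are hypothetical stand-ins for any third-party client (database, HTTP, and so on):
package com.kaikeba.demo5

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MapPartitionWithResource {
  //hypothetical stand-in for an expensive third-party client
  class Connection {
    def lookup(s: String): String = s + " enriched"
    def close(): Unit = ()
  }
  def createConnection(): Connection = new Connection

  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val data: DataSet[String] = environment.fromElements("hello world1", "hello world2")
    val enriched: DataSet[String] = data.mapPartition(eachPartition => {
      //acquire the resource once per partition instead of once per element
      val conn = createConnection()
      //materialize before closing the connection, since the iterator is lazy
      val result = eachPartition.map(line => conn.lookup(line)).toList
      conn.close()
      result
    })
    enriched.print()
  }
}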
5.2 distinct
package com.kaikeba.demo5
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object DistinctDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val arrayBuffer = new ArrayBuffer[String]()
arrayBuffer.+=("hello world1")
arrayBuffer.+=("hello world2")
arrayBuffer.+=("hello world3")
arrayBuffer.+=("hello world4")
val collectionDataSet: DataSet[String] = environment.fromCollection(arrayBuffer)
val dsDataSet: DataSet[String] = collectionDataSet.flatMap(x => x.split(" ")).distinct()
dsDataSet.print()
}
}
5.3 join
package com.kaikeba.demo5
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object JoinDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val array1 = ArrayBuffer((1,"张三"),(2,"李四"),(3,"王五"))
val array2 =ArrayBuffer((1,"18"),(2,"35"),(3,"42"))
val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)
val joinResult: UnfinishedJoinOperation[(Int, String), (Int, String)] = firstDataStream.join(secondDataStream)
//where selects the key field of the left data set, equalTo the matching field of the right one
val resultDataSet: DataSet[(Int, String, String)] = joinResult.where(0).equalTo(0).map(x => {
(x._1._1, x._1._2, x._2._2)
})
resultDataSet.print()
}
}
5.4 leftOuterJoin, rightOuterJoin
package com.kaikeba.demo5
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object OutJoinDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val array1 = ArrayBuffer((1,"张三"),(2,"李四"),(3,"王五"),(4,"张飞"))
val array2 =ArrayBuffer((1,"18"),(2,"35"),(3,"42"),(5,"50"))
val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)
//left outer join
val leftOuterJoin: UnfinishedOuterJoinOperation[(Int, String), (Int, String)] = firstDataStream.leftOuterJoin(secondDataStream)
//where selects the key field of the left data set, equalTo the matching field of the right one
val leftDataSet: JoinFunctionAssigner[(Int, String), (Int, String)] = leftOuterJoin.where(0).equalTo(0)
//apply a function to the joined records
val leftResult: DataSet[(Int, String,String)] = leftDataSet.apply(new JoinFunction[(Int, String), (Int, String), (Int,String, String)] {
override def join(left: (Int, String), right: (Int, String)): (Int, String, String) = {
val result = if (right == null) {
Tuple3[Int, String, String](left._1, left._2, "null")
} else {
Tuple3[Int, String, String](left._1, left._2, right._2)
}
result
}
})
leftResult.print()
//right outer join
val rightOuterJoin: UnfinishedOuterJoinOperation[(Int, String), (Int, String)] = firstDataStream.rightOuterJoin(secondDataStream)
//where selects the key field of the left data set, equalTo the matching field of the right one
val rightDataSet: JoinFunctionAssigner[(Int, String), (Int, String)] = rightOuterJoin.where(0).equalTo(0)
//apply a function to the joined records
val rightResult: DataSet[(Int, String,String)] = rightDataSet.apply(new JoinFunction[(Int, String), (Int, String), (Int,String, String)] {
override def join(left: (Int, String), right: (Int, String)): (Int, String, String) = {
val result = if (left == null) {
Tuple3[Int, String, String](right._1, right._2, "null")
} else {
Tuple3[Int, String, String](right._1, right._2, left._2)
}
result
}
})
rightResult.print()
}
}
5.5 cross
package com.kaikeba.demo5
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object CrossJoinDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val array1 = ArrayBuffer((1,"张三"),(2,"李四"),(3,"王五"),(4,"张飞"))
val array2 =ArrayBuffer((1,"18"),(2,"35"),(3,"42"),(5,"50"))
val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)
//cross: Cartesian product
val crossDataSet: CrossDataSet[(Int, String), (Int, String)] = firstDataStream.cross(secondDataStream)
crossDataSet.print()
}
}
5.6 first-n and sortPartition
package com.kaikeba.demo5
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object TopNAndPartition {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//the input data
val array = ArrayBuffer((1,"张三",10),(2,"李四",20),(3,"王五",30),(3,"赵6",40))
val collectionDataSet: DataSet[(Int, String,Int)] = environment.fromCollection(array)
//take the first 3 elements
collectionDataSet.first(3).print()
collectionDataSet
.groupBy(0) //group by the first field
.sortGroup(2,Order.DESCENDING) //sort within each group by the third field, descending
.first(1) //take the first element of each group
.print()
/**
 * Sort all elements without grouping: first field descending, third field ascending
 */
collectionDataSet.sortPartition(0,Order.DESCENDING).sortPartition(2,Order.ASCENDING).print()
}
}
5.7 Partitioning operators
package com.kaikeba.demo5
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ArrayBuffer
object PartitionDataSet {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val array = ArrayBuffer((1,"hello"),
(2,"hello"),
(2,"hello"),
(3,"hello"),
(3,"hello"),
(3,"hello"),
(4,"hello"),
(4,"hello"),
(4,"hello"),
(4,"hello"),
(5,"hello"),
(5,"hello"),
(5,"hello"),
(5,"hello"),
(5,"hello"),
(6,"hello"),
(6,"hello"),
(6,"hello"),
(6,"hello"),
(6,"hello"),
(6,"hello"))
environment.setParallelism(2)
val sourceDataSet: DataSet[(Int, String)] = environment.fromCollection(array)
//partitionByHash: hash-partition by the given field
sourceDataSet.partitionByHash(0).mapPartition(eachPartition => {
eachPartition.foreach(t=>{
println("当前线程ID为" + Thread.currentThread().getId +"============="+t._1)
})
eachPartition
}).print()
//partitionByRange: range-partition by the given field
sourceDataSet.partitionByRange(x => x._1).mapPartition(eachPartition =>{
eachPartition.foreach(t=>{
println("当前线程ID为" + Thread.currentThread().getId +"============="+t._1)
})
eachPartition
}).print()
}
}
📖 6. Flink DataSet Connectors
See the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/connectors.html
6.1 File system connectors
- For reading data from file systems, Flink has built-in support for the following:

| File system | Schema | Notes |
| --- | --- | --- |
| HDFS | hdfs:// | HDFS file system |
| S3 | s3:// | Supported via the Hadoop file system |
| MapR | maprfs:// | Requires the user to add a jar |
| Alluxio | alluxio:// | Supported via the Hadoop file system |
- Note
- Flink accepts any file system that implements the org.apache.hadoop.fs.FileSystem interface, e.g. S3, Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP, and so on
Flink is interface-compatible with Apache Hadoop MapReduce, so code written for Hadoop MapReduce can be reused (a read sketch follows this list):
Use Hadoop Writable data types
Use any Hadoop InputFormat as a DataSource (via Flink's built-in HadoopInputFormat)
Use any Hadoop OutputFormat as a DataSink (via Flink's built-in HadoopOutputFormat)
Use a Hadoop Mapper as a FlatMapFunction
Use a Hadoop Reducer as a GroupReduceFunction
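A minimal sketch of the InputFormat point, assuming the flink-hadoop-compatibility_2.11 dependency is on the classpath; the HDFS path is a placeholder:
package com.kaikeba.demo6

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object HadoopCompatibilityRead {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //wrap the classic Hadoop TextInputFormat as a Flink input format
    val input: DataSet[(LongWritable, Text)] = env.createInput(
      HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text],
        "hdfs://node01:8020/input"))
    //keep the line content, dropping the byte offsets
    input.map(pair => pair._2.toString).print()
  }
}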
6.2 Reading data from HBase with Flink
Flink can also integrate directly with HBase, using HBase as a Flink source, sink, and so on.
- Step 1: create the HBase table and insert some data
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0001','f1:age','18'
- Step 2: import the integration dependencies
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<!-- no 1.9.2 release of this artifact yet -->
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0-cdh5.14.2</version>
</dependency>
- Step 3: read data from HBase with Flink
package com.kaikeba.demo6
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.flink.api.java.tuple
/**
 * Flink reads data from an HBase table
 */
object FlinkReadHBase {
def main(args: Array[String]): Unit = {
//get the batch execution environment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//add the source via an InputFormat
val hbaseDataSet=env.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
//initialization: set up the connection, table and scan
override def configure(parameters: Configuration): Unit = {
val conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
val conn: Connection = ConnectionFactory.createConnection(conf)
table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
scan = new Scan() {
addFamily(Bytes.toBytes("f1"))
}
}
override def getTableName: String = {
"hbasesource"
}
override def getScanner: Scan = {
scan
}
//convert an HBase row into a tuple
override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
//get the rowkey
val rowkey: String = Bytes.toString(result.getRow)
val rawCells: Array[Cell] = result.rawCells()
val sb = new StringBuffer()
for (cell <- rawCells) {
val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
sb.append(value).append(",")
}
val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
//set the tuple fields by index
tuple2.setField(rowkey, 0)
tuple2.setField(valueString, 1)
tuple2
}
})
hbaseDataSet.print()
}
}
6.3 Reading data with Flink and writing it to HBase
Flink can also integrate with HBase to write data into it, in either of two ways (option 2 is sketched after the code below):
Option 1: implement the OutputFormat interface
Option 2: extend RichSinkFunction and override its methods
package com.kaikeba.demo6
import java.util
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
/**
 * Flink writes data into an HBase table
 */
object FlinkWriteHBase {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//prepare the data
val sourceDataSet: DataSet[String] = environment.fromElements("0002,lisi,28","0003,wangwu,30")
//write the data to the HBase table via the OutputFormat interface
sourceDataSet.output(new HBaseOutputFormat)
environment.execute()
}
}
//implement the OutputFormat interface
class HBaseOutputFormat extends OutputFormat[String]{
val zkServer = "node01,node02,node03"
val port = "2181"
var conn: Connection = null
override def configure(parameters: Configuration): Unit = {
}
override def open(taskNumber: Int, numTasks: Int): Unit = {
val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
conn = ConnectionFactory.createConnection(config)
}
//the method that writes each record
override def writeRecord(record: String): Unit ={
val tableName: TableName = TableName.valueOf("hbasesource")
val cf1 = "f1"
val array: Array[String] = record.split(",")
val put: Put = new Put(Bytes.toBytes(array(0)))
put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
val puts = new util.ArrayList[Put]()
puts.add(put)
//buffer up to 1 MB; when the buffer is full, the data is flushed to hbase automatically
val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
//set the buffer size
params.writeBufferSize(1024 * 1024)
val mutator: BufferedMutator = conn.getBufferedMutator(params)
mutator.mutate(puts)
mutator.flush()
puts.clear()
}
override def close(): Unit ={
if(null != conn){
conn.close()
}
}
}
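For option 2, here is a hedged sketch for a DataStream job: a sink extending RichSinkFunction that opens the HBase connection once per subtask in open() and writes one Put per record in invoke(). The table name, column family, and ZooKeeper addresses mirror the example above; treat it as a template rather than a definitive implementation:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

class HBaseRichSink extends RichSinkFunction[String] {
  var conn: Connection = _
  var table: Table = _

  //open the connection once per parallel subtask
  override def open(parameters: Configuration): Unit = {
    val config = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(TableName.valueOf("hbasesource"))
  }

  //one Put per record, using the same "id,name,age" layout as above
  override def invoke(record: String): Unit = {
    val array = record.split(",")
    val put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}
It would be attached to a stream with something like dataStream.addSink(new HBaseRichSink).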
📖 7. Broadcast Variables in Flink
- Concept
A broadcast variable keeps a read-only cached copy of a data set on every machine, instead of shipping a copy with each task. Once created, it can be used by any function running in the cluster without being transferred over and over. Also remember that a broadcast variable must not be modified; this is what guarantees every node sees the same value.
In one sentence: it is a shared, read-only variable. We broadcast a DataSet once, every task on every node can then read it, and each node holds only a single copy.
Without broadcast, every task on every node would need its own copy of the data set, which wastes memory (a single node may end up holding several copies of the same data).
- Usage
(1) Initialize the data
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
(2) Broadcast the data
.withBroadcastSet(toBroadcast, "broadcastSetName");
(3) Access the data
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Notes:
a: The broadcast data set lives in the memory of every node and stays there until the job finishes, so it must not be too large.
b: A broadcast variable cannot be modified after being broadcast, which keeps the data consistent across all nodes.
- Example
package com.kaikeba.demo6
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ArrayBuffer
/**
 * Flink broadcast variable example
 */
object FlinkBroadCast {
def main(args: Array[String]): Unit = {
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//prepare the data set
val userInfo =ArrayBuffer(("zs", 10),("ls", 20),("ww", 30))
//build the DataSet that will be broadcast
val userDataSet: DataSet[(String, Int)] = environment.fromCollection(userInfo)
//the original data
val data = environment.fromElements("zs","ls","ww")
//a RichMapFunction is required here to access the broadcast variable
val result = data.map(new RichMapFunction[String,String] {
//a list that receives the broadcast variable in the open method
var listData: java.util.List[(String,Int)] = null
//a map that stores the contents of the broadcast variable
var allMap = Map[String,Int]()
//initialization method; the broadcast variable can be fetched in open
override def open(parameters: Configuration): Unit ={
//get the value of the broadcast variable "broadcastMapName"
listData= getRuntimeContext.getBroadcastVariable[(String,Int)]("broadcastMapName")
val it = listData.iterator()
while (it.hasNext){
val tuple = it.next()
allMap +=(tuple._1 -> tuple._2)
}
}
//use the broadcast data
override def map(name: String): String = {
val age = allMap.getOrElse(name,20)
name+","+age
}
}).withBroadcastSet(userDataSet,"broadcastMapName")
result.print()
}
}
📖 8. Flink Counters (Accumulators)
- Concept
An Accumulator is used much like a MapReduce counter: it lets you observe how the data evolves while the tasks run. You can update an accumulator inside the operator functions of a Flink job, but its final result only becomes available after the job has finished.
Counter is a concrete Accumulator; the built-in implementations are IntCounter, LongCounter, and DoubleCounter.
- Usage
(1) Create the accumulator
val counter=new IntCounter()
(2) Register the accumulator
getRuntimeContext.addAccumulator("num-lines",counter)
(3) Use the accumulator
counter.add(1)
(4) Get the accumulator's result
myJobExecutionResult.getAccumulatorResult("num-lines")
- Example
- Requirement
- Use a counter to count how many times the keyword "Exception" appears in a file
package com.kaikeba.demo6
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
 * Use a counter to count how many times the keyword "Exception" appears in a file
 */
object FlinkCounterAndAccumulator {
def main(args: Array[String]): Unit = {
val env=ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//count how many times the keyword "exception" appears in a tomcat log
val sourceDataSet: DataSet[String] = env.readTextFile("E:\\catalina.out")
sourceDataSet.map(new RichMapFunction[String,String] {
//create the accumulator
var counter=new LongCounter()
override def open(parameters: Configuration): Unit = {
//register the accumulator
getRuntimeContext.addAccumulator("my-accumulator",counter)
}
//the business logic
override def map(value: String): String = {
if(value.toLowerCase().contains("exception")){
//the condition matched: add 1 to the accumulator
counter.add(1)
}
value
}
}).writeAsText("E:\\test")
val job=env.execute()
//get the accumulator and print its value
val count=job.getAccumulatorResult[Long]("my-accumulator")
//print
println(count)
}
}
📖 9. Distributed Cache
- Concept
Flink provides a distributed cache similar to Hadoop's, which lets user functions running in parallel conveniently read local files.
The broadcast variables discussed above keep shared data in TaskManager memory; the distributed cache instead loads a file or directory from an external system (e.g. HDFS) and copies it to the local disk of every TaskManager.
- Usage
(1) Register a distributed cache file via registerCachedFile on the execution environment
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
(2) Access the cached file
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
- Example
package com.kaikeba.demo6
import java.util
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
/**
 * Using Flink's distributed cache
 */
object FlinkDistributedCache {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//prepare the score data set
val scoreDataSet = env.fromElements((1, "语文", 50),(2, "数学", 60), (3, "英文", 80))
//todo: 1. register the distributed cache file
env.registerCachedFile("E:\\distribute_cache_student.txt","student")
//map the score records from (student id, subject, score) to (student name, subject, score)
val result: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
var list: List[(Int, String)] = _
//initialization method
override def open(parameters: Configuration): Unit = {
//get the distributed cache file
val file = getRuntimeContext.getDistributedCache.getFile("student")
//read the file content
import scala.collection.JavaConverters._
val listData: List[String] = FileUtils.readLines(file).asScala.toList
//turn each line into a tuple (student id, student name)
list = listData.map {
line =>{
val array = line.split(",")
(array(0).toInt, array(1))
}
}
}
//use the distributed cache data in the map method
override def map(value: (Int, String, Int)): (String, String, Int) = {
//get the student id
val studentId: Int = value._1
val studentName: String = list.filter(x => studentId == x._1)(0)._2
//wrap and return the result:
//(student id, subject, score) -> (student name, subject, score)
(studentName, value._2, value._3)
}
})
result.print()
}
}
📖 10. How Flink Tasks Exchange Data, and Operator Chains
10.1 Data transfer strategies
forward strategy
- Forwarding
(1) The output of a task is sent to exactly one task as its input
(2) If the two tasks run in the same JVM, the network overhead can be avoided
key-based strategy
- Key-based
(1) The data is grouped (i.e. partitioned) by some attribute, called the key
(2) Records with the same key are sent to the same task and processed there
broadcast strategy
- Broadcast
(1) One data set stays in place while the other is copied to every machine that holds part of the first one. When joining a small data set with a large one, the broadcast-forward strategy broadcasts the small data set, avoiding an expensive repartition.
random strategy
- Random
(1) Data is transferred randomly from one task to all the subtasks of the next operator
(2) This spreads the data evenly across all subtasks, balancing the load between tasks
PS:
The forward and random strategies can be seen as variants of the key-based strategy: the former preserves the upstream record's key, while the latter performs a random reassignment of keys. These strategies map directly onto DataStream API calls, as the sketch below shows.
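A runnable sketch of the strategies (element values are arbitrary; the parallelism is pinned to 1 because forward requires equal upstream and downstream parallelism):
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object TransferStrategies {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //forward requires the same parallelism upstream and downstream
    env.setParallelism(1)
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.fromElements("a", "b", "c")
    source.forward.print()        //forward strategy: subtask i feeds subtask i
    source.keyBy(x => x).print()  //key-based strategy: same key, same task
    source.broadcast.print()      //broadcast strategy: every record goes to every subtask
    source.shuffle.print()        //random strategy: uniform random distribution
    env.execute("TransferStrategies")
  }
}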
10.2 Operator Chain
Concept
An operator chain links operators that satisfy certain conditions together so that they run inside the same task. It is one of Flink's task optimizations: within a task, data transfer between chained operators becomes a plain function call, which reduces thread switches, message serialization/deserialization, and buffer exchanges, lowering latency while raising overall throughput.
A common chain is source->map->filter. So how does Flink decide internally whether operators can be chained?
Conditions for operator chaining
(1) The data transfer strategy is the forward strategy
(2) The operators run in the same TaskManager
(3) The upstream and downstream tasks have the same parallelism
In our word count program, setting the parallelism accordingly is what makes operator chaining happen; chaining can also be controlled explicitly, as sketched below.
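A sketch of the explicit chaining controls available on the DataStream API (the job content is illustrative):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ChainingControl {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //env.disableOperatorChaining() //would switch chaining off for the whole job
    env.fromElements("hello flink", "hello world")
      .flatMap(_.split(" "))
      .map((_, 1)).startNewChain()        //begin a new chain at this map
      .filter(_._2 > 0).disableChaining() //this filter is never chained to another operator
      .print()
    env.execute("ChainingControl")
  }
}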