flink学习(5)——预定义source

发布于:2024-12-08 ⋅ 阅读:(114) ⋅ 点赞:(0)

Collection [测试]--本地集合Source

fromElements

// env.fromElements

// 使用这种方式的时候 后面的数据类型需要一致 
// 1、  
/*
* 1、String 可以 2、Object 不行(String,Float,Long不能同时传入)
* 3、数组可以  4、集合可以
*/
//第一种
DataStreamSource<String> elementsStreamSource = env.fromElements("hello word", "hello nihao", "hello hi");

// 第二种 会报错
// DataStreamSource<Object> source1 = env.fromElements("hello word",2);

// 第三种 使用数组格式 
String[] strings = {"1","2","3"};
/* 结果是地址 若使用Array 转为字符串 那就是第一种了
* [I@5b5eabd3
* [I@646e68e6
* [I@c7511aa
*/
DataStreamSource<String[]> dataStreamSource = env.fromElements(strings, strings, strings);

// 第四种 使用集合格式

String[] strings = {1,2,3};
List<String> list = Arrays.asList(strings);
DataStreamSource<List<String>> listDataStreamSource = env.fromElements(list, list, list);

// 结果就没什么问题了

fromCollection

//env.fromColletion  只能转换单个集合
String[] strings = {"1","2","3"};
List<String> list = Arrays.asList(strings);
DataStreamSource<String> dataStreamSource = env.fromCollection(list);

fromSequence

// env.fromSequence(开始,结束); ——快速创建一个dataStream

DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100);

Socket [测试]

DataStreamSource<String> dataStreamSource = env.socketTextStream("bigdata02", 9000);

int parallelism = env.getParallelism();

yum install -y nc 
nc -lk 8888
windows平台:nc -lp 8888

这种情况下,不论你的电脑有多少核,并行度均为1 

File

// readTextFile
// 假如你有一个相对路径 怎么变成绝对路径 使用File 进行转换即可
DataStreamSource<String> dataStreamSource = env.readTextFile("datas/wc.txt");
DataStreamSource<String> dataStreamSource1 = env.readTextFile("hdfs://bigdata01:9820/home/homedata/data.txt");


网站公告

今日签到

点亮在社区的每一天
去签到