flink流的connect 和 union 的区别

2020-08-27 09:38发布

flink流的connect 和 union 的区别

flink流的connect 和 union 的区别

2条回答
我的网名不再改
2楼 · 2020-08-27 14:32

join:

1 可用于DataStream和DataSet。只能2个DataStream一起join,或者2个DataSet一起join

2 用于DataStream时返回是JoinedStreams  ,用于DataSet时返回是JoinOperatorSets  

3 用于DataStream时需要与窗口同时使用,语法是:join where equalTo window apply ,用于DataSet时的语法是:join where equalTo with (where是指定第一个输入的分区字段,equalTo是指定第二个输入的分区字段,这2个字段类型需要一致)

4 与SQL中的inner join同义,只输出2个实时窗口内或2个数据集合内能匹配上的笛卡尔积,不能匹配上的不输出。

5 apply方法中或with方法中均可以使用JoinFunction或 FlatJoinFunction处理匹配上的数据对(用于DataStream和DataSet时均可)

6 侧重对2个输入里的 数据对 进行处理,join方法的入参是单个数据

7 可以join2个类型不同的流或join2个类型不同的数据集(比如Tuple2 join Tuple2),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

Flink,join双流,join实例,java版本

Flink,join DataSet数据集,join实例,java版本

 

coGroup:

1 可用于DataStream和DataSet。只能2个DataStream一起coGroup,或者2个DataSet一起coGroup

2 用于DataStream时返回是CoGroupedStreams,用于DataSet时返回是CoGroupOperatorSets

3 用于DataStream时需要与窗口同时使用,语法是:coGroup where equalTo window apply ,用于DataSet时的语法是:coGroup where equalTo with,

4 把2个实时窗口内或2个数据集合内key相同的数据分组同一个分区,key不能匹配上的数据(只在一个窗口或集合内存在的数据)也分组到另一个分区上。

5 apply方法中或with方法中均可以使用CoGroupFunction对数据分组(用于DataStream和DataSet时均可,无FlatCoGroupFunction)

6 侧重对2个输入的 集合 进行处理,coGroup方法的入参是Iterable类型

7 可以coGroup2个类型不同的流或coGroup2个类型不同的数据集(比如Tuple2 join Tuple2),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

Flink,coGroup双流,coGroup实例,java版本

Flink,coGroup DataSet数据集,coGroup实例,java版本

 

connect:

1 只能用于DataStream,返回是ConnectedStreams。不能用于DataSet.

2 只能2个流一起connect(stream1.connect(stream2))

3 connect后可以对2个流分别处理(使用CoMapFunction或CoFlatMapFunction)

4  可以connect2个类型不同的流(比如Tuple2 connect Tuple2

Flink,connect双流,connect实例,java版本

 

union:

1 用于DataStream时,返回是Datastream;用于DataSet时,返回是DataSet;

2 可以多个流一起合并(stream1.union(stream2,stream3,stream4)),合并结果是一个新Datastream;只能2个DataSet一起合并,合并结果是一个新DataSet

3 无论是合并Datastream还是合并DataSet,都不去重,2个源的消息或记录都保存。

4 不可以union 2个类型不同的流或union 2个类型不同的数据集

Flink,union双流,union实例,java版本

Flink,union DataSet数据集,union实例,java版本

 

---------------------------------更多精辟的见解可见下方----------------------------------

0:

https://www.jianshu.com/p/aa7d0f6d0dc4

 

1 :

ConnectedStreams

在 DataStream 上有一个 union 的转换 dataStream.union(otherStream1, otherStream2, ...),用来合并多个流,新的流会包含所有流中的数据。union 有一个限制,就是所有合并的流的类型必须是一致的。ConnectedStreams 提供了和 union 类似的功能,用来连接两个流,但是与 union 转换有以下几个区别:

  1. ConnectedStreams 只能连接两个流,而 union 可以连接多于两个流。

  2. ConnectedStreams 连接的两个流类型可以不一致,而 union 连接的流的类型必须一致。

  3. ConnectedStreams 会对两个流的数据应用不同的处理方法,并且双流之间可以共享状态。这在第一个流的输入会影响第二个流时, 会非常有用。

如下 ConnectedStreams 的样例,连接 input 和 other 流,并在input流上应用map1方法,在other上应用map2方法,双流可以共享状态(比如计数)。

val input: DataStream[MyType] = ...val other: DataStream[AnotherType] = ...val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)val result: DataStream[ResultType] =connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {override def map1(value: MyType): ResultType = { ... }override def map2(value: AnotherType): ResultType = { ... }})

当并行度为2时,其执行图如下所示:

http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/

 

2:

union

DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

 

union示意图

 

 

假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:

val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)复制代码

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。

  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

  3. 两个DataStream经过connect之后被转化为ConnectedStreamsConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。

 

对一个数据流进行控制处理

 

 

对于ConnectedStreams,我们需要重写CoMapFunctionCoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunctionmap1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunctionflatMap1处理第一个流的数据,flatMap2处理第二个流的数据。Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1flatMap1会被调用,第二个数据流有数据到达时,map2flatMap2会被调用。


LERRR
3楼 · 2021-11-18 14:50

connect:

1 只能用于DataStream,返回是ConnectedStreams。不能用于DataSet.

2 只能2个流一起connect(stream1.connect(stream2))

3 connect后可以对2个流分别处理(使用CoMapFunction或CoFlatMapFunction)

4  可以connect2个类型不同的流(比如Tuple2 connect Tuple2)

Flink,connect双流,connect实例,java版本

 

union:

1 用于DataStream时,返回是Datastream;用于DataSet时,返回是DataSet;

2 可以多个流一起合并(stream1.union(stream2,stream3,stream4)),合并结果是一个新Datastream;只能2个DataSet一起合并,合并结果是一个新DataSet

3 无论是合并Datastream还是合并DataSet,都不去重,2个源的消息或记录都保存。

4 不可以union 2个类型不同的流或union 2个类型不同的数据集

Flink,union双流,union实例,java版本

Flink,union DataSet数据集,union实例,java版本


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问