spark SQL和flink的table sql有什么关系和区别吗

2020-08-18 22:34发布

1条回答
无需指教
2楼 · 2020-08-19 08:31

Apache Flink 具有两个关系型API:Table API 和SQL,用于统一流和批处理。
Table API 是用于 Scala 和 Java 语言的查询API,允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

Table API 和 SQL 还没有完全支持并且正在积极开发中。

要使用 Table API 和SQL,需要将以下依赖引入项目:


  org.apache.flink
  flink-table_2.11
  1.6.1

Table API 和SQL

批处理和流式传输的 Table API 和SQL程序都遵循相同的模式。以下代码示例显示了常见的程序结构:

// 批处理使用 ExecutionEnvironmentval env = StreamExecutionEnvironment.getExecutionEnvironment// 创建 TableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// 注册 TabletableEnv.registerTable("table1", ...)// Table API queryval tapiResult = tableEnv.scan("table1").select(...)// SQL queryval sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")// Sink query resulttapiResult.writeToSink(...)// executeenv.execute()

TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:

  • 在内部目录中注册表

  • 注册外部目录

  • 执行SQL查询

  • 注册用户定义的函数

  • DataStream 或 DataSet 转换为 Table

  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。创建 TableEnvironment:

// STREAMING QUERYval sEnv = StreamExecutionEnvironment.getExecutionEnvironment// create a TableEnvironment for streaming queriesval sTableEnv = TableEnvironment.getTableEnvironment(sEnv)// BATCH QUERYval bEnv = ExecutionEnvironment.getExecutionEnvironment// create a TableEnvironment for batch queriesval bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

注册 Table

TableEnvironment 维护一个按名称注册的表的目录。有两种类型的表,输入表(input table)和输出表(output table)。输入表可以在 Table API 和SQL查询中引用,并提供输入数据。输出表可用于将 Table API 或SQL查询的结果发送到外部系统。

输入表的注册源:

  • Table API 或SQL查询的结果表

  • 访问外部数据的 TableSource,例如文件,数据库或消息系统

  • DataStream 或 DataSet。

输出表的注册源:TableSink

代码示例:

val tableEnv = TableEnvironment.getTableEnvironment(env)// from Table API or SQLval projTable: Table = tableEnv.scan("X").select(...)tableEnv.registerTable("projectedTable", projTable)// from TableSourceval csvSource: TableSource = new CsvTableSource("/path/to/file", ...)tableEnv.registerTableSource("CsvTable", csvSource)// from TableSinkval csvSink: TableSink = new CsvTableSink("/path/to/file", ...)// define the field names and typesval fieldNames: Array[String] = Array("a", "b", "c")val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注册外部目录

外部目录(external catalog)可以提供有关外部数据库和表的信息(如名称,schema,统计信息以及访问信息)。可以通过实现 ExternalCatalog 接口创建外部目录,并在 TableEnvironment 中注册:

// 获取一个 StreamTableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// 创建一个外部目录val catalog: ExternalCatalog = new InMemoryExternalCatalog// 注册外部目录tableEnv.registerExternalCatalog("InMemCatalog", catalog)

查询

Table API

Table API 是一个 Scala 和 Java 的语言集成查询API,是基于 Table类。Table类代表了一个流或者批表,并提供方法来使用关系型操作。这些方法返回一个新的 Table 对象,这个新的 Table 对象代表着输入的 Table 应用关系型操作后的结果。下面的例子展示了一个简单的 Table API 聚合查询:

// 获取一个 StreamTableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// 注册一个名叫 Orders 的表 ...// 扫描注册的 Orders 表val orders = tableEnv.scan("Orders")// 计算所有来自法国的客户的收入val revenue = orders  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName')
  .select('cID, 'cName, 'revenue.sum AS 'revSum)// 执行查询

SQL

Flink SQL 集成是基于 Apache Calcite,Apache Calcite 实现了标准的SQL。下面的例子展示了如何指定一个查询并返回结果:

// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)// 注册一个名叫 Orders 的表// 计算所有来自法国的客户的收入
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)// 执行查询

指定将其结果插入已注册表的更新查询:

// 注册一个名叫 RevenueFrance 的输出表// 计算所有来自法国的客户的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)// 执行查询

混合使用 Table API 和SQL,Table API 和SQL查询可以很容易地合并因为它们都返回 Table 对象:

  1. Table API 查询可以基于SQL查询结果的 Table 来进行

  2. SQL查询可以基于 Table API 查询的结果来定义

输出表

要输出 Table 可以写入 TableSink。TableSink 是通用接口,支持各种文件格式(如:CSV,Apache Parquet,Apache Avro)、存储系统(如:JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系统(如:Apache Kafka,RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流式处理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink。有关可用接收器的详细信息,请参阅 Sources & Sinks。

有两种方法可以发送表:

  • Table.writeToSink(TableSink sink) 自动匹配 schema

  • Table.insertInto(String sinkTable) 使用特定 schema

以下示例显示如何发出Table:

// 获取一个 StreamTableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// 使用Table API和/或SQL查询获取一个 Tableval result: Table = ...// 创建一个 TableSinkval sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")// METHOD 1://   将结果表写入 TableSinkresult.writeToSink(sink)// METHOD 2://   注册指定 schema 的 TableSinkval fieldNames: Array[String] = Array("a", "b", "c")val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)//   将结果表写入 TableSinkresult.insertInto("CsvSinkTable")// 执行程序

与 DataStream 和 DataSet API 集成

Table API 和SQL查询可以很容易地进行集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查询一个外部表(来自关系型数据库的表),做一些处理(如过滤、映射、聚合或者关联元数据),然后使用 DataStream 或者 DataSet API(以及在这些API之上构建的任何库,例如CEP或 Gelly) 进行进一步处理。

同样,Table API 或者SQL查询也可以应用于 DataStream 或者 DataSet 程序的结果中。这种交互可以通过将 DataStream 或者 DataSet 转换成一个 Table 及将 Table 转换成 DataStream 或者 DataSet 来实现。

Scala 隐式转换

Scala Table API 支持 DataSet,DataStream 以及 Table 间的隐式转换。需要引入 org.apache.flink.table.api.scala._org.apache.flink.api.scala._

DataStream 或 DataSet 转换为 Table

DataStream 或 DataSet 可以在 TableEnvironment 中注册为表,表的 schema 根据注册的 DataStream 或 DataSet 的数据类型来定:

val stream: DataStream[(Long, String)] = ...// register the DataStream as Table "myTable" with fields "f0", "f1"tableEnv.registerDataStream("myTable", stream)// register the DataStream as table "myTable2" with fields "myLong", "myString"tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

也可以直接转换为表,而不需要注册:

val stream: DataStream[(Long, String)] = ...// convert the DataStream into a Table with default fields '_1, '_2val table1: Table = tableEnv.fromDataStream(stream)// convert the DataStream into a Table with fields 'myLong, 'myStringval table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Table 转换为 DataStream 或 DataSet

Table 可以转换为 DataStream 或者 DataSet,通过这种方式,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查询的结果来执行了。

当将一个 Table 转换为 DataStream 或 DataSet 时,需要指定生成的 DataStream 或 DataSet 的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是 Row,下面列表描述了不同选项的功能:

  1. Row:字段按位置、任意数量字段映射,支持 null 值,无类型安全访问

  2. POJO:字段按名称(POJO 字段命名为 Table 字段)、任意数量字段映射,支持 null 值,类型安全访问

  3. Case Class:字段按位置映射,不支持 null 值,类型安全访问

  4. Tuple:字段按位置映射,不得多于22(Scala)或 25(Java)个字段,不支持 null 值,类型安全访问

  5. Atomic Type:Table 必须有一个字段,不支持 null 值,类型安全访问

Table 转换 DataStream

流式查询的结果表会动态地更新,每个新的记录到达输入流时结果就会发生变化。有两种模式将 Table 转换为 DataStream:

  1. Append Mode:只适用于当动态表仅由 INSERT 修改时,之前的结果不会被更新。

  2. Retract Mode:始终都可以使用此模式,使用一个 boolean 标识来编码 INSERTDELETE 更改。

// 有两个字段的 Table(String name, Integer age)val table: Table = ...// 将 Table 转换为 Row 类型的 Append DataStreamval dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)// 将 Table 转换为 Tuple2 类型的 Append DataStream  val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)// 将 Table 转换为 Row 类型的 Retact DataStream//   一个 ReactDataStream 的类型X为表示为 DataStream[(Boolean, X)]//   boolean 字段指定了更改的类型//   True 是 INSERT, false 是 DELETEval retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Table 转换 DataSet

// 有两个字段的 Table(String name, Integer age)val table: Table = ...// 将 Table 转换为 Row 类型的 DataSetval dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)//  将 Table 转换为 Tuple2 类型的 DataSetval dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

将数据类型映射到表模式(Table schema)

DataStream 和 DataSet API 支持多种数据类型,如:Tuple、POJO、case class 及 Row 类型。

原子类型

Flink 将原生类型(Integer、Double、String...)或泛型类型视为原子类型(Atomic type)。一个原子类型的 DataStream 或 DataSet 可以转换为只有一个属性的 Table,属性的类型根据原子类型推算,并且必须指定属性的名称。

val stream: DataStream[Long] = ...// convert DataStream into Table with default field name "f0"val table: Table = tableEnv.fromDataStream(stream)// convert DataStream into Table with field name "myLong"val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple 和 Case Class

Flink 支持 Scala 原生的 Tuple 类型,也为 Java 提供了 Tuple 类。两种类型的 DataStream 和 DataSet 都可以被转换为 Table。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果未指定字段名,则使用默认字段名。基于名称的映射允许使用别名(as)重新排序字段。

val stream: DataStream[(Long, String)] = ...// convert DataStream into Table with renamed default field names '_1, '_2val table: Table = tableEnv.fromDataStream(stream)// convert DataStream into Table with field names "myLong", "myString" (position-based)val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)// convert DataStream into Table with reordered fields "_2", "_1" (name-based)val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)// convert DataStream into Table with projected field "_2" (name-based)val table: Table = tableEnv.fromDataStream(stream, '_2)// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)// define case classcase class Person(name: String, age: Int)val streamCC: DataStream[Person] = ...// convert DataStream into Table with default field names 'name, 'ageval table = tableEnv.fromDataStream(streamCC)// convert DataStream into Table with field names 'myName, 'myAge (position-based)val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO

Flink 支持使用 POJO 作为复合类型。当将一个 POJO 类型的 DataStream 或 DataSet 转换为 Table 而不指定字段名称时,Table 的字段名称将采用 POJO 原生的字段名称。重命名原始的 POJO 字段需要关键字AS,因为 POJO 没有固定的顺序,名称映射需要原始名称并且不能通过位置来完成。

// get a TableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// Person is a POJO with field names "name" and "age"val stream: DataStream[Person] = ...// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)val table: Table = tableEnv.fromDataStream(stream)// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)// convert DataStream into Table with projected field "name" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'name)// convert DataStream into Table with projected and renamed field "myName" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

Row 类型支持任意数量的字段,并且支持 null 值。字段名称可以通过 RowTypeInfo 来指定或者将一个 Row 类型的 DataStream 或 DataSet 转换为 Table 时指定。Row 类型支持按位置和名字映射。可以通过为所有字段提供名称(基于位置)或为 映射/排序/重命名(基于名称)单独选择字段来重命名字段。

// get a TableEnvironmentval tableEnv = TableEnvironment.getTableEnvironment(env)// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`val stream: DataStream[Row] = ...// convert DataStream into Table with default field names "name", "age"val table: Table = tableEnv.fromDataStream(stream)// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)// convert DataStream into Table with projected field "name" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'name)// convert DataStream into Table with projected and renamed field "myName" (name-based)val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)



作者:Alex90
链接:https://www.jianshu.com/p/97eaade9fa08
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问