首页>国内 > 正文

今日视点:关于 Flink Regular Join 与 TTL 的理解

2022-06-20 09:00:14来源:今日头条


【资料图】

对于流查询,Regular Join的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。

Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

Inner Join(Inner Equal Join):当两条流 Join 到才会输出 +[L, R]Left Join(Outer Equal Join):左流数据到达之后 Join 到 R 流数据则输出 +[L, R],没 Join 到输出 +[L, null])。如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null],然后输出 +[L, R]。Right Join(Outer Equal Join):与 Left Join 逻辑相反。Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。Regular Inner Join

Flink SQL​:

CREATE TABLE matchResult (    guid STRING) WITH (    "connector" = "kafka",    "topic" = "match_result_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE readRecord (    guid STRING,    book_name STRING) WITH (    "connector" = "kafka",    "topic" = "read_record_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE sink_table (    guid STRING,    book_name STRING) WITH (  "connector" = "print");INSERT INTO sink_tableSELECT    matchResult.guid,    readRecord.book_nameFROM matchResultINNER JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析​:

-- L 流数据达到,由于没有 Join 到 R 流数据而且是 inner join 便不输出结果+I[111, book1]       -- R 流数据达到, Join 到 L 流数据,便输出 +I[111, book1]                               -- R 流数据达到,由于没有 Join 到 L 流数据而且是 inner join 便不输出结果+I[222, book2]       -- L 流数据达到, Join 到 R 流数据便输出结果
Regular Left Join(Right join 则相反)

Flink SQL:

CREATE TABLE matchResult (    guid STRING) WITH (    "connector" = "kafka",    "topic" = "match_result_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE readRecord (    guid STRING,    book_name STRING) WITH (    "connector" = "kafka",    "topic" = "read_record_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE sink_table (    guid STRING,    book_name STRING) WITH (  "connector" = "print");INSERT INTO sink_tableSELECT    matchResult.guid,    readRecord.book_nameFROM matchResultLEFT JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析:

+I[111, null]           -- L 流数据达到,没有 Join 到 R 流数据,便输出 +[L, null]-D[111, null]          -- R 流的数据到达,发现 L 流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]+I[111, book1]      -- 再输出 +[L, R]                              -- 这里模拟一条 R 流 guid = 222 的数据到达,由于是 left join 且没有 join 到 L 流,因此不做输出+I[222, book2]      -- 当 L 流 guid = 222 的数据达到 join  R 流 后输出结果 +[L, R]
Regular Full Join

Flink SQL:

CREATE TABLE matchResult (    guid STRING) WITH (    "connector" = "kafka",    "topic" = "match_result_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE readRecord (    guid STRING,    book_name STRING) WITH (    "connector" = "kafka",    "topic" = "read_record_log_test",    "properties.bootstrap.servers" = "xxxxxxxxxxxxxxxxxxx",    "properties.group.id" = "flinkTestGroup",    "scan.startup.mode" = "latest-offset",    "format" = "json");CREATE TABLE sink_table (    guid STRING,    book_name STRING) WITH (  "connector" = "print");INSERT INTO sink_tableSELECT    matchResult.guid,    readRecord.book_nameFROM matchResultFULL JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析:

+I[111, null]         -- L 流数据达到,没有 Join 到 R 流数据,便输出 +I[L, null]  +I[null, book2]    -- R 流数据达到,没有 Join 到 R 流数据,便输出 +I[null, R]-D[null, book2]    -- L 流新数据到达,发现之前 R 流之前输出过没有 Join 到的数据,则发起回撤流,先输出 -D[null, R]+I[222, book2]    -- 再输出 +I[L, R]-D[111, null]        -- 反之同理+I[111, book1]
TTL 概念

在 Regular Join 时 Flink 会将两条没有时间窗口限制的流的所有数据存储在 State 中,由于流是无穷无尽持续流入的,随着时间的不断推进,内存中积累的状态会越来越多。

针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理,可以通过table.exec.state.ttl 参数进行控制(注意:这同时也会对结果的准确性有所影响,因此需要合理的权衡)。

关键词: 数据到达 数据存储 会越来越 这个问题

相关新闻

Copyright 2015-2020   三好网  版权所有