快消息!火山引擎在行为分析场景下的 ClickHouse JOIN 优化
2022-09-06 15:47:10来源:字节跳动技术团队
火山引擎增长分析 DataFinder 基于 ClickHouse 来进行行为日志的分析,ClickHouse 的主要版本是基于社区版改进开发的字节内部版本。主要的表结构:
(资料图)
事件表:存储用户行为数据,以用户ID分 shard 存储。
--列出了主要的字段信息 CREATE TABLE tob_apps_all ( `tea_app_id` UInt32, --应用ID `device_id` String DEFAULT "", --设备ID `time` UInt64,--事件日志接受时间 `event` String,--事件名称 `user_unique_id` String,--用户ID `event_date` Date , --事件日志日期,由time转换而来 `hash_uid` UInt64 --用户ID hash过后的id,用来join降低内存消耗 )│ ```
用户表:存储用户的属性数据,以用户ID分 shard 存储。
--列出了主要的字段信息CREATE TABLE users_unique_all( `tea_app_id` UInt32, --应用ID `user_unique_id` String DEFAULT "", -- 用户ID `device_id` String DEFAULT "", -- 用户最近的设备ID `hash_uid` UInt64,--用户ID hash过后的id,用来join降低内存消耗 `update_time` UInt64,--最近一次更新时间 `last_active_date` Date --用户最后活跃日期)
设备表:存储设备相关的数据,以设备ID分 shard 存储。
--列出了主要的字段信息CREATE TABLE devices_all( `tea_app_id` UInt32, --应用ID `device_id` String DEFAULT "", --设备ID `update_time` UInt64, --最近一次更新时间 `last_active_date` Date --用户最后活跃日期)
业务对象表:存储业务对象相关的数据,每个 shard存储全量的数据。
--列出了主要的字段信息CREATE TABLE rangers.items_all( `tea_app_id` UInt32, `hash_item_id` Int64, `item_name` String, --业务对象名称。比如商品 `item_id` String, --业务对象ID。比如商品id 1000001 `last_active_date` Date)1.1 业务挑战
随着接入应用以及应用的 DAU 日益增加,ClickHouse 表的事件量增长迅速;并且基于行为数据需要分析的业务指标越来越复杂,需要 JOIN 的表增多;我们遇到有一些涉及到 JOIN 的复杂 SQL 执行效率低,内存和 CPU 资源占用高,导致分析接口响应时延和错误率增加。
2. 关于 Clickhouse 的 JOIN在介绍优化之前,先介绍一下基本的 ClickHouse JOIN 的类型和实现方式。
2.1 分布式 JOINSELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")
基本执行过程:
一个 Clickhouse 节点作为 Coordinator 节点,给每个节点分发子查询,子查询 sql(tob_apps_all 替换成本地表,users_unique_all保持不变依然是分布式表)。
每个节点执行 Coordinator 分发的 sql 时,发现users_unique_all是分布式表,就会去所有节点上去查询以下 SQL(一共有 N*N。N 为 shard 数量)。
SELECT device_id, hash_uid FROMusers_uniqueWHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")每个节点从其他 N-1个节点拉取2中子查询的全部数据,全量存储(内存 or 文件),进行本地 JOIN。Coordinator 节点从每个节点拉取3中的结果集,然后做处理返回给 client。存在的问题:
子查询数量放大。每个节点都全量存储全量的数据。2.2 分布式 Global JOINSELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")
基本执行过程:
一个 Clickhouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行sql(tob_apps_all 替换成本地表,右表子查询替换成别名 ut)。Coordinator 节点去其他节点拉取 users_unique_all 的全部数据,然后分发到全部节点(作为1中别名表 ut 的数据)。每个节点都会存储全量的2中分发的数据(内存or文件),进行本地 local join。Coordinator 节点从每个节点拉取3中的结果集,然后做处理返回给 client。存在的问题:
每个节点都全量存储数据。如果右表较大,分发的数据较大,会占用网络带宽资源。2.3 本地 JOINSQL 里面只有本地表的 JOIN,只会在当前节点执行。
SELECT et.os_name,ut.device_id AS user_device_idFROM tob_apps et any LEFT JOIN (SELECT device_id, hash_uid FROM rangers.users_unique WHERE tea_app_id = 268411 AND last_active_date>="2022-08-06") ut ON et.hash_uid=ut.hash_uidWHERE tea_app_id = 268411 AND event="app_launch" AND event_date="2022-08-06"2.3.1 Hashjoin右表全部数据加载到内存,再在内存构建 hash table。key 为 joinkey。从左表分批读取数据,从右表 hash table匹配数据。优点是:速度快 缺点是:右表数据量大的情况下占用内存。2.3.2 Mergejoin对右表排序,内部 block 切分,超出内存部分 flush 到磁盘上,内存大小通过参数设定。左表基于 block 排序,按照每个 block 依次与右表 merge。优点是:能有效控制内存 缺点是:大数据情况下速度会慢。
优先使用 hash join 当内存达到一定阈值后再使用 merge join,优先满足性能要求。
3. 解决方案3.1 避免JOIN3.1.1 数据预生成数据预生成(由 Spark/Flink 或者 Clickhouse 物化视图产出数据),形成大宽表,基于单表的查询是 ClickHouse 最为擅长的场景。
我们有个指标,实现的 SQL 比较复杂(如下),每次实时查询很耗时,我们单独建了一个表 table,由 Spark 每日构建出这个指标,查询时直接基于 table 查询。
SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......FROM (SELECT event_date,hash_uid AS uc1,sum(et.float_params{ "amount" }) AS value, count(1) AS cnt, value*cnt AS multiple FROM tob_apps_all et GLOBAL ANY LEFT JOIN(SELECT hash_uid AS join_key,int_profiles{ "$ab_time_34" }*1000 AS first_time FROM users_unique_all WHERE app_id = 10000000 AND last_active_date >= "2022-07-19" AND first_time is NOT null) upt ON et.hash_uid=upt.join_key WHERE (查询条件) GROUP BY uc1,event_date)GROUP BY event_date;
数据量2300W,查询时间由7秒->0.008秒。当然这种方式,需要维护额外的数据构建任务。总的思路就是不要让 ClickHouse 实时去 JOIN。
3.1.2 使用 IN 代替 JOINJOIN 需要基于内存构建 hash table 且需要存储右表全部的数据,然后再去匹配左表的数据。而 IN 查询会对右表的全部数据构建 hash set,但是不需要匹配左表的数据,且不需要回写数据到 block。
比如:
SELECT event_date, count()FROM tob_apps_all et global any INNER JOIN (SELECT hash_uid AS join_key FROM users_unique_all WHERE app_id = 10000000 AND last_active_date >= "2022-01-01") upt ON et.hash_uid = upt.join_keyWHERE app_id = 10000000 AND event_date >= "2022-01-01" AND event_date <= "2022-08-02"GROUP BY event_date
可以改成如下形式:
SELECT event_date, count()FROM tob_apps_allWHERE app_id = 10000000 AND event_date >= "2022-01-01" AND event_date <= "2022-08-02" AND hash_uid global IN (SELECT hash_uid FROM users_unique_all WHERE (tea_app_id = 10000000) AND (last_active_date >= "2022-01-01") ) GROUP BY event_date
如果需要从右表提取出属性到外层进行计算,则不能使用 IN 来代替 JOIN。
相同的条件下,上面的测试 SQL,由 JOIN 时的16秒优化到了IN查询时的11秒。
3.2 更快的 JOIN3.2.1 优先本地 JOIN数据预先相同规则分区也就是 Colocate JOIN。优先将需要关联的表按照相同的规则进行分布,查询时就不需要分布式的 JOIN。
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")settings distributed_perfect_shard=1
比如事件表 tob_apps_all 和用户表 users_unique_all 都是按照用户 ID 来分 shard 存储的,相同的用户的两个表的数据都在同一个 shard 上,因此这两个表的 JOIN 就不需要分布式 JOIN 了。
distributed_perfect_shard 这个 settings key 是字节内部 ClickHouse 支持的,设置过这个参数,指定执行计划时就不会再执行分布式 JOIN 了。
基本执行过程:
一个 ClickHouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all、users_unique_all替换成本地表)。每个节点都执行1中分发的本地表 join 的 SQL(这一步不再分发右表全量的数据)。数据再回传到 coordinator 节点,然后返回给 client。数据冗余存储如果一个表的数据量比较小,可以不分 shard 存储,每个 shard 都存储全量的数据,例如我们的业务对象表。查询时,不需要分布式 JOIN,直接在本地进行 JOIN 即可。
SELECT count()FROM tob_apps_all AS et ANY LEFT JOIN ( SELECT item_id FROM items_all WHERE (tea_app_id = 268411)) AS it ON et.item_id = it.item_idWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")settings distributed_perfect_shard=1
例如这个 SQL,items_all 表每个 shard 都存储同样的数据,这样也可以避免分布式 JOIN 带来的查询放大和全表数据分发问题。
3.2.2 更少的数据不论是分布式 JOIN 还是本地 JOIN,都需要尽量让少的数据参与 JOIN,既能提升查询速度也能减少资源消耗。
SQL 下推ClickHouse 对 SQL 的下推做的不太好,有些复杂的 SQL 下推会失效。因此,我们手动对 SQL 做了下推,目前正在测试基于查询优化器来帮助实现下推优化,以便让 SQL 更加简洁。
下推的 SQL:
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06" AND 用户属性条件 1 OR 用户属性条件 2)) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")settings distributed_perfect_shard=1
对应的不下推的 SQL:
SELECT et.os_name, ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN ( SELECT device_id, hash_uid FROM rangers.users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")AND (ut.用户属性条件 1 OR ut.用户属性条件 2)settings distributed_perfect_shard=1
可以看到,不下推的 SQL 更加简洁,直接基于 JOIN 过后的宽表进行过滤。但是 ClickHouse 可能会将不满足条件的 users_unique_all 数据也进行 JOIN。
我们使用中有一个复杂的 case,用户表过滤条件不下推有1千万+,SQL 执行了3000秒依然执行超时,而做了下推之后60秒内就执行成功了。
3.2.3 Clickhouse 引擎层优化一个 SQL 实际在 Clickhouse 如何执行,对 SQL 的执行时间和资源消耗至关重要。社区版的 Clickhouse 在执行模型和 SQL 优化器上还要改进的空间,尤其是复杂 SQL 以及多 JOIN 的场景下。
执行模型优化
社区版的 Clickhouse 目前还是一个两阶段执行的执行模型。第一阶段,Coordinator 在收到查询后,将请求发送给对应的 Worker 节点。第二阶段,Worker 节点完成计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和处理,并将处理后的结果返回。
有以下几个问题:
第二阶段的计算比较复杂时,Coordinator 的节点计算压力大,容易成为瓶颈。不支持 shuffle join,hash join 时右表为大表时构建慢,容易 OOM。对复杂查询的支持不友好。字节跳动 ClickHouse 团队为了解决上述问题,改进了执行模型,参考其他的分布式数据库引擎(例如 Presto 等),将一个复杂的 Query 按数据交换情况切分成多个 Stage,各 Stage 之间则通过 Exchange 完成数据交换。根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,并根据 DAG 图调度 Stage。例如两表JOIN,会先调度左右表读取 Stage,之后再调度JOIN这个 Stage,JOIN的 Stage 依赖于左右表的 Stage。
举个例子
SELECT et.os_name, ut.device_id AS user_device_id, dt.hash_did AS device_hashidFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN ( SELECT device_id, hash_uid FROM users_unique_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS ut ON et.hash_uid = ut.hash_uidGLOBAL ANY LEFT JOIN ( SELECT device_id, hash_did FROM devices_all WHERE (tea_app_id = 268411) AND (last_active_date >= "2022-08-06")) AS dt ON et.device_id = dt.device_idWHERE (tea_app_id = 268411) AND (event = "app_launch") AND (event_date = "2022-08-06")LIMIT 10
Stage执行模型基本过程(可能的):
读取 tob_apps_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个Stage。读取 users_unique_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage。上述两个表的数据,在每个节点上的数据进行本地JOIN,然后再按照 join key(device_id) 进行 shuffle。这是一个 Stage。读取 devices_all 数据,按照 join key(device_id)进行 shuffle,这是一个Stage。第3步、第4步的数据,相同 join key(device_id) 的数据都在同一个节点上,然后进行本地JOIN,这是一个 Stage。汇总数据,返回 limit 10 的数据。这是一个 Stage。统计效果如下:
查询优化器
有了上面的 stage 的执行模型,可以灵活调整 SQL 的执行顺序,字节跳动 Clickhouse 团队自研了查询优化器,根据优化规则(基于规则和代价预估)对 SQL 的执行计划进行转换,一个执行计划经过优化规则后会变成另外一个执行计划,能够准确的选择出一条效率最高的执行路径,然后构建 Stage 的 DAG 图,大幅度降低查询时间。
下图描述了整个查询的执行流程,从 SQL parse 到执行期间所有内容全部进行了重新实现(其中紫色模块),构建了一套完整的且规范的查询优化器。
还是上面的三表 JOIN 的例子,可能的一个执行过程是:
查询优化器发现 users_unique_all 表与 tob_apps_all 表的分 shard 规则一样(基于用户 ID ),所以就不会先对表按 join key 进行 shuffle,users_unique 与 tob_apps 直接基于本地表 JOIN,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage。查询优化器根据规则或者代价预估决定设备表 devices_all 是需要 broadcast join 还是 shuffle join。如果 broadcast join:在一个节点查到全部的 device 数据,然后分发到其他节点。这是一个 Stage。如果 shuffle join:在每个节点对 device 数据按照 join key(device_id) 进行 shuffle。这是一个 Stage。汇总数据,返回 limit 10 的数据。这是一个 Stage。效果:
可以看到,查询优化器能优化典型的复杂的 SQL 的执行效率,缩短执行时间。
4. 总结ClickHouse 最为擅长的领域是一个大宽表来进行查询,多表 JOIN 时Clickhouse 性能表现不佳。作为业内领先的用户分析与运营平台,火山引擎增长分析 DataFinder 基于海量数据做到了复杂指标能够秒级查询。本文介绍了我们是如何优化 Clickhouse JOIN 查询的。
主要有以下几个方面:
减少参与 JOIN 的表以及数据量。优先使用本地 JOIN,避免分布式 JOIN 带来的性能损耗。优化本地 JOIN,优先使用内存进行 JOIN。优化分布式 JOIN 的执行逻辑,依托于字节跳动对 ClickHouse 的深度定制化。