flink sql 不支持 动态 拼接的临时表。
比如
select * from kafka_event_msg
left join (
select appid form A
union
select appid form B
)
会报错:
org.apache.flink.table.api.TableException: Table sink 'vvp.default.bi_kafka_sink' doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[(appid = appid0)], select=[appid, client_ip, data_object, data_source, receive_time, appid0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
所以需要在MySQL那边就建立一张视图:
create view app_view as
select appid form A
union
select appid form B
Flink SQL 从kafka写入到Apache doris,出现事件重复
我司是搞游戏的,每个游戏一个事件表(事件时间,按天分区),入库的时候,偶尔出现重复事件,dw_time不一样的。也就是说 事件一样,写入时间不一样。
排查后发现,是因为 Apache doris 事件表的分区只创建到了2050年。但是 用户通过修改系统时间,把时间修改成了2099年,然后事件入库的时候,找不到分区,Flink 就会报错,进而 频繁的重启。而我们的Flink SQL的策略 不是 精确一次,所以就会重复插入。
总结:
1, H游戏 产生的2038年的数据,因为StarRock没有创建该分区(G游戏的是2099年的分区都创好了),导致flink SQL写入StarRock 异常,导致flink 崩溃重启
2,flink 使用的是至少一次(重启时间较短,但是有概率重复消费),而不是精确一次(重启需要较长时间,不会重复消费),导致flink集群 重复消费了事件
后续结局方法:
如果kafka的消费模式设置 精确一次,固然能解决问题,但是写入数据会大大降低,不符合实时数仓的初衷。所以我们是手动过滤掉比当前时间差别太大的事件。然后在业务需求 根据offset进行去重(kafka有offset,把offset写入到事件表中)