How to use row_number inside a group window in Flink (stream processing)
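The SQL below throws an error. Going by the error message, I removed the row_number and changed the left join to a plain (inner) join, and after that it did work.
Can row_number in Flink stream processing only be used for Top-N scenarios?
I'm a beginner, any help is appreciated.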
INSERT INTO sink_print
select
TUMBLE_END(proctime, INTERVAL '1' Day) as pt,
to_date(t.action_time,'yyyy-MM-dd') as action_date,
t.source as source,
t.countrycode as countrycode,
t.hmsr as hmsr,
count(distinct t.uid) as metric_value,
1002 as metric_id,
-1 as hours,
t.site
from
(
select t.action_time,t.uid,t.cid,t.hmsr,t.countrycode,t.source,t.site
,ROW_NUMBER() over(partition by t.cid order by TO_TIMESTAMP(t.action_time,'yyyyMMddHHmmss') desc) rk,t1.join_req_dt,t.proctime
from
dwr_app_log t
left join dwr_tb_ms_thrd_cust t1
on t.uid = t1.cust_id
where
t.uid is not null
and t.site is not null
and to_date(t.action_time,'yyyy-MM-dd') = to_date(cast(t1.join_req_dt as string),'yyyy-MM-dd')
and t.platform = 'GSHOPPER'
) t
where t.rk = 1
group by
TUMBLE(proctime, INTERVAL '1' Day),to_date(t.action_time,'yyyy-MM-dd'),t.hmsr,t.countrycode,t.source,t.site
The error is:
Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[cid], orderBy=[$9 DESC], select=[cid, action_time, source, countrycode, hmsr, site, uid, proctime, $9])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:355)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:207)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:314)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:303)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:302)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
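The error says that the TUMBLE group window aggregate (GroupWindowAggregate) cannot consume "update and delete changes" produced by the Rank node. The following is a toy Python model of that idea, not Flink's implementation; every function name is hypothetical and the exact row kinds Flink emits may differ. It assumes that `ROW_NUMBER() ... WHERE rk = 1` ordered by a regular column (here `TO_TIMESTAMP(action_time, ...)`) must retract its earlier result when a "better" row arrives later. Likewise, a regular LEFT JOIN must retract its null-padded row once the matching right-side row shows up. In both cases the stream stops being insert-only. By contrast, keeping the first row per key by processing-time arrival (Flink's documented deduplication form, `ORDER BY proctime ASC` with `rk = 1`) and an inner join of two insert-only inputs never revise what they have already emitted.

```python
"""Toy model of Flink changelog row kinds (hypothetical helpers, not Flink code).

+I = insert, -U/+U = update before/after, -D = delete.
"""
from typing import Any, Dict, List, Tuple

Change = Tuple[str, Any]


def rank_top1(rows: List[Dict[str, Any]], partition_key: str, order_key: str) -> List[Change]:
    """Like ROW_NUMBER() OVER (PARTITION BY p ORDER BY o DESC) ... WHERE rk = 1."""
    best: Dict[Any, Dict[str, Any]] = {}
    out: List[Change] = []
    for row in rows:
        key = row[partition_key]
        current = best.get(key)
        if current is None:
            best[key] = row
            out.append(("+I", row))
        elif row[order_key] > current[order_key]:
            # A newer top row replaces one already emitted: retract it, then update.
            out.append(("-U", current))
            out.append(("+U", row))
            best[key] = row
    return out


def dedup_first_row(rows: List[Dict[str, Any]], partition_key: str) -> List[Change]:
    """Like ORDER BY proctime ASC ... WHERE rk = 1: the first arrival per key wins."""
    seen = set()
    out: List[Change] = []
    for row in rows:
        if row[partition_key] not in seen:
            # Processing time only moves forward, so this row is never replaced.
            seen.add(row[partition_key])
            out.append(("+I", row))
    return out


def join_stream(events: List[Tuple[str, Any, Any]], outer: bool) -> List[Change]:
    """Regular join on key; events are ("L" | "R", key, value) in arrival order."""
    left: Dict[Any, List[Any]] = {}
    right: Dict[Any, List[Any]] = {}
    out: List[Change] = []
    for side, key, value in events:
        if side == "L":
            left.setdefault(key, []).append(value)
            matches = right.get(key, [])
            for r_val in matches:
                out.append(("+I", (value, r_val)))
            if outer and not matches:
                out.append(("+I", (value, None)))  # null-padded LEFT JOIN row
        else:
            first_match = key not in right
            right.setdefault(key, []).append(value)
            for l_val in left.get(key, []):
                if outer and first_match:
                    out.append(("-D", (l_val, None)))  # retract the null-padded row
                out.append(("+I", (l_val, value)))
    return out


def tumble_window_count(changes: List[Change]) -> int:
    """Stand-in for the TUMBLE group window aggregate: accepts insert-only input."""
    for kind, _ in changes:
        if kind != "+I":
            raise ValueError(f"window aggregate cannot consume {kind} changes")
    return len(changes)


if __name__ == "__main__":
    logs = [
        {"cid": "c1", "ts": 20200101100000},
        {"cid": "c1", "ts": 20200101120000},
        {"cid": "c2", "ts": 20200101110000},
    ]
    print([kind for kind, _ in rank_top1(logs, "cid", "ts")])
    print(tumble_window_count(dedup_first_row(logs, "cid")))
    stream = [("L", "u1", "log1"), ("R", "u1", "cust1")]
    print([kind for kind, _ in join_stream(stream, outer=True)])
    print([kind for kind, _ in join_stream(stream, outer=False)])
```

In this toy run only the `dedup_first_row` output and the inner-join output (`outer=False`) pass `tumble_window_count`. That is consistent with the observation in the question that the query ran once row_number was removed and the left join became an inner join.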
Please credit the source when reposting. Original link: https://www.uj5u.com/houduan/207261.html
