FlinkSQL Window Examples and Analysis


Windowing TVFs

Windowing table-valued functions (windowing TVFs) are Flink's table-valued functions for assigning rows to windows.
Note: for window aggregation, a windowing TVF is not used on its own; it is paired with an aggregate function grouped by the window columns, i.e. the query contains: GROUP BY window_start, window_end.

  • TUMBLE takes three required parameters and one optional parameter:

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

    data: a table parameter; any relation that has a time attribute column.
    timecol: a column descriptor indicating which time attribute column of the data should be mapped to tumbling windows.
    size: a duration specifying the width of the tumbling windows.
    offset: an optional duration specifying the offset by which the window start is shifted.

  • HOP takes four required parameters and one optional parameter:

    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

    data: a table parameter; any relation that has a time attribute column.
    timecol: a column descriptor indicating which time attribute column of the data should be mapped to hopping windows.
    slide: a duration specifying the interval between the starts of consecutive hopping windows.
    size: a duration specifying the width of the hopping windows.
    offset: an optional duration specifying the offset by which the window start is shifted.

  • CUMULATE takes four required parameters and one optional parameter:

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset ])

    data: a table parameter; any relation that has a time attribute column.
    timecol: a column descriptor indicating which time attribute column of the data should be mapped to cumulating windows.
    step: a duration specifying the increase in window size between the ends of consecutive cumulating windows.
    size: a duration specifying the maximum width of the cumulating windows; size must be an integer multiple of step.
    offset: an optional duration specifying the offset by which the window start is shifted (see the sketch after this list).
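
None of the examples below exercise the optional offset, so here is a minimal sketch (it assumes the tmp view defined in the Tumbling Windows section below): the fourth argument shifts the window boundaries by one minute, so the 10-minute windows run from 12:01 to 12:11 rather than from 12:00 to 12:10.

select window_start, window_end, count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
group by window_start, window_end;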

Tumbling Windows

CREATE TABLE kafka_table(
        mid bigint,
        db string,
        sch string,
        tab string,
        opt string,
        ts bigint,
        ddl string,
        err string,
        src map < string, string >,
        cur map < string, string >,
        cus map < string, string >,
        event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- the time attribute may be TIMESTAMP(3) or TIMESTAMP_LTZ(3)
        WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE  -- watermark delay; use SECOND for a tighter bound
) WITH (
  'connector' = 'kafka',
  'topic' = 't0',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

create view tmp as
select
            COALESCE(cur['group_name'], src['group_name']) group_name,
            COALESCE(cur['batch_number'], src['batch_number']) batch_number,
            event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
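
(The src and cur maps presumably hold a CDC record's before- and after-images, so COALESCE(cur[...], src[...]) picks whichever image is populated for the event, and DELETE events are filtered out.)
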
--Note: the windowing TVF is not used on its own; it is paired with an aggregate function grouped by window_start, window_end

select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name
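
window_time is the window's own time attribute, equal to window_end minus 1 millisecond. It is functionally determined by window_end, so listing it in the GROUP BY adds no extra grouping dimension, and keeping it in the output lets downstream operators continue time-based processing.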

Sliding (Hop) Windows

-- kafka_table DDL and the tmp view are identical to those in the Tumbling Windows section.

select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(HOP(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND,INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name
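
With slide = 60 seconds and size = 10 minutes, windows overlap and every row is assigned to size/slide = 10 windows; an event at 12:00:30, for example, falls into [11:51:00, 12:01:00), [11:52:00, 12:02:00), …, [12:00:00, 12:10:00).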

Cumulating Windows

-- kafka_table DDL and the tmp view are identical to those in the Tumbling Windows section.

select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '24' HOURS)) --running total since midnight
--alternative step/size combinations:
--TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND, INTERVAL '10' MINUTES))
--TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '1' HOURS))
group by window_start,window_end,window_time,group_name
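
With step = 1 hour and size = 24 hours, every window of a day shares the same start while the end advances by one step: [00:00, 01:00), [00:00, 02:00), …, [00:00, 24:00), so each firing reports the running total since midnight.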

Window Aggregation: Multi-dimensional Analysis

-- kafka_table DDL and the tmp view are identical to those in the Tumbling Windows section.


--Example 1: overall aggregation
select window_start,window_end,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end

--Example 2: aggregate by field (n dimensions)
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,group_name

--Example 3: multi-dimensional analysis with GROUPING SETS
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name)) --equivalent to Example 2
--group by window_start,window_end,GROUPING SETS((group_name), ()) --equivalent to Example 1 union all Example 2


--Example 4: GROUPING SETS over multiple fields
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name,batch_number),(group_name),(batch_number),())

--Example 5: multi-dimensional analysis with CUBE (2^n groupings)
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,CUBE(group_name,batch_number) --equivalent to Example 4
--CUBE(group_name) alone (drop batch_number from the select list) equals GROUPING SETS((group_name), ())

--Example 6: multi-dimensional analysis with ROLLUP (n+1 groupings)
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,ROLLUP(group_name,batch_number) --equivalent to GROUPING SETS((group_name,batch_number),(group_name),())
--ROLLUP(group_name) alone (drop batch_number from the select list) equals Example 1 union all Example 2
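
In Examples 3 through 6, rows produced by the coarser grouping sets carry NULL in the rolled-up columns: the () grouping set, for instance, yields one row per window with group_name and batch_number both NULL, holding the window-wide total.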

Window Top-N

Syntax of a Window Top-N statement:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]
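
The outer filter on rownum is what lets the planner recognize the ROW_NUMBER() pattern as a Top-N query, so keep the WHERE rownum <= N clause directly above the ranked subquery.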

-- kafka_table DDL and the tmp view are identical to those in the Tumbling Windows section.


--Approach 1: Window Top-N following a window aggregation
create view tmp_window as
select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS))
group by window_start,window_end,window_time,group_name;

--Top 3 organizations by count in each 24-hour tumbling window (i.e. the daily top 3 by PV)
select * from
    (
    select * ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY cnt DESC) as rn
    from tmp_window
    ) t
where rn <=3

--Top 3 days by count for each organization
select * from
    (
    select * ,ROW_NUMBER() OVER (PARTITION BY group_name ORDER BY cnt DESC) as rn
    from tmp_window
    ) t
where rn <=3
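
Because this ranking is not partitioned by window_start and window_end, it is a regular (continuous) Top-N: each newly closed daily window can displace an earlier day from an organization's top 3, so the result is an updating stream.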

--Approach 2: Window Top-N directly after the windowing TVF
select *
from
    (
    select
    window_start
    ,window_end
    ,window_time
    ,group_name
    ,event_time
    ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY event_time DESC) AS rn
    from TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS))
    )
where rn <=3

Window Deduplication

Flink deduplicates with ROW_NUMBER(), just as in Window Top-N queries. In theory, window deduplication is a special case of window Top-N where N is 1 and rows are ordered by processing time or event time.
Syntax of a Window Deduplication statement:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

CREATE TABLE kafka_table(
        mid bigint,
        db string,
        sch string,
        tab string,
        opt string,
        ts bigint,
        ddl string,
        err string,
        src map < string, string >,
        cur map < string, string >,
        cus map < string, string >,
        group_name as COALESCE(cur['group_name'], src['group_name']),
        batch_number as COALESCE(cur['batch_number'], src['batch_number']),
        event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- the time attribute may be TIMESTAMP(3) or TIMESTAMP_LTZ(3)
        WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE  -- watermark delay; use SECOND for a tighter bound
) WITH (
  'connector' = 'kafka',
  'topic' = 't0',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

select *
from
    (
    select
    window_start
    ,window_end
    ,group_name
    ,event_time
    ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY event_time DESC) AS rn
    from TABLE(TUMBLE(TABLE kafka_table, DESCRIPTOR(event_time), INTERVAL '24' HOURS))
    )
where rn =1
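
As written, this keeps the most recent row of each 24-hour window (ORDER BY event_time DESC); order ascending to keep the earliest instead, and add key columns such as group_name to the PARTITION BY clause to deduplicate per key within the window.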

