MySQL Flink Watermark實(shí)現(xiàn)事件時(shí)間處理的關(guān)鍵技術(shù)
目錄
- 1.概述
- 2.SQL案例-演示W(wǎng)atermark為零的情況
- 3.SQL案例-演示W(wǎng)atermark不為零的情況
1.概述
生活中有種場景:
車輛進(jìn)入隧道,信號(hào)不好,出了隧道后,信號(hào)就正常了。
正常情況下,車輛進(jìn)入隧道后,如果車輛正常,沒有事故,會(huì)正常駛出隧道。
在正常的隧道行駛過程中,可能會(huì)因?yàn)樾盘?hào)的原因,導(dǎo)致數(shù)據(jù)沒有像信號(hào)正常的時(shí)候那么快到達(dá)。
也就是說,這種情況下,數(shù)據(jù)出現(xiàn)了延遲。我們把這種延遲數(shù)據(jù)稱之為遲到數(shù)據(jù)。
生活中,這種場景非常多,比如:車輛進(jìn)入地下車庫,手機(jī)欠費(fèi),網(wǎng)絡(luò)抖動(dòng)等。這都屬于生活的正常情況。無法避免。
程序中,一般不會(huì)允許數(shù)據(jù)丟失。所以,我們程序會(huì)推出一些機(jī)制來保證遲到數(shù)據(jù)被正常處理。
Watermark就是用來保證正常遲到的數(shù)據(jù)被正確的處理。
Watermark,也叫水印,或者是水位線。用來處理一定程度下的延遲數(shù)據(jù)。
2.SQL案例-演示W(wǎng)atermark為零的情況
#1.創(chuàng)建表 CREATE TABLE source_table ( user_id STRING, price BIGINT, `timestamp` bigint, row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)), watermark for row_time as row_time - interval "0" second ) WITH ( "connector" = "socket", "hostname" = "node1", "port" = "9999", "format" = "csv" ); #2.數(shù)據(jù)查詢SQL select user_id, count(*) as pv, sum(price) as sum_price, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval "5" second) AS STRING)) * 1000 as window_start, UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval "5" second) AS STRING)) * 1000 as window_end from source_table group by user_id, tumble(row_time, interval "5" second);
3.SQL案例-演示W(wǎng)atermark不為零的情況
Watermark不為零,就有可能是兩種情況:
- 小于0,窗口會(huì)提前觸發(fā)計(jì)算,這種情況在實(shí)際應(yīng)用不存在,所以這里也不討論
- 大于0,窗口會(huì)延遲觸發(fā)計(jì)算,延遲的時(shí)間就是我們?cè)O(shè)置的Watermark的值
這里,我們主要是討論Watermark>0的情況。
#1.創(chuàng)建表 CREATE TABLE source_table ( user_id STRING, price BIGINT, `timestamp` bigint, row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)), watermark for row_time as row_time - interval "2" second ) WITH ( "connector" = "socket", "hostname" = "node1", "port" = "9999", "format" = "csv" ); #2.Watermark的解釋 WATERMARK FOR ts AS ts - INTERVAL "2" SECOND 這里的2,表示,數(shù)據(jù)允許延遲2秒鐘到達(dá),窗口會(huì)在(正常結(jié)束+延遲時(shí)間)后觸發(fā)計(jì)算 #3.查詢SQL select user_id, count(*) as pv, sum(price) as sum_price, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval "5" second) AS STRING)) * 1000 as window_start, UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval "5" second) AS STRING)) * 1000 as window_end from source_table group by user_id, tumble(row_time, interval "5" second);
到此這篇關(guān)于MySQL Flink Watermark實(shí)現(xiàn)事件時(shí)間處理的關(guān)鍵技術(shù)的文章就介紹到這了,更多相關(guān)MySQL Flink Watermark內(nèi)容請(qǐng)搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!
