首頁(yè)技術(shù)文章正文

Flink進(jìn)階之CEP復(fù)雜事件處理

更新時(shí)間:2019-10-16 來(lái)源:黑馬程序員 瀏覽量:

1、簡(jiǎn)介

Flink CEP是在flink之上實(shí)現(xiàn)的復(fù)雜事件處理(CEP)庫(kù),它允許我們?cè)谑录髦袡z測(cè)事件的模式,讓我們有機(jī)會(huì)掌握數(shù)據(jù)中重要的事項(xiàng)。

本文章主要是介紹了flink cep中可用的api調(diào)用,首先介紹Pattern API,它允許你指定要在事件流中檢測(cè)的模式,并介紹匹配事件并對(duì)其進(jìn)行操作。最后分析下CEP庫(kù)在處理事件時(shí)間延遲問(wèn)題。【推薦了解大數(shù)據(jù)培訓(xùn)課程

1571208841512_CEP復(fù)雜事件處理.jpg

2、使用步驟

(1)首先我們需要引入cep的依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

(2)確定equals()和hashcode()方法

如果使用CEP,需要我們?cè)赿atastream中的事件實(shí)現(xiàn)正確的equals()和hashcode()方法,因?yàn)镕link CEP使用他們來(lái)比較和匹配事件。

簡(jiǎn)單demo代碼:

al input: DataStream[Event] = ...
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))


3、Pattern API

Pattern API允許你定義要從輸入流中提取的復(fù)雜模式序列。

每個(gè)復(fù)雜模式序列都是由多個(gè)簡(jiǎn)單模式組成,簡(jiǎn)單模式就是尋找具有相同屬性的單個(gè)事件的模式,我們可以先定義一些簡(jiǎn)單的模式,然后組合成復(fù)雜的序列模式。

可以將模式序列視為此類模式的結(jié)構(gòu)圖,基于用戶指定的條件從一個(gè)模式轉(zhuǎn)換到下一個(gè)模式,例如:

event.getName().equals(“start”).


匹配的是一系列輸入事件,通過(guò)一系列有效的模式轉(zhuǎn)換訪問(wèn)復(fù)雜模式圖中的所有模式。注意每個(gè)模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來(lái)標(biāo)識(shí)匹配的事件。模式名稱中不能包含字符”:”。

下面我們首先介紹如何定義單個(gè)模式,然后再將各個(gè)模式組合到復(fù)雜模式中。


單個(gè)模式

Pattern可以是單個(gè),也可以是循環(huán)模式,單個(gè)模式接收單個(gè)事件,而循環(huán)模式可以接收多個(gè)事件,在模式匹配符號(hào)中,模式“a b + c?d”(或“a”,后跟一個(gè)或多個(gè)“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。

默認(rèn)情況下,模式是單個(gè)模式,可以使用Quantifiers將其轉(zhuǎn)換為循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件,基于它接收的事件。

Quantifiers

在FlinkCEP中,可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個(gè)或多個(gè)事件發(fā)生的模式(例如之前提到的b+);用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,對(duì)于名為start的模式,以下是有效的Quantifiers:

// expecting 4 occurrences
 start.times(4);
 // expecting 0 or 4 occurrences
 start.times(4).optional();
 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);
 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();
 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();
 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();
 // expecting 1 or more occurrences
 start.oneOrMore();
 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();
 // expecting 0 or more occurrences
 start.oneOrMore().optional();
 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();
 // expecting 2 or more occurrences
 start.timesOrMore(2);
 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();
 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();


Conditions條件

每個(gè)模式中,從一個(gè)模式轉(zhuǎn)到下一個(gè)模式,可以指定其他條件,我們可以使用下面這些條件:

1.傳入事件的屬性,例如其值應(yīng)大于5,或者大于先前接收的事件的平均值;

2.匹配事件的連續(xù)性,例如檢測(cè)模式a,b,c序列中不能有任何非匹配事件。

Conditions on Properties關(guān)于屬性的條件

可以通過(guò)pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件,條件可以是iterativeConditions或SimpleConditions。

1.迭代條件

這是最常見(jiàn)的條件類型,你可以指定一個(gè)條件,該條件基于先前接收的事件的屬性或器子集的統(tǒng)計(jì)信息來(lái)接收后續(xù)事件。

下面代碼說(shuō)的是:如果名稱以”foo”開(kāi)頭同時(shí)如果該模式的先前接收的事件的價(jià)格總和加上當(dāng)前事件的價(jià)格不超過(guò)該值5.0,則迭代條件接收名為”middle”的模式的下一個(gè)事件:迭代條件可以很強(qiáng)大,尤其是與循環(huán)模式相結(jié)合,例如:oneOrMore();

middle.oneOrMore()
  .subtype(classOf[SubEvent])
  .where(
     (value, ctx) => {
        lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
     }
   )


注意對(duì)context.getEventsForPattern()的調(diào)用將為給定潛在匹配項(xiàng)查找所有先前接收的事件,此操作代價(jià)可能會(huì)變化巨大,因此應(yīng)盡量減少其使用。

2.簡(jiǎn)單條件

這種類型的條件時(shí)擴(kuò)展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接收事件:

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }});


此外還可以通過(guò)pattern.subtype(subclass)方法將接收事件的類型限定為初始事件類型的子類型:

start.where(event => event.getName.startsWith("foo"))


組合條件:

如上所示,可以將子類型條件與其他條件組合使用,這適用于所有條件。我們可以通過(guò)順序調(diào)用where()來(lái)任意組合條件。最終結(jié)果將是各個(gè)條件的結(jié)果的邏輯and,要使用or組合條件,可以使用or()方法,如下所示:

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)


停止條件

在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如:接收值大于5的事件,直到其值的總和小于50。

我們看個(gè)例子來(lái)更好的理解:

給定模式:(a+ until b),b之前,要出現(xiàn)一個(gè)或者多個(gè)a,

給定輸入的序列:a1,c,a2,b,a3

輸出結(jié)果:{a1 a2}{a1}{a2}{a3}

我們可以看到{a1,a2,a3},{a2,a3}兩個(gè)并沒(méi)有輸出,這就是停止條件的作用。


連續(xù)事件的條件

Flink CEP支持事件之間以一下形式連續(xù):

嚴(yán)格連續(xù)性:希望所有匹配事件一個(gè)接一個(gè)的出現(xiàn),中間沒(méi)有任何不匹配的事件;

寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)不匹配事件,不能忽略兩個(gè)事件之間的匹配事件。

非確定性輕松連續(xù)性:進(jìn)一步放寬連續(xù)性,允許忽略某些匹配事件的其它匹配。

為了解釋上面的內(nèi)容,我們舉個(gè)例子。假如有個(gè)模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:

嚴(yán)格連續(xù)性:{a2 b} - 由于c的存在導(dǎo)致a1被廢棄

寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略

非確定性寬松連續(xù)性:{a1 b}, {a2 b}, 和 {a1 a2 b}

對(duì)于循環(huán)模式(例如oneOrMore()和times()),默認(rèn)是寬松的連續(xù)性。 如果你想要嚴(yán)格的連續(xù)性,你必須使用consecutive()顯式指定它, 如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。



組合模式

簡(jiǎn)介

已經(jīng)了解了單個(gè)模式的樣子,現(xiàn)在是時(shí)候看看如何將它們組合成一個(gè)完整的模式序列。

模式序列必須以初始模式開(kāi)始,如下所示:

Patternstart = Pattern.begin("start");


接下來(lái),您可以通過(guò)指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。 在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴(yán)格,寬松和非確定性寬松,以及如何在循環(huán)模式中應(yīng)用它們。 要在連續(xù)模式之間應(yīng)用它們,可以使用:

next() 對(duì)應(yīng)嚴(yán)格, followedBy() 對(duì)應(yīng)寬松連續(xù)性 followedByAny() 對(duì)應(yīng)非確定性寬松連續(xù)性亦或

notNext() 如果不希望一個(gè)事件類型緊接著另一個(gè)類型出現(xiàn)。 notFollowedBy() 不希望兩個(gè)事件之間任何地方出現(xiàn)該事件。 注意 模式序列不能以notFollowedBy()結(jié)束。 注意 NOT模式前面不能有可選模式。

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);


寬松連續(xù)性指的是僅第一個(gè)成功匹配的事件會(huì)被匹配到,然而非確定性寬松連續(xù)性,相同的開(kāi)始會(huì)有多個(gè)匹配結(jié)果發(fā)出。距離,如果一個(gè)模式是"a b",給定輸入序列是"a c b1 b2"。對(duì)于不同連續(xù)性會(huì)有不同輸出。

a和b之間嚴(yán)格連續(xù)性,將會(huì)返回{},也即是沒(méi)有匹配。因?yàn)閏的出現(xiàn)導(dǎo)致a,拋棄了。

a和b之間寬松連續(xù)性,返回的是{a,b1},因?yàn)閷捤蛇B續(xù)性將會(huì)拋棄為匹配成功的元素,直至匹配到下一個(gè)要匹配的事件。

a和b之間非確定性寬松連續(xù)性,返回的是{a,b1},{a,b2}。

也可以為模式定義時(shí)間約束。 例如,可以通過(guò)pattern.within()方法定義模式應(yīng)在10秒內(nèi)發(fā)生。 時(shí)間模式支持處理時(shí)間和事件時(shí)間。 注意模式序列只能有一個(gè)時(shí)間約束。 如果在不同的單獨(dú)模式上定義了多個(gè)這樣的約束,則應(yīng)用最小的約束。

next.within(Time.seconds(10));

可以為begin,followBy,followByAny和next定義一個(gè)模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且 可對(duì)GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。


分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!