FlinkCEP - Flink的复杂事件处理

FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。它允许您在无休止的事件流中检测事件模式,让您有机会掌握数据中重要的事项。

此页面描述了Flink CEP中可用的API调用。我们首先介绍Pattern API,它允许您指定要在流中检测的模式,然后介绍如何检测匹配事件序列并对其进行 算子操作然后,我们将介绍CEP库在处理事件时间延迟时所做的假设,以及如何将您的工作从较旧的Flink版本迁移到Flink-1.3。

入门

如果要直接进入,请设置Flink程序并将FlinkCEP依赖项添加到pom.xml项目中。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep-scala_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

信息 FlinkCEP不是二进制分发的一部分。在此处了解如何与集群执行相关联

现在,您可以使用Pattern API开始编写第一个CEP程序。

注意在事件DataStream要应用模式匹配,必须实施适当的到equals()hashCode()方法,因为FlinkCEP用它们来比较和匹配的事件。

  1. DataStream<Event> input = ...
  2. Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
  3. new SimpleCondition<Event>() {
  4. @Override
  5. public boolean filter(Event event) {
  6. return event.getId() == 42;
  7. }
  8. }
  9. ).next("middle").subtype(SubEvent.class).where(
  10. new SimpleCondition<SubEvent>() {
  11. @Override
  12. public boolean filter(SubEvent subEvent) {
  13. return subEvent.getVolume() >= 10.0;
  14. }
  15. }
  16. ).followedBy("end").where(
  17. new SimpleCondition<Event>() {
  18. @Override
  19. public boolean filter(Event event) {
  20. return event.getName().equals("end");
  21. }
  22. }
  23. );
  24. PatternStream<Event> patternStream = CEP.pattern(input, pattern);
  25. DataStream<Alert> result = patternStream.select(
  26. new PatternSelectFunction<Event, Alert>() {
  27. @Override
  28. public Alert select(Map<String, List<Event>> pattern) throws Exception {
  29. return createAlertFrom(pattern);
  30. }
  31. }
  32. });
  1. val input: DataStream[Event] = ...
  2. val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  3. .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  4. .followedBy("end").where(_.getName == "end")
  5. val patternStream = CEP.pattern(input, pattern)
  6. val result: DataStream[Alert] = patternStream.select(createAlert(_))

Pattern API

模式API允许您定义要从输入流中提取的复杂模式序列。

每个复杂模式序列由多个简单模式组成,即模式查找具有相同属性的单个事件。从现在开始,我们将调用这些简单的模式模式,以及我们在流中搜索的最终复杂模式序列,即模式序列您可以将模式序列视为此类模式的图形,其中从一个模式到下一个模式的转换基于用户指定的条件发生,例如event.getName().equals("end")一个匹配是访问的复数图案图的所有模式,通过有效模式转换的序列的输入事件的序列。

注意每个模式必须具有唯一的名称,稍后您可以使用该名称来标识匹配的事件。

注意模式名称不能包含该字符":"

在本节的其余部分,我们将首先介绍如何定义单个模式,然后如何将各个模式组合到复杂模式中

个人模式

一个模式可以是或一个循环模式。单例模式接受单个事件,而循环模式可以接受多个事件。在模式匹配的符号,图案"a b+ c? d"(或"a",随后一个或多个 "b"的,任选接着一个"c",接着是"d"), ,ac?d是单模式,同时b+是一个循环的一个。默认情况下,模式是单例模式,您可以使用Quantifiers将其转换为循环模式每个模式可以有一个或多个条件,基于它接受事件。

量词

在FlinkCEP中,您可以使用以下方法指定循环模式:pattern.oneOrMore()对于期望一个或多个事件发生的模式(例如b+前面提到的); 并且pattern.times(#ofTimes),对于期望特定类型事件的特定出现次数的模式,例如4 a; 并且pattern.times(#fromTimes, #toTimes),对于期望特定最小出现次数和给定类型事件的最大出现次数的模式,例如2-4s a

您可以使用该pattern.greedy()方法使循环模式变得贪婪,但您还不能使组模式变得贪婪。您可以使用该pattern.optional()方法创建所有模式,循环与否,可选

对于命名的模式start,以下是有效的量词:

  1. // expecting 4 occurrences
  2. start.times(4);
  3. // expecting 0 or 4 occurrences
  4. start.times(4).optional();
  5. // expecting 2, 3 or 4 occurrences
  6. start.times(2, 4);
  7. // expecting 2, 3 or 4 occurrences and repeating as many as possible
  8. start.times(2, 4).greedy();
  9. // expecting 0, 2, 3 or 4 occurrences
  10. start.times(2, 4).optional();
  11. // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
  12. start.times(2, 4).optional().greedy();
  13. // expecting 1 or more occurrences
  14. start.oneOrMore();
  15. // expecting 1 or more occurrences and repeating as many as possible
  16. start.oneOrMore().greedy();
  17. // expecting 0 or more occurrences
  18. start.oneOrMore().optional();
  19. // expecting 0 or more occurrences and repeating as many as possible
  20. start.oneOrMore().optional().greedy();
  21. // expecting 2 or more occurrences
  22. start.timesOrMore(2);
  23. // expecting 2 or more occurrences and repeating as many as possible
  24. start.timesOrMore(2).greedy();
  25. // expecting 0, 2 or more occurrences and repeating as many as possible
  26. start.timesOrMore(2).optional().greedy();
  1. // expecting 4 occurrences
  2. start.times(4)
  3. // expecting 0 or 4 occurrences
  4. start.times(4).optional()
  5. // expecting 2, 3 or 4 occurrences
  6. start.times(2, 4)
  7. // expecting 2, 3 or 4 occurrences and repeating as many as possible
  8. start.times(2, 4).greedy()
  9. // expecting 0, 2, 3 or 4 occurrences
  10. start.times(2, 4).optional()
  11. // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
  12. start.times(2, 4).optional().greedy()
  13. // expecting 1 or more occurrences
  14. start.oneOrMore()
  15. // expecting 1 or more occurrences and repeating as many as possible
  16. start.oneOrMore().greedy()
  17. // expecting 0 or more occurrences
  18. start.oneOrMore().optional()
  19. // expecting 0 or more occurrences and repeating as many as possible
  20. start.oneOrMore().optional().greedy()
  21. // expecting 2 or more occurrences
  22. start.timesOrMore(2)
  23. // expecting 2 or more occurrences and repeating as many as possible
  24. start.timesOrMore(2).greedy()
  25. // expecting 0, 2 or more occurrences
  26. start.timesOrMore(2).optional()
  27. // expecting 0, 2 or more occurrences and repeating as many as possible
  28. start.timesOrMore(2).optional().greedy()

条件

对于每个模式,您可以指定传入事件必须满足的条件,以便“接受”到模式中,例如,其值应大于5,或大于先前接受的事件的平均值。您可以指定经由事件属性条件pattern.where()pattern.or()pattern.until()方法。这些可以是IterativeConditions或SimpleConditions。

迭代条件:这是最常见的条件类型。这是您可以如何指定一个条件,该条件基于先前接受的事件的属性或其子集的统计信息来接受后续事件。

下面是迭代条件的代码,如果名称以“foo”开头,则接受名为“middle”的模式的下一个事件,并且如果该模式的先前接受的事件的价格总和加上当前的价格事件不要超过5.0的值。迭代条件可能很强大,特别是与循环模式结合使用,例如oneOrMore()

  1. middle.oneOrMore()
  2. .subtype(SubEvent.class)
  3. .where(new IterativeCondition<SubEvent>() {
  4. @Override
  5. public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
  6. if (!value.getName().startsWith("foo")) {
  7. return false;
  8. }
  9. double sum = value.getPrice();
  10. for (Event event : ctx.getEventsForPattern("middle")) {
  11. sum += event.getPrice();
  12. }
  13. return Double.compare(sum, 5.0) < 0;
  14. }
  15. });
  1. middle.oneOrMore()
  2. .subtype(classOf[SubEvent])
  3. .where(
  4. (value, ctx) => {
  5. lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
  6. value.getName.startsWith("foo") && sum + value.getPrice < 5.0
  7. }
  8. )

注意调用以ctx.getEventsForPattern(…)查找给定潜在匹配的所有先前接受的事件。此 算子操作的成本可能会有所不同,因此在实施您的条件时,请尽量Reduce其使用。

简单条件:此类条件扩展了上述IterativeCondition类,并基于事件本身的属性决定是否接受事件。

  1. start.where(new SimpleCondition<Event>() {
  2. @Override
  3. public boolean filter(Event value) {
  4. return value.getName().startsWith("foo");
  5. }
  6. });
  1. start.where(event => event.getName.startsWith("foo"))

最后,您还可以Event通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型(此处的子类型

  1. start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
  2. @Override
  3. public boolean filter(SubEvent value) {
  4. return ... // some condition
  5. }
  6. });
  1. start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

组合条件:如上所示,您可以将subtype条件与其他条件组合。这适用于所有条件。您可以通过顺序调用任意组合条件where()最终结果将是各个条件的结果的逻辑AND要使用OR组合条件,可以使用该or()方法,如下所示。

  1. pattern.where(new SimpleCondition<Event>() {
  2. @Override
  3. public boolean filter(Event value) {
  4. return ... // some condition
  5. }
  6. }).or(new SimpleCondition<Event>() {
  7. @Override
  8. public boolean filter(Event value) {
  9. return ... // or condition
  10. }
  11. });
  1. pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

停止条件:在循环模式(oneOrMore()oneOrMore().optional())的情况下,您还可以指定停止条件,例如,接受值大于5的事件,直到值的总和小于50。

为了更好地理解它,请看下面的示例。特定

  • 模式"(a+ until b)"(一个或多个"a"直到"b"

  • 一系列传入事件 "a1" "c" "a2" "b" "a3"

  • 该库将输出结果:{a1 a2} {a1} {a2} {a3}

由于停止条件,您可以看到{a1 a2 a3}或未{a2 a3}返回。

模式 算子操作描述
其中(条件)定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的where()子句导致其条件为AND:
  1. pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return // some condition }});
或(条件)添加与现有条件进行OR运算的新条件。只有在至少通过其中一个条件时,事件才能匹配该模式:
  1. pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return // some condition }}).or(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return // alternative condition }});
直到(条件)指定循环模式的停止条件。意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。仅适用于 oneOrMore()注意:它允许在基于事件的条件下清除相应模式的状态。
  1. pattern.oneOrMore().until(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return // alternative condition }});
亚型(子类)定义当前模式的子类型条件。如果事件属于此子类型,则事件只能匹配该模式:
  1. pattern.subtype(SubEvent.class);
一个或多个()指定此模式至少发生一次匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅连续注意:建议使用until()within()启用状态清除
  1. pattern.oneOrMore();
timesOrMore(#times)指定此模式至少需要#times出现匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅连续
  1. pattern.timesOrMore(2);
次(#ofTimes)指定此模式需要匹配事件的确切出现次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅连续
  1. pattern.times(2);
次(#fromTimes,#toTimes)指定此模式期望在匹配事件的#fromTimes#toTimes之间出现默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅连续
  1. pattern.times(2, 4);
Optional()指定此模式是可选的,即根本不会发生。这适用于所有上述量词。
  1. pattern.oneOrMore().optional();
贪婪()指定此模式是贪婪的,即它将尽可能多地重复。这仅适用于量词,目前不支持组模式。
  1. pattern.oneOrMore().greedy();
Pattern OperationDescription
where(condition)Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition.Multiple consecutive where() clauses lead to their conditions being ANDed:
  1. pattern.where(event => / some condition /)
or(condition)Adds a new condition which is ORed with an existing one. An event can match the pattern only if itpasses at least one of the conditions:
  1. pattern.where(event => / some condition /) .or(event => / alternative condition /)
until(condition)Specifies a stop condition for looping pattern. Meaning if event matching the given condition occurs, no moreevents will be accepted into the pattern.Applicable only in conjunction with oneOrMore()NOTE: It allows for cleaning state for corresponding pattern on event-based condition.
  1. pattern.oneOrMore().until(event => / some condition /)
subtype(subClass)Defines a subtype condition for the current pattern. An event can only match the pattern if it isof this subtype:
  1. pattern.subtype(classOf[SubEvent])
oneOrMore()Specifies that this pattern expects at least one occurrence of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info oninternal contiguity see consecutive.NOTE: It is advised to use either until() or within() to enable state clearing
  1. pattern.oneOrMore()
timesOrMore(#times)Specifies that this pattern expects at least #times occurrencesof a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info oninternal contiguity see consecutive.
  1. pattern.timesOrMore(2)
times(#ofTimes)Specifies that this pattern expects an exact number of occurrences of a matching event.By default a relaxed internal contiguity (between subsequent events) is used.For more info on internal contiguity see consecutive.
  1. pattern.times(2)
times(#fromTimes, #toTimes)Specifies that this pattern expects occurrences between #fromTimesand #toTimes of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info oninternal contiguity see consecutive.
  1. pattern.times(2, 4)
optional()Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to allaforementioned quantifiers.
  1. pattern.oneOrMore().optional()
greedy()Specifies that this pattern is greedy, i.e. it will repeat as many as possible. This is only applicableto quantifiers and it does not support group pattern currently.
  1. pattern.oneOrMore().greedy()

结合模式

现在你已经看到了单个模式的样子,现在是时候看看如何将它们组合成一个完整的模式序列。

模式序列必须以初始模式开始,如下所示:

  1. Pattern<Event, ?> start = Pattern.<Event>begin("start");
  1. val start : Pattern[Event, _] = Pattern.begin("start")

接下来,您可以通过指定它们之间所需的连续条件为模式序列添加更多模式FlinkCEP支持事件之间的以下形式的邻接:

  • 严格连续性:预期所有匹配事件一个接一个地出现,中间没有任何不匹配的事件。

  • 轻松连续性:忽略匹配的事件之间出现的不匹配事件。

  • 非确定性轻松连续性:进一步放宽连续性,允许忽略某些匹配事件的其他匹配。

要在连续模式之间应用它们,您可以使用:

  • next()严格来说
  • followedBy()放松,和
  • followedByAny(),对于非确定性放松连续性。要么

  • notNext(),如果您不希望事件类型直接跟随另一个事件类型

  • notFollowedBy(),如果您不希望事件类型在两个其他事件类型之间的任何位置。注意模式序列无法结束notFollowedBy()

注意一个NOT图案不能由一个可选的一个之后。

  1. // strict contiguity
  2. Pattern<Event, ?> strict = start.next("middle").where(...);
  3. // relaxed contiguity
  4. Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
  5. // non-deterministic relaxed contiguity
  6. Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
  7. // NOT pattern with strict contiguity
  8. Pattern<Event, ?> strictNot = start.notNext("not").where(...);
  9. // NOT pattern with relaxed contiguity
  10. Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
  1. // strict contiguity
  2. val strict: Pattern[Event, _] = start.next("middle").where(...)
  3. // relaxed contiguity
  4. val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
  5. // non-deterministic relaxed contiguity
  6. val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
  7. // NOT pattern with strict contiguity
  8. val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
  9. // NOT pattern with relaxed contiguity
  10. val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

宽松的连续性意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性,将针对同一开始发出多个匹配。作为示例,"a b"给定事件序列的模式"a", "c", "b1", "b2"将给出以下结果:

  • "a"和之间的严格连续性"b":( {}不匹配),"c""a"原因"a"被丢弃。

  • 之间轻松连续性"a""b"{a b1}作为放松的连续性被视为“跳过不匹配的事件,直到下一个匹配的一个”。

  • 非确定性之间轻松连续性"a""b"{a b1}{a b2},因为这是最普遍的形式。

也可以为模式定义时间约束以使其有效。例如,您可以通过该pattern.within()方法定义模式应在10秒内发生处理和事件时间都支持时间模式

注意模式序列只能有一个时间约束。如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。

  1. next.within(Time.seconds(10));
  1. next.within(Time.seconds(10))

循环模式中的邻接性

您可以在循环模式中应用与上一节中讨论的相同的连续条件连续性将应用于被接受到这种模式中的数据元之间。为了举例说明上述情况,一个模式序列"a b+ c""a"后跟一个或多个"b"任何(非确定性松弛)序列,然后a "c")输入"a", "b1", "d1", "b2", "d2", "b3" "c"将具有以下结果:

  • 严格连续性{a b3 c}-在"d1"之后"b1"的原因"b1"被丢弃,同样的情况发生了"b2",因为"d2"

  • 宽松的连续性{a b1 c}{a b1 b2 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c}- "d"的被忽略。

  • 非确定性轻松连续性{a b1 c}{a b1 b2 c}{a b1 b3 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c}-注意{a b1 b3 c},这是间放松连续性的结果"b"S”。

对于循环模式(例如oneOrMore()times()),默认是放松的连续性如果您想要严格的连续性,则必须使用该consecutive()呼叫明确指定它,如果您想要非确定性的宽松连续性,则可以使用该allowCombinations()呼叫。

模式 算子操作描述
连续的()匹配事件一起使用oneOrMore()times()强制执行严格的连续性,即任何不匹配的数据元都会中断匹配(如next())。如果不应用,则使用放松的连续性(如followedBy())。例如:
  1. Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); }}).followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); }}).oneOrMore().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); }});
将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B}没有连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
allowCombinations()匹配事件一起使用oneOrMore()并且times()在匹配事件之间施加非确定性松弛连续性(如followedByAny())。如果不应用,则使用放松的连续性(如followedBy())。例如:
  1. Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); }}).followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); }}).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); }});
将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.启用组合:{C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}未启用组合:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
Pattern OperationDescription
consecutive()Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matchingevents, i.e. any non-matching element breaks the match (as in next()).If not applied a relaxed contiguity (as in followedBy()) is used.E.g. a pattern like:
  1. Pattern.begin("start").where(.getName().equals("c")) .followedBy("middle").where(.getName().equals("a")) .oneOrMore().consecutive() .followedBy("end1").where(.getName().equals("b"))
Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 Bwith consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}
allowCombinations()Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguitybetween the matching events (as in followedByAny()).If not applied a relaxed contiguity (as in followedBy()) is used.E.g. a pattern like:
  1. Pattern.begin("start").where(.getName().equals("c")) .followedBy("middle").where(.getName().equals("a")) .oneOrMore().allowCombinations() .followedBy("end1").where(.getName().equals("b"))
Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 Bwith combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

模式组

它也可以定义一个模式序列作为条件beginfollowedByfollowedByAnynext图案序列将被认为是逻辑上的匹配条件和一个GroupPattern将被返回,并且可以应用oneOrMore()times(#ofTimes)times(#fromTimes, #toTimes)optional()consecutive()allowCombinations()GroupPattern

  1. Pattern<Event, ?> start = Pattern.begin(
  2. Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
  3. );
  4. // strict contiguity
  5. Pattern<Event, ?> strict = start.next(
  6. Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
  7. ).times(3);
  8. // relaxed contiguity
  9. Pattern<Event, ?> relaxed = start.followedBy(
  10. Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
  11. ).oneOrMore();
  12. // non-deterministic relaxed contiguity
  13. Pattern<Event, ?> nonDetermin = start.followedByAny(
  14. Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
  15. ).optional();
  1. val start: Pattern[Event, _] = Pattern.begin(
  2. Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
  3. )
  4. // strict contiguity
  5. val strict: Pattern[Event, _] = start.next(
  6. Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
  7. ).times(3)
  8. // relaxed contiguity
  9. val relaxed: Pattern[Event, _] = start.followedBy(
  10. Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
  11. ).oneOrMore()
  12. // non-deterministic relaxed contiguity
  13. val nonDetermin: Pattern[Event, _] = start.followedByAny(
  14. Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
  15. ).optional()
模式 算子操作描述
开始(#NAME)定义一个起始模式:
  1. Pattern<Event, ?> start = Pattern.<Event>begin("start");
开始(#pattern_sequence)定义一个起始模式:
  1. Pattern<Event, ?> start = Pattern.<Event>begin( Pattern.<Event>begin("start").where(…).followedBy("middle").where(…));
下一个(#NAME)添加新模式。匹配事件必须直接接替先前的匹配事件(严格连续性):
  1. Pattern<Event, ?> next = start.next("middle");
下一个(#pattern_sequence)添加新模式。一系列匹配事件必须直接接替先前的匹配事件(严格连续性):
  1. Pattern<Event, ?> next = start.next( Pattern.<Event>begin("start").where(…).followedBy("middle").where(…));
followedBy(#NAME)添加新模式。匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件:
  1. Pattern<Event, ?> followedBy = start.followedBy("middle");
followedBy(#pattern_sequence)添加新模式。在一系列匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件:
  1. Pattern<Event, ?> followedBy = start.followedBy( Pattern.<Event>begin("start").where(…).followedBy("middle").where(…));
followedByAny(#NAME)添加新模式。匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny("middle");
followedByAny(#pattern_sequence)添加新模式。在一系列匹配事件和先前匹配事件之间可能发生其他事件,并且将针对匹配事件的每个替代序列(非确定性松弛邻接)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny(    Pattern.<Event>begin("start").where(…).followedBy("middle").where(…));
notNext()添加新的负面模式。匹配(否定)事件必须直接成功执行先前的匹配事件(严格连续性)才能丢弃部分匹配:
Pattern<Event, ?> notNext = start.notNext("not");
notFollowedBy()添加新的负面模式。即使在匹配(否定)事件和先前匹配事件(松弛连续性)之间发生其他事件,也将丢弃部分匹配事件序列:
Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
内(时间)定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃:
pattern.within(Time.seconds(10));
Pattern OperationDescription
begin(#name)Defines a starting pattern:
val start = Pattern.beginEvent
begin(#pattern_sequence)Defines a starting pattern:
val start = Pattern.begin(    Pattern.beginEvent.where(…).followedBy("middle").where(…))
next(#name)Appends a new pattern. A matching event has to directly succeed the previous matching event(strict contiguity):
val next = start.next("middle")
next(#pattern_sequence)Appends a new pattern. A sequence of matching events have to directly succeed the previous matching event(strict contiguity):
val next = start.next(    Pattern.beginEvent.where(…).followedBy("middle").where(…))
followedBy(#name)Appends a new pattern. Other events can occur between a matching event and the previousmatching event (relaxed contiguity) :
val followedBy = start.followedBy("middle")
followedBy(#pattern_sequence)Appends a new pattern. Other events can occur between a sequence of matching events and the previousmatching event (relaxed contiguity) :
val followedBy = start.followedBy(    Pattern.beginEvent.where(…).followedBy("middle").where(…))
followedByAny(#name)Appends a new pattern. Other events can occur between a matching event and the previousmatching event, and alternative matches will be presented for every alternative matching event(non-deterministic relaxed contiguity):
val followedByAny = start.followedByAny("middle")
followedByAny(#pattern_sequence)Appends a new pattern. Other events can occur between a sequence of matching events and the previousmatching event, and alternative matches will be presented for every alternative sequence of matching events(non-deterministic relaxed contiguity):
val followedByAny = start.followedByAny(    Pattern.beginEvent.where(…).followedBy("middle").where(…))
notNext()Appends a new negative pattern. A matching (negative) event has to directly succeed theprevious matching event (strict contiguity) for the partial match to be discarded:
val notNext = start.notNext("not")
notFollowedBy()Appends a new negative pattern. A partial matching event sequence will be discarded evenif other events occur between the matching (negative) event and the previous matching event(relaxed contiguity):
val notFollowedBy = start.notFollowedBy("not")
within(time)Defines the maximum time interval for an event sequence to match the pattern. If a non-completed eventsequence exceeds this time, it is discarded:
pattern.within(Time.seconds(10))

比赛后跳过策略

对于给定模式,可以将同一事件分配给多个成功匹配。要控制将分配事件的匹配数,您需要指定调用的跳过策略AfterMatchSkipStrategy跳过策略有四种类型,如下所示:

  • NO_SKIP:将发出每个可能的匹配。
  • SKIP_PAST_LAST_EVENT:丢弃在匹配开始后但在结束之前开始的每个部分匹配。
  • SKIP_TO_FIRST:丢弃在比赛开始后但在 PatternName的第一个事件发生之前开始的每个部分匹配。
  • SKIP_TO_LAST:放弃在比赛开始之后但在 PatternName的最后一个事件发生之前开始的每个部分匹配。请注意,使用SKIP_TO_FIRSTSKIP_TO_LAST跳过策略时,还应指定有效的PatternName

例如,对于给定模式b+ c和数据流b1 b2 b3 c,这四种跳过策略之间的差异如下:

跳过策略结果 描述
NO_SKIPb1 b2 b3 cb2 b3 cb3 c找到匹配后b1 b2 b3 c,匹配过程不会丢弃任何结果。
SKIP_PAST_LAST_EVENTb1 b2 b3 c找到匹配后b1 b2 b3 c,匹配过程将丢弃所有已开始的部分匹配。
SKIP_TO_FIRST [ b]b1 b2 b3 cb2 b3 cb3 c找到匹配后b1 b2 b3 c,匹配过程将尝试丢弃之前开始的所有部分匹配b1,但是没有这样的匹配。因此,不会丢弃任何东西。
*SKIP_TO_LAST [ b]b1 b2 b3 cb3 c找到匹配后b1 b2 b3 c,匹配过程将尝试丢弃之前开始的所有部分匹配b3有一个这样的比赛b2 b3 c

看看另一个例子,以便更好地看到NO_SKIP和SKIP_TO_FIRST之间的区别:模式:(a | c) (b | c) c+.greedy d和序列:a b c1 c2 c3 d然后结果将是:

跳过策略结果 描述
NO_SKIPa b c1 c2 c3 db c1 c2 c3 dc1 c2 c3 dc2 c3 d找到匹配后a b c1 c2 c3 d,匹配过程不会丢弃任何结果。
SKIP_TO_FIRST [ b*]a b c1 c2 c3 dc1 c2 c3 d找到匹配后a b c1 c2 c3 d,匹配过程将尝试丢弃之前开始的所有部分匹配c1有一个这样的比赛b c1 c2 c3 d

要指定要使用的跳过策略,只需AfterMatchSkipStrategy通过调用创建

函数描述
AfterMatchSkipStrategy.noSkip()创建NO_SKIP跳过策略
AfterMatchSkipStrategy.skipPastLastEvent()创建SKIP_PAST_LAST_EVENT跳过策略
AfterMatchSkipStrategy.skipToFirst(patternName)使用引用的模式名称patternName创建SKIP_TO_FIRST跳过策略
AfterMatchSkipStrategy.skipToLast(patternName)使用引用的模式名称patternName创建SKIP_TO_LAST跳过策略

然后通过调用将跳过策略应用于模式:

AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

检测模式

指定要查找的模式序列后,是时候将其应用于输入流以检测潜在匹配。要针对模式序列运行事件流,您必须创建一个PatternStream给定一个输入流input,一个模式pattern和一个可选的比较器,comparator用于在EventTime的情况下对具有相同时间戳的事件进行排序,或者在同一时刻到达,PatternStream通过调用来创建

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

输入流可以被键入不带键的取决于你的使用情况。

注意在非被Key化的数据流上应用模式将导致并行度等于1的作业。

从模式中选择

获得a后,PatternStream您可以通过selectflatSelect方法从检测到的事件序列中进行选择

select()方法需要PatternSelectFunction实现。A PatternSelectFunction具有select为每个匹配事件序列调用方法。Map<String, List<IN>>以键的形式接收匹配,其中键是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是输入数据元的类型)。给定模式的事件按时间戳排序。返回每个模式的接受事件列表的原因是当使用循环模式(例如oneToMany()times())时,对于给定模式可以接受多个事件。选择函数只返回一个结果。

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, List<IN>> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}

A PatternFlatSelectFunction类似于PatternSelectFunction,唯一的区别是它可以返回任意数量的结果。为此,该select方法有一个附加Collector参数,用于将输出数据元向下游转发。

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);

        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

The select() method takes a selection function as argument, which is called for each matching event sequence.It receives a match in the form of Map[String, Iterable[IN]] where the key is the name of each pattern in your patternsequence and the value is an Iterable over all accepted events for that pattern (IN is the type of your input elements).

The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. oneToMany() and times()), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.

def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
    val startEvent = pattern.get("start").get.next
    val endEvent = pattern.get("end").get.next
    OUT(startEvent, endEvent)
}

The flatSelect method is similar to the select method. Their only difference is that the function passed to theflatSelect method can return an arbitrary number of results per call. In order to do this, the function forflatSelect has an additional Collector parameter which is used to forward your output elements downstream.

def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
    val startEvent = pattern.get("start").get.next
    val endEvent = pattern.get("end").get.next
    for (i <- 0 to startEvent.getValue) {
        collector.collect(OUT(startEvent, endEvent))
    }
}

处理超时部分模式

每当模式通过within关键字附加窗口长度时,部分事件序列可能因为超出窗口长度而被丢弃。为了对这些超时的部分匹配做出反应selectflatSelectAPI调用允许您指定超时处理程序。为每个超时的部分事件序列调用此超时处理程序。超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。

为了处理部分模式,selectflatSelectAPI调用提供了一个重载版本,它作为参数

  • PatternTimeoutFunction/PatternFlatTimeoutFunction
  • 将返回与超时匹配的旁路输出的OutputTag
  • 和已知PatternSelectFunction/ PatternFlatSelectFunction
  • Java
  • Scala
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
    outputTag,
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    outputTag,
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
    pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)

The flatSelect API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.In contrast to the select functions, the flatSelect functions are called with a Collector. You can use the collector to emit an arbitrary number of events.

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)

处理事件时间的延迟

CEP顺序的元件,其中被处理的事项。为了保证在事件时间工作时按正确的顺序处理数据元,传入的数据元最初放在缓冲区中,其中数据元根据其时间戳按升序排序,当水印到达时,此缓冲区中的所有数据元都包含在处理小于水印的时间戳。这意味着水印之间的数据元按事件时间顺序处理。

注意在事件时间工作时,库假定水印的正确性。

为了保证跨越水印的数据元按事件时间顺序处理,Flink的CEP库假定水印的正确性,并将其视为时间戳小于上次看到的水印的后期数据元。后期数据元不会被进一步处理。此外,您可以指定sideOutput标记来收集最后看到的水印之后的后期数据元,您可以像这样使用它。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(
        new PatternSelectFunction<Event, ComplexEvent>() {...}
    );

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag[String]("late-data")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
      }

val lateData: DataStream<String> = result.getSideOutput(lateDataOutputTag)

例子

以下示例检测start, middle(name = "error") -> end(name = "critical")被Keys化数据流上的模式Events事件由他们的ids 键入,有效模式必须在10秒内发生。整个处理是在事件时间完成的。

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
        return createAlert(pattern);
    }
});
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_)))

迁移到1.4+

在Flink-1.4中,CEP库与<= Flink 1.2的向后兼容性被删除。遗憾的是,无法恢复曾经使用1.2.x运行的CEP作业

迁移到1.3.x.

Flink-1.3中的CEP库附带了许多新函数,这些函数导致了API的一些变化。在这里,我们描述了您需要对旧CEP作业进行的更改,以便能够使用Flink-1.3运行它们。完成这些更改并重新编译作业后,您将能够从使用旧版本作业的保存点恢复执行,无需重新处理过去的数据。

所需的更改是:

  • 更改条件(where(…)子句中的条件)以扩展SimpleCondition类而不是实现FilterFunction接口。

  • 将作为参数提供的函数更改为select(…)flatSelect(…)方法,以期望与每个模式关联的事件列表(Listin JavaIterablein Scala)。这是因为通过添加循环模式,多个输入事件可以匹配单个(循环)模式。

  • followedBy()在Flink1.1和1.2隐含的non-deterministic relaxed contiguity(见这里)。在Flink 1.3中,这已经改变并followedBy()隐式relaxed contiguityfollowedByAny()如果non-deterministic relaxed contiguityRequired则应该使用。