FlinkCEP - Complex Event Processing for Flink


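This page describes the API calls available in Flink CEP. We start by presenting the Pattern API, which allows you to specify the patterns that you want to detect in your stream, before presenting how you can detect and act upon matching event sequences. We then present the assumptions the CEP library makes when dealing with lateness in event time, and how you can migrate your job from an older Flink version to Flink-1.3.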



To get started, add the FlinkCEP dependency to the pom.xml of your project.

Java:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
  </dependency>

Scala:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
  </dependency>

Info: FlinkCEP is not part of the binary distribution; see the Flink documentation for how to link with it for cluster execution.

Now you can start writing your first CEP program using the Pattern API.


Java:

  DataStream<Event> input = ...

  Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
          new SimpleCondition<Event>() {
              @Override
              public boolean filter(Event event) {
                  return event.getId() == 42;
              }
          }
      ).next("middle").subtype(SubEvent.class).where(
          new SimpleCondition<SubEvent>() {
              @Override
              public boolean filter(SubEvent subEvent) {
                  return subEvent.getVolume() >= 10.0;
              }
          }
      ).followedBy("end").where(
          new SimpleCondition<Event>() {
              @Override
              public boolean filter(Event event) {
                  return event.getName().equals("end");
              }
          }
      );

  PatternStream<Event> patternStream = CEP.pattern(input, pattern);

  DataStream<Alert> result = patternStream.select(
      new PatternSelectFunction<Event, Alert>() {
          @Override
          public Alert select(Map<String, List<Event>> pattern) throws Exception {
              return createAlertFrom(pattern);
          }
      }
  );
Scala:

  val input: DataStream[Event] = ...

  val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
    .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
    .followedBy("end").where(_.getName == "end")

  val patternStream = CEP.pattern(input, pattern)

  val result: DataStream[Alert] = patternStream.select(createAlert(_))

Pattern API







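A pattern can be either a singleton or a looping pattern. Singleton patterns accept a single event, while looping patterns can accept more than one. In pattern-matching notation, in the pattern "a b+ c? d" (an "a", followed by one or more "b"s, optionally followed by a "c", followed by a "d"), a, c? and d are singleton patterns, while b+ is a looping one. By default, a pattern is a singleton pattern, and you can transform it into a looping one by using quantifiers. Each pattern can have one or more conditions based on which it accepts events.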


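In FlinkCEP, you can specify looping patterns using these methods: pattern.oneOrMore(), for patterns that expect one or more occurrences of a given event (e.g. the b+ mentioned before); pattern.times(#ofTimes), for patterns that expect a specific number of occurrences of a given type of event, e.g. 4 a's; and pattern.times(#fromTimes, #toTimes), for patterns that expect a specific minimum and maximum number of occurrences of a given type of event, e.g. 2-4 a's. A looping pattern can additionally be made greedy with greedy() (repeat as many times as possible), and any pattern can be made optional with optional(), as the following examples show.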



Java:

  // expecting 4 occurrences
  start.times(4);

  // expecting 0 or 4 occurrences
  start.times(4).optional();

  // expecting 2, 3 or 4 occurrences
  start.times(2, 4);

  // expecting 2, 3 or 4 occurrences and repeating as many as possible
  start.times(2, 4).greedy();

  // expecting 0, 2, 3 or 4 occurrences
  start.times(2, 4).optional();

  // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
  start.times(2, 4).optional().greedy();

  // expecting 1 or more occurrences
  start.oneOrMore();

  // expecting 1 or more occurrences and repeating as many as possible
  start.oneOrMore().greedy();

  // expecting 0 or more occurrences
  start.oneOrMore().optional();

  // expecting 0 or more occurrences and repeating as many as possible
  start.oneOrMore().optional().greedy();

  // expecting 2 or more occurrences
  start.timesOrMore(2);

  // expecting 2 or more occurrences and repeating as many as possible
  start.timesOrMore(2).greedy();

  // expecting 0, 2 or more occurrences
  start.timesOrMore(2).optional();

  // expecting 0, 2 or more occurrences and repeating as many as possible
  start.timesOrMore(2).optional().greedy();
Scala:

  // expecting 4 occurrences
  start.times(4)

  // expecting 0 or 4 occurrences
  start.times(4).optional()

  // expecting 2, 3 or 4 occurrences
  start.times(2, 4)

  // expecting 2, 3 or 4 occurrences and repeating as many as possible
  start.times(2, 4).greedy()

  // expecting 0, 2, 3 or 4 occurrences
  start.times(2, 4).optional()

  // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
  start.times(2, 4).optional().greedy()

  // expecting 1 or more occurrences
  start.oneOrMore()

  // expecting 1 or more occurrences and repeating as many as possible
  start.oneOrMore().greedy()

  // expecting 0 or more occurrences
  start.oneOrMore().optional()

  // expecting 0 or more occurrences and repeating as many as possible
  start.oneOrMore().optional().greedy()

  // expecting 2 or more occurrences
  start.timesOrMore(2)

  // expecting 2 or more occurrences and repeating as many as possible
  start.timesOrMore(2).greedy()

  // expecting 0, 2 or more occurrences
  start.timesOrMore(2).optional()

  // expecting 0, 2 or more occurrences and repeating as many as possible
  start.timesOrMore(2).optional().greedy()
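To make the accepted occurrence counts concrete, the following plain-Java sketch (it runs without Flink) models a quantifier as a minimum count, an optional maximum and an optional flag. The `Quantifier` class and its `accepts`/`acceptedCounts` methods are hypothetical illustrations of the comments above, not part of the FlinkCEP API; greedy() is deliberately not modeled, since it affects which match is preferred rather than which counts are allowed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of FlinkCEP quantifiers; this is NOT the Flink API.
final class Quantifier {
    private final int min;          // minimum number of occurrences
    private final Integer max;      // maximum number of occurrences, null = unbounded
    private final boolean optional; // optional() additionally allows zero occurrences

    private Quantifier(int min, Integer max, boolean optional) {
        this.min = min;
        this.max = max;
        this.optional = optional;
    }

    static Quantifier times(int n) { return new Quantifier(n, n, false); }

    static Quantifier times(int from, int to) { return new Quantifier(from, to, false); }

    static Quantifier oneOrMore() { return new Quantifier(1, null, false); }

    static Quantifier timesOrMore(int n) { return new Quantifier(n, null, false); }

    Quantifier optional() { return new Quantifier(min, max, true); }

    // True if the quantified pattern accepts exactly n matching events.
    boolean accepts(int n) {
        if (n == 0) {
            return optional;
        }
        return n >= min && (max == null || n <= max);
    }

    // Accepted counts from 0 up to cap (the cap is needed for unbounded quantifiers).
    List<Integer> acceptedCounts(int cap) {
        List<Integer> counts = new ArrayList<>();
        for (int n = 0; n <= cap; n++) {
            if (accepts(n)) {
                counts.add(n);
            }
        }
        return counts;
    }
}

class QuantifierDemo {
    public static void main(String[] args) {
        System.out.println(Quantifier.times(4).optional().acceptedCounts(6));      // [0, 4]
        System.out.println(Quantifier.times(2, 4).acceptedCounts(6));              // [2, 3, 4]
        System.out.println(Quantifier.timesOrMore(2).optional().acceptedCounts(5)); // [0, 2, 3, 4, 5]
    }
}
```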





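Conditions can be iterative: an IterativeCondition can inspect the events previously accepted for the same partial match through its Context. The following condition accepts the next event for the pattern named "middle" if its name starts with "foo" and if the sum of the prices of the previously accepted events for that pattern plus the price of the current event stays below 5.0.

Java:

  middle.oneOrMore()
      .subtype(SubEvent.class)
      .where(new IterativeCondition<SubEvent>() {
          @Override
          public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
              if (!value.getName().startsWith("foo")) {
                  return false;
              }

              double sum = value.getPrice();
              for (Event event : ctx.getEventsForPattern("middle")) {
                  sum += event.getPrice();
              }
              return Double.compare(sum, 5.0) < 0;
          }
      });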
Scala:

  middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
      (value, ctx) => {
        lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
      }
    )
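The effect of this iterative condition can be traced outside Flink. In the plain-Java sketch below, an illustration under simplifying assumptions, the `acceptedSoFar` list stands in for ctx.getEventsForPattern("middle") and `PricedEvent` is a hypothetical stand-in for the SubEvent type. The sketch follows a single partial match and ignores everything else CEP does (multiple partial matches, windows).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SubEvent: only the fields the condition reads.
final class PricedEvent {
    final String name;
    final double price;

    PricedEvent(String name, double price) {
        this.name = name;
        this.price = price;
    }
}

final class IterativeConditionSketch {
    // Same logic as filter() above; acceptedSoFar plays the role of ctx.getEventsForPattern("middle").
    static boolean filter(PricedEvent value, List<PricedEvent> acceptedSoFar) {
        if (!value.name.startsWith("foo")) {
            return false;
        }
        double sum = value.price;
        for (PricedEvent e : acceptedSoFar) {
            sum += e.price;
        }
        return Double.compare(sum, 5.0) < 0;
    }

    // Feeds events into one partial match of the looping "middle" pattern;
    // rejected events are simply skipped (relaxed contiguity).
    static List<String> run(List<PricedEvent> input) {
        List<PricedEvent> accepted = new ArrayList<>();
        for (PricedEvent e : input) {
            if (filter(e, accepted)) {
                accepted.add(e);
            }
        }
        List<String> names = new ArrayList<>();
        for (PricedEvent e : accepted) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<PricedEvent> input = List.of(
            new PricedEvent("foo1", 2.0),
            new PricedEvent("bar", 1.0),   // rejected: name does not start with "foo"
            new PricedEvent("foo2", 2.5),  // accepted: 2.0 + 2.5 = 4.5 < 5.0
            new PricedEvent("foo3", 1.0)); // rejected: 4.5 + 1.0 = 5.5 is not < 5.0
        System.out.println(run(input));    // [foo1, foo2]
    }
}
```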

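Attention: The call to ctx.getEventsForPattern(...) finds all the previously accepted events for a given potential match. The cost of this operation can vary, so when implementing your condition, try to minimize its use.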


A SimpleCondition decides whether to accept an event based only on the properties of the event itself:

Java:

  start.where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) {
          return value.getName().startsWith("foo");
      }
  });

Scala:

  start.where(event => event.getName.startsWith("foo"))


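You can also restrict the type of the accepted event to a subtype of the initial event type (here Event) via subtype(subClass), and combine it with further conditions:

Java:

  start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
      @Override
      public boolean filter(SubEvent value) {
          return ... // some condition
      }
  });

Scala:

  start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)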


Conditions can be combined: consecutive where() calls are combined with a logical AND, while or() adds a condition that is combined with the existing one by a logical OR:

Java:

  pattern.where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) {
          return ... // some condition
      }
  }).or(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) {
          return ... // or condition
      }
  });

Scala:

  pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)



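For looping patterns you can also specify a stop condition with until(). For example, given:

  • the pattern "(a+ until b)" (one or more "a"s until a "b")

  • a sequence of incoming events "a1" "c" "a2" "b" "a3"

  • the library will output: {a1 a2} {a1} {a2} {a3}

As you can see, {a1 a2 a3} and {a2 a3} are not returned because of the stop condition.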
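The stop-condition example can be reproduced with a toy model, assuming the default relaxed internal contiguity of oneOrMore(): every "a" may start a partial match, each partial match grows with later "a" events while skipping other events, every prefix is emitted as a match, and a "b" closes all open partial matches. `UntilSketch` is a hypothetical illustration in plain Java, not FlinkCEP code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not FlinkCEP code) of the looping pattern "a+ until b".
final class UntilSketch {
    static List<List<String>> matches(List<String> events) {
        List<List<String>> result = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            if (!events.get(start).startsWith("a")) {
                continue; // only an "a" can start the looping pattern
            }
            List<String> current = new ArrayList<>();
            for (int i = start; i < events.size(); i++) {
                String e = events.get(i);
                if (e.startsWith("b")) {
                    break; // stop condition reached: no more events are accepted
                }
                if (e.startsWith("a")) {
                    current.add(e);
                    result.add(new ArrayList<>(current)); // every prefix is a valid match
                }
                // any other event is skipped (relaxed internal contiguity)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(matches(List.of("a1", "c", "a2", "b", "a3")));
        // [[a1], [a1, a2], [a2], [a3]]
    }
}
```

It prints the same four matches as listed above, only in a different order.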

Pattern operations (Java examples):

where(condition)
  pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return ... /* some condition */; } });

or(condition)
  pattern.where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return ... /* some condition */; } })
      .or(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return ... /* alternative condition */; } });

until(condition)
  pattern.oneOrMore().until(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return ... /* stop condition */; } });

subtype(subClass)
  pattern.subtype(SubEvent.class);

oneOrMore()
  pattern.oneOrMore();

timesOrMore(#times)
  pattern.timesOrMore(2);

times(#ofTimes)
  pattern.times(2);

times(#fromTimes, #toTimes)
  pattern.times(2, 4);

optional()
  pattern.oneOrMore().optional();

greedy()
  pattern.oneOrMore().greedy();
Pattern operations and descriptions (Scala):

where(condition)
  Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition. Multiple consecutive where() clauses lead to their conditions being ANDed:
  pattern.where(event => ... /* some condition */)

or(condition)
  Adds a new condition which is ORed with an existing one. An event can match the pattern only if it passes at least one of the conditions:
  pattern.where(event => ... /* some condition */).or(event => ... /* alternative condition */)

until(condition)
  Specifies a stop condition for a looping pattern: once an event matching the given condition occurs, no more events will be accepted into the pattern. Applicable only in conjunction with oneOrMore(). NOTE: It allows for cleaning the state of the corresponding pattern on an event-based condition.
  pattern.oneOrMore().until(event => ... /* some condition */)

subtype(subClass)
  Defines a subtype condition for the current pattern. An event can only match the pattern if it is of this subtype:
  pattern.subtype(classOf[SubEvent])

oneOrMore()
  Specifies that this pattern expects at least one occurrence of a matching event. By default, relaxed internal contiguity (between subsequent events) is used; for more info on internal contiguity see consecutive(). NOTE: It is advised to use either until() or within() to enable state clearing.
  pattern.oneOrMore()

timesOrMore(#times)
  Specifies that this pattern expects at least #times occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used; for more info on internal contiguity see consecutive().
  pattern.timesOrMore(2)

times(#ofTimes)
  Specifies that this pattern expects an exact number of occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used; for more info on internal contiguity see consecutive().
  pattern.times(2)

times(#fromTimes, #toTimes)
  Specifies that this pattern expects between #fromTimes and #toTimes occurrences of a matching event. By default, relaxed internal contiguity (between subsequent events) is used; for more info on internal contiguity see consecutive().
  pattern.times(2, 4)

optional()
  Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all aforementioned quantifiers.
  pattern.oneOrMore().optional()

greedy()
  Specifies that this pattern is greedy, i.e. it will repeat as many times as possible. This is only applicable to quantifiers, and it does not currently support group patterns.
  pattern.oneOrMore().greedy()




A pattern sequence has to start with an initial pattern:

Java:

  Pattern<Event, ?> start = Pattern.<Event>begin("start");

Scala:

  val start: Pattern[Event, _] = Pattern.begin("start")


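Next, you can append more patterns to the pattern sequence by specifying the desired contiguity conditions between them. FlinkCEP supports the following forms of contiguity between events:

  • Strict contiguity: expects all matching events to appear strictly one after the other, without any non-matching events in between.

  • Relaxed contiguity: ignores non-matching events appearing in between the matching ones.

  • Non-deterministic relaxed contiguity: further relaxes contiguity, allowing additional matches that ignore some matching events.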


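To apply them between consecutive patterns, you can use:

  • next(), for strict contiguity,
  • followedBy(), for relaxed contiguity, and
  • followedByAny(), for non-deterministic relaxed contiguity;

or

  • notNext(), if you do not want an event type to directly follow another,

  • notFollowedBy(), if you do not want an event type to be anywhere between two other event types. Note: a pattern sequence cannot end with notFollowedBy().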


Java:

  // strict contiguity
  Pattern<Event, ?> strict = start.next("middle").where(...);

  // relaxed contiguity
  Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

  // non-deterministic relaxed contiguity
  Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

  // NOT pattern with strict contiguity
  Pattern<Event, ?> strictNot = start.notNext("not").where(...);

  // NOT pattern with relaxed contiguity
  Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

Scala:

  // strict contiguity
  val strict: Pattern[Event, _] = start.next("middle").where(...)

  // relaxed contiguity
  val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

  // non-deterministic relaxed contiguity
  val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

  // NOT pattern with strict contiguity
  val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

  // NOT pattern with relaxed contiguity
  val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
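The difference between the two negative patterns can be illustrated with a toy model of the sequence "a", then NOT "c", then "b". In this plain-Java sketch (not FlinkCEP code), `strictNot = true` stands for notNext() and `false` for notFollowedBy(); it assumes the positive "b" follows with relaxed contiguity and only considers the first "a" in the input.

```java
import java.util.List;

// Toy model (not FlinkCEP code) of the sequence: "a", then NOT "c", then "b".
final class NegativePatternSketch {
    // strictNot = true models notNext(); false models notFollowedBy().
    static boolean matches(List<String> events, boolean strictNot) {
        int ia = events.indexOf("a");
        if (ia < 0) {
            return false;
        }
        for (int i = ia + 1; i < events.size(); i++) {
            String e = events.get(i);
            boolean directlyAfterA = i == ia + 1;
            if (e.equals("c") && (!strictNot || directlyAfterA)) {
                return false; // negative event seen: the partial match is discarded
            }
            if (e.equals("b")) {
                return true; // the positive event completes the match
            }
            // any other event is skipped (relaxed contiguity towards "b")
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "d", "c", "b");
        System.out.println("notNext:       " + matches(events, true));  // true
        System.out.println("notFollowedBy: " + matches(events, false)); // false
    }
}
```

With "a d c b", the "c" does not directly follow "a", so only the notFollowedBy() variant discards the partial match.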

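Relaxed contiguity means that only the first succeeding matching event will be matched, while with non-deterministic relaxed contiguity, multiple matches will be emitted for the same beginning. As an example, the pattern "a b", given the event sequence "a", "c", "b1", "b2", will give the following results:

  • Strict contiguity between "a" and "b": {} (no match); the "c" after "a" causes "a" to be discarded.

  • Relaxed contiguity between "a" and "b": {a b1}, as relaxed contiguity is viewed as "skip non-matching events until the next matching one".

  • Non-deterministic relaxed contiguity between "a" and "b": {a b1}, {a b2}, as this is the most general form.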
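The three results for "a b" can be reproduced with a small plain-Java model of the two-step pattern. `Contiguity` and `ContiguitySketch` are hypothetical names for this illustration; the model only mimics next(), followedBy() and followedByAny() for this example and is not how FlinkCEP is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not FlinkCEP code) of the three contiguity modes for the pattern "a b".
enum Contiguity { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

final class ContiguitySketch {
    static List<String> matches(List<String> events, Contiguity mode) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue; // every "a" starts a new partial match
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (isB) {
                    result.add(events.get(i) + " " + events.get(j));
                }
                if (mode == Contiguity.STRICT) {
                    break; // next(): only the directly following event is considered
                }
                if (isB && mode == Contiguity.RELAXED) {
                    break; // followedBy(): stop at the first matching event
                }
                // followedByAny(): keep looking for further matching events
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "c", "b1", "b2");
        for (Contiguity mode : Contiguity.values()) {
            System.out.println(mode + ": " + matches(events, mode));
        }
        // STRICT: []
        // RELAXED: [a b1]
        // NON_DETERMINISTIC_RELAXED: [a b1, a b2]
    }
}
```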



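You can also define a temporal constraint for the pattern to be valid. For example, to require that a pattern occurs within 10 seconds, use the within() method:

Java:

  next.within(Time.seconds(10));

Scala:

  next.within(Time.seconds(10))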


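You can apply the same contiguity conditions discussed in the previous section within a looping pattern; the contiguity is applied between the elements accepted into such a pattern. To illustrate, the pattern sequence "a b+ c" ("a" followed by any (non-deterministic relaxed) sequence of one or more "b"s, followed by a "c") with the input "a", "b1", "d1", "b2", "d2", "b3", "c" will have the following results:

  • Strict contiguity: {a b3 c}; the "d1" after "b1" causes "b1" to be discarded, and the same happens to "b2" because of "d2".

  • Relaxed contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}; the "d"s are ignored.

  • Non-deterministic relaxed contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}; notice {a b1 b3 c}, which is the result of relaxing the contiguity between "b"s.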
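A toy model of this looping example, in plain Java, reproduces the three result sets. It assumes, as the listed results imply, that the first "b" may be any "b" after "a", that the chosen contiguity applies between accepted "b"s, and that in the strict case the "c" must also directly follow the last "b". `LoopContiguity` and `LoopingContiguitySketch` are hypothetical names, not FlinkCEP classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not FlinkCEP code) of "a b+ c", where any "b" after "a" may start b+.
enum LoopContiguity { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

final class LoopingContiguitySketch {
    static List<String> matches(List<String> events, LoopContiguity mode) {
        List<String> result = new ArrayList<>();
        int ia = events.indexOf("a");
        if (ia < 0) {
            return result;
        }
        List<Integer> bs = new ArrayList<>(); // positions of all "b" events after "a"
        for (int i = ia + 1; i < events.size(); i++) {
            if (events.get(i).startsWith("b")) {
                bs.add(i);
            }
        }
        // Try every non-empty, order-preserving selection of "b" events (bit k = k-th "b").
        for (int mask = 1; mask < (1 << bs.size()); mask++) {
            List<Integer> ranks = new ArrayList<>();
            for (int k = 0; k < bs.size(); k++) {
                if ((mask & (1 << k)) != 0) {
                    ranks.add(k);
                }
            }
            if (!contiguous(ranks, bs, mode)) {
                continue;
            }
            int ic = findC(events, bs.get(ranks.get(ranks.size() - 1)), mode);
            if (ic < 0) {
                continue;
            }
            StringBuilder sb = new StringBuilder(events.get(ia));
            for (int k : ranks) {
                sb.append(' ').append(events.get(bs.get(k)));
            }
            result.add(sb.append(' ').append(events.get(ic)).toString());
        }
        return result;
    }

    // Contiguity between consecutive accepted "b" events.
    private static boolean contiguous(List<Integer> ranks, List<Integer> bs, LoopContiguity mode) {
        for (int k = 1; k < ranks.size(); k++) {
            int prev = ranks.get(k - 1);
            int cur = ranks.get(k);
            if (mode == LoopContiguity.STRICT && bs.get(cur) != bs.get(prev) + 1) {
                return false; // some event sits between two accepted b's
            }
            if (mode == LoopContiguity.RELAXED && cur != prev + 1) {
                return false; // a matching "b" was skipped
            }
        }
        return true; // non-deterministic relaxed mode never rejects a selection here
    }

    // The closing "c": directly after the last "b" for STRICT, otherwise the first later "c".
    private static int findC(List<String> events, int last, LoopContiguity mode) {
        for (int i = last + 1; i < events.size(); i++) {
            if (events.get(i).startsWith("c")) {
                return i;
            }
            if (mode == LoopContiguity.STRICT) {
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "b1", "d1", "b2", "d2", "b3", "c");
        for (LoopContiguity mode : LoopContiguity.values()) {
            System.out.println(mode + ": " + matches(events, mode));
        }
    }
}
```

With the input above it yields 1, 6 and 7 matches respectively, and only the non-deterministic mode contains {a b1 b3 c}.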


For looping patterns (e.g. oneOrMore() and times()), relaxed contiguity is the default. Use consecutive() to require strict contiguity between the accepted events, or allowCombinations() for non-deterministic relaxed contiguity.

Java:

consecutive()
  Pattern.<Event>begin("start")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } })
      .followedBy("middle")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } })
      .oneOrMore().consecutive()
      .followedBy("end1")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } });

allowCombinations()
  Pattern.<Event>begin("start")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } })
      .followedBy("middle")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } })
      .oneOrMore().allowCombinations()
      .followedBy("end1")
      .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } });
Pattern operations and descriptions (Scala):

consecutive()
  Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as in next()). If not applied, relaxed contiguity (as in followedBy()) is used. E.g. a pattern like:
  Pattern.begin[Event]("start").where(_.getName().equals("c"))
    .followedBy("middle").where(_.getName().equals("a"))
    .oneOrMore().consecutive()
    .followedBy("end1").where(_.getName().equals("b"))
  will generate the following matches for the input sequence C D A1 A2 A3 D A4 B:
  with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
  without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

allowCombinations()
  Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity between the matching events (as in followedByAny()). If not applied, relaxed contiguity (as in followedBy()) is used. E.g. a pattern like:
  Pattern.begin[Event]("start").where(_.getName().equals("c"))
    .followedBy("middle").where(_.getName().equals("a"))
    .oneOrMore().allowCombinations()
    .followedBy("end1").where(_.getName().equals("b"))
  will generate the following matches for the input sequence C D A1 A2 A3 D A4 B:
  with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}
  without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}


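It is also possible to define a pattern sequence as the condition for begin, followedBy, followedByAny and next. The pattern sequence is then logically treated as the matching condition, a GroupPattern is returned, and oneOrMore(), times(#ofTimes), times(#fromTimes, #toTimes), optional(), consecutive() and allowCombinations() can be applied to the GroupPattern.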

Java:

  Pattern<Event, ?> start = Pattern.begin(
      Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
  );

  // strict contiguity
  Pattern<Event, ?> strict = start.next(
      Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
  ).times(3);

  // relaxed contiguity
  Pattern<Event, ?> relaxed = start.followedBy(
      Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
  ).oneOrMore();

  // non-deterministic relaxed contiguity
  Pattern<Event, ?> nonDetermin = start.followedByAny(
      Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
  ).optional();

Scala:

  val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
  )

  // strict contiguity
  val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
  ).times(3)

  // relaxed contiguity
  val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
  ).oneOrMore()

  // non-deterministic relaxed contiguity
  val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
  ).optional()
Pattern operations for building pattern sequences (Java examples):

begin(#name)
  Pattern<Event, ?> start = Pattern.<Event>begin("start");

begin(#pattern_sequence)
  Pattern<Event, ?> start = Pattern.<Event>begin(
      Pattern.<Event>begin("start").where(...).followedBy("middle").where(...));

next(#name)
  Pattern<Event, ?> next = start.next("middle");

next(#pattern_sequence)
  Pattern<Event, ?> next = start.next(
      Pattern.<Event>begin("start").where(...).followedBy("middle").where(...));

followedBy(#name)
  Pattern<Event, ?> followedBy = start.followedBy("middle");

followedBy(#pattern_sequence)
  Pattern<Event, ?> followedBy = start.followedBy(
      Pattern.<Event>begin("start").where(...).followedBy("middle").where(...));

followedByAny(#name)
  Pattern<Event, ?> followedByAny = start.followedByAny("middle");

followedByAny(#pattern_sequence)
  Pattern<Event, ?> followedByAny = start.followedByAny(
      Pattern.<Event>begin("start").where(...).followedBy("middle").where(...));

notNext()
  Pattern<Event, ?> notNext = start.notNext("not");

notFollowedBy()
  Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
Pattern operations and descriptions for building pattern sequences (Scala):

begin(#name)
  Defines a starting pattern:
  val start = Pattern.begin[Event]("start")

begin(#pattern_sequence)
  Defines a starting pattern:
  val start = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...))

next(#name)
  Appends a new pattern. A matching event has to directly succeed the previous matching event (strict contiguity):
  val next = start.next("middle")

next(#pattern_sequence)
  Appends a new pattern. A sequence of matching events has to directly succeed the previous matching event (strict contiguity):
  val next = start.next(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...))

followedBy(#name)
  Appends a new pattern. Other events can occur between a matching event and the previous matching event (relaxed contiguity):
  val followedBy = start.followedBy("middle")

followedBy(#pattern_sequence)
  Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event (relaxed contiguity):
  val followedBy = start.followedBy(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...))

followedByAny(#name)
  Appends a new pattern. Other events can occur between a matching event and the previous matching event, and alternative matches will be presented for every alternative matching event (non-deterministic relaxed contiguity):
  val followedByAny = start.followedByAny("middle")

followedByAny(#pattern_sequence)
  Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event, and alternative matches will be presented for every alternative sequence of matching events (non-deterministic relaxed contiguity):
  val followedByAny = start.followedByAny(
    Pattern.begin[Event]("start").where(...).followedBy("middle").where(...))

notNext()
  Appends a new negative pattern. A matching (negative) event has to directly succeed the previous matching event (strict contiguity) for the partial match to be discarded:
  val notNext = start.notNext("not")

notFollowedBy()
  Appends a new negative pattern. A partial matching event sequence will be discarded even if other events occur between the matching (negative) event and the previous matching event (relaxed contiguity):
  val notFollowedBy = start.notFollowedBy("not")

within(time)
  Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:
  pattern.within(Time.seconds(10))



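For a given pattern, the same event may be assigned to multiple successful matches. To control how many matches an event can be assigned to, specify an AfterMatchSkipStrategy. There are four skip strategies:

  • NO_SKIP: every possible match will be emitted.
  • SKIP_PAST_LAST_EVENT: discards every partial match that started after the match started but before it ended.
  • SKIP_TO_FIRST: discards every partial match that started after the match started but before the first event of PatternName occurred.
  • SKIP_TO_LAST: discards every partial match that started after the match started but before the last event of PatternName occurred. Note that when using the SKIP_TO_FIRST or SKIP_TO_LAST skip strategy, a valid PatternName must also be specified.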

For example, for the pattern b+ c and the data stream b1 b2 b3 c, the four skip strategies differ as follows:

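NO_SKIP
  Result: b1 b2 b3 c; b2 b3 c; b3 c
  After finding the match b1 b2 b3 c, the matching process does not discard any result.

SKIP_PAST_LAST_EVENT
  Result: b1 b2 b3 c
  After finding the match b1 b2 b3 c, the matching process discards all partial matches that have started.

SKIP_TO_FIRST[b]
  Result: b1 b2 b3 c; b2 b3 c; b3 c
  After finding the match b1 b2 b3 c, the matching process tries to discard all partial matches that started before b1, but there are no such matches, so nothing is discarded.

SKIP_TO_LAST[b]
  Result: b1 b2 b3 c; b3 c
  After finding the match b1 b2 b3 c, the matching process tries to discard all partial matches that started before b3. There is one such match: b2 b3 c.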
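The b+ c table can be reproduced with a toy model in plain Java. It assumes, as the table implies, that each candidate match starts at some "b", takes all directly following "b"s and ends at "c", and that candidates are considered in order of their first event; after each emitted match, candidates that started before the strategy's threshold position are dropped. The pattern name for SKIP_TO_FIRST and SKIP_TO_LAST is fixed to b, and `Skip` and `SkipStrategySketch` are hypothetical names, not the AfterMatchSkipStrategy API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not FlinkCEP code) of after-match skip strategies for "b+ c".
enum Skip { NO_SKIP, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST_B, SKIP_TO_LAST_B }

final class SkipStrategySketch {
    // Candidate matches: every "b" starts one that takes all directly following b's, then "c".
    static List<List<Integer>> candidates(List<String> events) {
        List<List<Integer>> result = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            List<Integer> match = new ArrayList<>();
            int i = start;
            while (i < events.size() && events.get(i).startsWith("b")) {
                match.add(i++);
            }
            if (!match.isEmpty() && i < events.size() && events.get(i).startsWith("c")) {
                match.add(i);
                result.add(match);
            }
        }
        return result;
    }

    static List<String> emit(List<String> events, Skip strategy) {
        List<String> emitted = new ArrayList<>();
        int discardBefore = Integer.MIN_VALUE; // candidates starting before this position are dropped
        for (List<Integer> match : candidates(events)) {
            if (match.get(0) < discardBefore) {
                continue;
            }
            List<String> names = new ArrayList<>();
            for (int p : match) {
                names.add(events.get(p));
            }
            emitted.add(String.join(" ", names));
            discardBefore = Math.max(discardBefore, threshold(match, strategy));
        }
        return emitted;
    }

    // Position before which other partial matches are discarded after emitting `match`.
    private static int threshold(List<Integer> match, Skip strategy) {
        switch (strategy) {
            case SKIP_PAST_LAST_EVENT: return match.get(match.size() - 1) + 1;
            case SKIP_TO_FIRST_B:      return match.get(0);
            case SKIP_TO_LAST_B:       return match.get(match.size() - 2); // last "b" before "c"
            default:                   return Integer.MIN_VALUE;           // NO_SKIP
        }
    }

    public static void main(String[] args) {
        List<String> events = List.of("b1", "b2", "b3", "c");
        for (Skip s : Skip.values()) {
            System.out.println(s + ": " + emit(events, s));
        }
    }
}
```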

Another example shows the difference between NO_SKIP and SKIP_TO_FIRST more clearly. For the pattern (a | c) (b | c) c+.greedy d and the sequence a b c1 c2 c3 d, the results are:

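NO_SKIP
  Result: a b c1 c2 c3 d; b c1 c2 c3 d; c1 c2 c3 d; c2 c3 d
  After finding the match a b c1 c2 c3 d, the matching process does not discard any result.

SKIP_TO_FIRST[b*]
  Result: a b c1 c2 c3 d; c1 c2 c3 d
  After finding the match a b c1 c2 c3 d, the matching process tries to discard all partial matches that started before c1. There is one such match: b c1 c2 c3 d.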




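The skip strategy is applied by passing it to Pattern.begin() together with the pattern name:

Java:

  AfterMatchSkipStrategy skipStrategy = ...
  Pattern.begin("patternName", skipStrategy);

Scala:

  val skipStrategy = ...
  Pattern.begin("patternName", skipStrategy)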



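To run a stream of events against your pattern sequence, create a PatternStream from an input stream, a pattern and, optionally, a comparator used to sort events that have the same timestamp (in event time) or that arrive at the same moment:

Java:

  DataStream<Event> input = ...
  Pattern<Event, ?> pattern = ...
  EventComparator<Event> comparator = ... // optional

  PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

Scala:

  val input: DataStream[Event] = ...
  val pattern: Pattern[Event, _] = ...
  val comparator: EventComparator[Event] = ... // optional

  val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)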
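The optional comparator only matters for events that share a timestamp. The plain-Java sketch below illustrates that role with a java.util.Comparator: events are ordered by timestamp first, and the comparator only breaks ties. In Flink you would pass an EventComparator<Event> instead; `TimedEvent`, its hypothetical `priority` tie-break field, and `TieBreakSketch` exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event with a timestamp and a tie-breaking field.
final class TimedEvent {
    final long timestamp;
    final int priority; // hypothetical: a lower value should be processed first
    final String name;

    TimedEvent(long timestamp, int priority, String name) {
        this.timestamp = timestamp;
        this.priority = priority;
        this.name = name;
    }
}

final class TieBreakSketch {
    // The role of the optional comparator: order events that share a timestamp.
    static final Comparator<TimedEvent> TIE_BREAKER = Comparator.comparingInt(e -> e.priority);

    // Orders events by timestamp first and uses the comparator only for ties.
    static List<String> processingOrder(List<TimedEvent> events) {
        List<TimedEvent> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.<TimedEvent>comparingLong(e -> e.timestamp).thenComparing(TIE_BREAKER));
        List<String> names = new ArrayList<>();
        for (TimedEvent e : sorted) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<TimedEvent> events = List.of(
            new TimedEvent(2L, 0, "later"),
            new TimedEvent(1L, 5, "tie-second"),
            new TimedEvent(1L, 1, "tie-first"));
        System.out.println(processingOrder(events)); // [tie-first, tie-second, later]
    }
}
```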





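Once you have obtained a PatternStream, you can select from detected event sequences via the select or flatSelect methods. The select() method requires a PatternSelectFunction implementation. A PatternSelectFunction has a select method which is called for each matching event sequence. It receives a match in the form of Map<String, List<IN>>, where the key is the name of each pattern in your pattern sequence and the value is a list of all accepted events for that pattern (IN is the type of your input elements). The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each pattern is that when using looping patterns (e.g. oneOrMore() and times()), more than one event may be accepted for a given pattern. The selection function returns exactly one result.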

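  import java.util.List;
  import java.util.Map;
  import org.apache.flink.cep.PatternSelectFunction;

  class MyPatternSelectFunction implements PatternSelectFunction<Event, Alert> {
      @Override
      public Alert select(Map<String, List<Event>> pattern) {
          Event startEvent = pattern.get("start").get(0);
          Event endEvent = pattern.get("end").get(0);
          return new Alert(startEvent, endEvent); // assumes an Alert(Event, Event) constructor
      }
  }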

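A PatternFlatSelectFunction is similar to a PatternSelectFunction, with the only difference that it can return an arbitrary number of results. To do this, its flatSelect method has an additional Collector parameter which is used to forward your output elements downstream.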

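  import java.util.List;
  import java.util.Map;
  import org.apache.flink.cep.PatternFlatSelectFunction;
  import org.apache.flink.util.Collector;

  class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<Event, Alert> {
      @Override
      public void flatSelect(Map<String, List<Event>> pattern, Collector<Alert> collector) {
          Event startEvent = pattern.get("start").get(0);
          Event endEvent = pattern.get("end").get(0);

          // emits startEvent.getValue() alerts (getValue() and the Alert constructor are assumed)
          for (int i = 0; i < startEvent.getValue(); i++) {
              collector.collect(new Alert(startEvent, endEvent));
          }
      }
  }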
The select() method takes a selection function as argument, which is called for each matching event sequence. It receives a match in the form of Map[String, Iterable[IN]], where the key is the name of each pattern in your pattern sequence and the value is an Iterable over all accepted events for that pattern (IN is the type of your input elements).

The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. oneOrMore() and times()), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.

  // assumes a case class Alert(start: Event, end: Event)
  def selectFn(pattern: Map[String, Iterable[Event]]): Alert = {
    val startEvent = pattern("start").head
    val endEvent = pattern("end").head
    Alert(startEvent, endEvent)
  }

The flatSelect method is similar to the select method. The only difference is that the function passed to the flatSelect method can return an arbitrary number of results per call. To do this, the function for flatSelect has an additional Collector parameter which is used to forward your output elements downstream.

  def flatSelectFn(pattern: Map[String, Iterable[Event]], collector: Collector[Alert]): Unit = {
    val startEvent = pattern("start").head
    val endEvent = pattern("end").head
    // emits startEvent.getValue alerts, like the Java example (0 until n runs n times)
    for (i <- 0 until startEvent.getValue) {
      collector.collect(Alert(startEvent, endEvent))
    }
  }




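When a pattern has a window length attached via within(), partial event sequences may be discarded because they exceed the window. To act upon these timed-out partial matches, select() and flatSelect() offer overloaded versions that take as parameters:

  • a PatternTimeoutFunction / PatternFlatTimeoutFunction,
  • an OutputTag for the side output in which the timed-out matches will be returned, and
  • the known PatternSelectFunction / PatternFlatSelectFunction.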
Java:

  PatternStream<Event> patternStream = CEP.pattern(input, pattern);

  OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};

  SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
      outputTag,
      new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
      new PatternSelectFunction<Event, ComplexEvent>() {...}
  );

  DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);

  SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
      outputTag,
      new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
      new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
  );

  DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
Scala:

  val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

  val outputTag = OutputTag[TimeoutEvent]("side-output")

  val result: DataStream[ComplexEvent] = patternStream.select(outputTag) {
      (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
  } {
      pattern: Map[String, Iterable[Event]] => ComplexEvent()
  }

  val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)

The flatSelect API call offers the same overloaded version, which takes a timeout function followed by a selection function. In contrast to the select functions, the flatSelect functions are called with a Collector, which you can use to emit an arbitrary number of events.

  val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

  val outputTag = OutputTag[TimeoutEvent]("side-output")

  val result: DataStream[ComplexEvent] = patternStream.flatSelect(outputTag) {
      (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
          out.collect(TimeoutEvent())
  } {
      (pattern: Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
          out.collect(ComplexEvent())
  }

  val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)





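When working in event time, the CEP library assumes the correctness of the watermark and considers elements whose timestamp is smaller than that of the last seen watermark as late; late elements are not processed further. You can collect them in a side output by specifying an OutputTag:

Java:

  PatternStream<Event> patternStream = CEP.pattern(input, pattern);

  OutputTag<Event> lateDataOutputTag = new OutputTag<Event>("late-data"){};

  SingleOutputStreamOperator<ComplexEvent> result = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select(
          new PatternSelectFunction<Event, ComplexEvent>() {...}
      );

  DataStream<Event> lateData = result.getSideOutput(lateDataOutputTag);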
Scala:

  val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

  val lateDataOutputTag = OutputTag[Event]("late-data")

  val result: DataStream[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select {
          pattern: Map[String, Iterable[Event]] => ComplexEvent()
      }

  val lateData: DataStream[Event] = result.getSideOutput(lateDataOutputTag)


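The following example detects the pattern start, middle(name = "error") -> end(name = "critical") on a keyed data stream of Events. The events are keyed by their ids, and a valid pattern has to occur within 10 seconds. The whole processing is done in event time.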

Java:

  StreamExecutionEnvironment env = ...
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

  DataStream<Event> input = ...

  DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
      @Override
      public Integer getKey(Event value) throws Exception {
          return value.getId();
      }
  });

  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
      .next("middle").where(new SimpleCondition<Event>() {
          @Override
          public boolean filter(Event value) throws Exception {
              return value.getName().equals("error");
          }
      }).followedBy("end").where(new SimpleCondition<Event>() {
          @Override
          public boolean filter(Event value) throws Exception {
              return value.getName().equals("critical");
          }
      }).within(Time.seconds(10));

  PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

  DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
      @Override
      public Alert select(Map<String, List<Event>> pattern) throws Exception {
          return createAlert(pattern);
      }
  });
Scala:

  val env: StreamExecutionEnvironment = ...
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: DataStream[Event] = ...

  val partitionedInput = input.keyBy(event => event.getId)

  val pattern = Pattern.begin[Event]("start")
    .next("middle").where(_.getName == "error")
    .followedBy("end").where(_.getName == "critical")
    .within(Time.seconds(10))

  val patternStream = CEP.pattern(partitionedInput, pattern)

  val alerts = patternStream.select(createAlert(_))
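The logic of this example can be traced with a single-threaded, plain-Java model. It assumes the input is already in event-time order, treats an event 10 seconds or more after "start" as outside the window (the exact boundary semantics of within() are an assumption here), and reports each match as "id@startTimestamp". `MonitoringEvent` and `KeyedPatternSketch` are hypothetical names; this is not how FlinkCEP executes the pattern.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type mirroring the fields used in the example above.
final class MonitoringEvent {
    final int id;
    final String name;
    final long timestampMillis;

    MonitoringEvent(int id, String name, long timestampMillis) {
        this.id = id;
        this.name = name;
        this.timestampMillis = timestampMillis;
    }
}

// Toy model (not FlinkCEP) of begin("start").next(error).followedBy(critical).within(10 s), keyed by id.
final class KeyedPatternSketch {
    static final long WINDOW_MILLIS = 10_000L;

    // Returns one "id@startTimestamp" entry per match; input must be in event-time order.
    static List<String> detect(List<MonitoringEvent> input) {
        // keyBy(id): each key is matched independently, preserving per-key order.
        Map<Integer, List<MonitoringEvent>> byKey = new LinkedHashMap<>();
        for (MonitoringEvent e : input) {
            byKey.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        }
        List<String> alerts = new ArrayList<>();
        for (List<MonitoringEvent> events : byKey.values()) {
            for (int s = 0; s + 1 < events.size(); s++) {
                MonitoringEvent start = events.get(s);      // "start" has no condition
                MonitoringEvent middle = events.get(s + 1); // next(): must directly follow
                if (!middle.name.equals("error")
                        || middle.timestampMillis - start.timestampMillis >= WINDOW_MILLIS) {
                    continue;
                }
                for (int i = s + 2; i < events.size(); i++) { // followedBy(): skip other events
                    MonitoringEvent end = events.get(i);
                    if (end.timestampMillis - start.timestampMillis >= WINDOW_MILLIS) {
                        break; // window exceeded: the partial match is dropped
                    }
                    if (end.name.equals("critical")) {
                        alerts.add(start.id + "@" + start.timestampMillis);
                        break; // relaxed contiguity: only the first matching "end"
                    }
                }
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<MonitoringEvent> input = List.of(
            new MonitoringEvent(1, "ok", 0L), new MonitoringEvent(2, "info", 500L),
            new MonitoringEvent(1, "error", 1000L), new MonitoringEvent(2, "error", 1000L),
            new MonitoringEvent(1, "critical", 5000L), new MonitoringEvent(2, "critical", 20000L));
        System.out.println(detect(input)); // [1@0]
    }
}
```

For key 2 the "critical" event arrives more than 10 seconds after the start event, so only the match for key 1 is reported.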


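In Flink-1.4, the backward compatibility of the CEP library with Flink 1.2 and earlier was dropped. Unfortunately, it is not possible to restore a CEP job that was once run with 1.2.x.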




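To run a CEP job written for Flink 1.1 or 1.2 with Flink 1.3, make the following changes:

  • Change your conditions (the ones in the where(...) clause) to extend the SimpleCondition class instead of implementing the FilterFunction interface.

  • Change the functions provided as arguments to the select(...) and flatSelect(...) methods to expect a list of events associated with each pattern (List in Java, Iterable in Scala). This is because, with the addition of looping patterns, multiple input events can match a single (looping) pattern.

  • In Flink 1.1 and 1.2, followedBy() implied non-deterministic relaxed contiguity (see the contiguity section above). In Flink 1.3 this has changed: followedBy() implies relaxed contiguity, and followedByAny() should be used if non-deterministic relaxed contiguity is required.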