旁路输出

In addition to the main stream that results from DataStream operations, you can also produce anynumber of additional side output result streams. The type of data in the result streams does nothave to match the type of data in the main stream and the types of the different side outputs canalso differ. This operation can be useful when you want to split a stream of data where you wouldnormally have to replicate the stream and then filter out from each stream the data that you don’twant to have.

When using side outputs, you first need to define an OutputTag that will be used to identify aside output stream:

  1. // this needs to be an anonymous inner class, so that we can analyze the type
  2. OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
  1. val outputTag = OutputTag[String]("side-output")

Notice how the OutputTag is typed according to the type of elements that the side output streamcontains.

Emitting data to a side output is possible from the following functions:

You can use the Context parameter, which is exposed to users in the above functions, to emitdata to a side output identified by an OutputTag. Here is an example of emitting side outputdata from a ProcessFunction:

  1. DataStream<Integer> input = ...;
  2. final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
  3. SingleOutputStreamOperator<Integer> mainDataStream = input
  4. .process(new ProcessFunction<Integer, Integer>() {
  5. @Override
  6. public void processElement(
  7. Integer value,
  8. Context ctx,
  9. Collector<Integer> out) throws Exception {
  10. // emit data to regular output
  11. out.collect(value);
  12. // emit data to side output
  13. ctx.output(outputTag, "sideout-" + String.valueOf(value));
  14. }
  15. });
  1. val input: DataStream[Int] = ...
  2. val outputTag = OutputTag[String]("side-output")
  3. val mainDataStream = input
  4. .process(new ProcessFunction[Int, Int] {
  5. override def processElement(
  6. value: Int,
  7. ctx: ProcessFunction[Int, Int]#Context,
  8. out: Collector[Int]): Unit = {
  9. // emit data to regular output
  10. out.collect(value)
  11. // emit data to side output
  12. ctx.output(outputTag, "sideout-" + String.valueOf(value))
  13. }
  14. })

For retrieving the side output stream you use getSideOutput(OutputTag)on the result of the DataStream operation. This will give you a DataStream that is typedto the result of the side output stream:

  1. final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
  2. SingleOutputStreamOperator<Integer> mainDataStream = ...;
  3. DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
  1. val outputTag = OutputTag[String]("side-output")
  2. val mainDataStream = ...
  3. val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)