ERROR OneForOneStrategy

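If you enable checkpointing in Spark Streaming, every object that the function passed to foreachRDD references from its enclosing scope must be serializable (implement java.io.Serializable). Checkpointing serializes the DStream graph, including that function and everything it captures. If a captured object is not serializable, you get an exception like "ERROR OneForOneStrategy: … java.io.NotSerializableException:"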

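    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // sc (a JavaSparkContext) and INTERVAL (a Duration) are defined elsewhere.
    JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);
    // This enables checkpointing.
    jssc.checkpoint("/tmp/checkpoint_test");
    JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999);
    // Created on the driver, outside the function, so the lambda below captures it.
    NotSerializable notSerializable = new NotSerializable();
    // Uses the VoidFunction overload of foreachRDD, so the lambda returns nothing.
    dStream.foreachRDD(rdd -> {
      if (rdd.isEmpty()) { // cheaper than rdd.count() == 0
        return;
      }
      String first = rdd.first();
      notSerializable.doSomething(first);
    });
    // This fails with java.io.NotSerializableException once checkpointing is on.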
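To see why checkpointing trips over this, it helps to reproduce the failure without Spark. The sketch below assumes, as a simplification, that checkpointing amounts to writing the function passed to foreachRDD with plain Java serialization. Spark's Java function interfaces (such as VoidFunction) extend java.io.Serializable, so a lambda passed to them is a serializable lambda. CapturedClosureDemo and SerializableConsumer are hypothetical names; NotSerializable is a stand-in for the class in the snippet above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

public class CapturedClosureDemo {

    // Hypothetical stand-in for NotSerializable: it does not implement Serializable.
    public static class NotSerializable {
        public void doSomething(String s) {
            // Real work would go here.
        }
    }

    // Hypothetical serializable functional interface, mirroring how Spark's
    // Java function interfaces extend java.io.Serializable.
    public interface SerializableConsumer extends Consumer<String>, Serializable {}

    // Writes obj with Java serialization; returns false on NotSerializableException.
    public static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            System.out.println(e); // same exception type as in the Spark log
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors the failing snippet: the object is created outside the function
    // and captured by the lambda, so it is serialized along with the lambda.
    public static SerializableConsumer capturingClosure() {
        NotSerializable notSerializable = new NotSerializable();
        return first -> notSerializable.doSomething(first);
    }

    public static void main(String[] args) {
        System.out.println("closure serializable: " + isSerializable(capturingClosure()));
    }
}
```

Serializing a lambda also serializes every value it captures. The captured NotSerializable instance therefore raises java.io.NotSerializableException, the same exception type reported by Spark.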

Any one of the following changes lets the code above run:

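  • Turn off checkpointing by removing the jssc.checkpoint line from the code.
  • Make the captured object's class serializable (implement java.io.Serializable).
  • Declare NotSerializable inside the foreachRDD function. The following code runs fine: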
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // sc (a JavaSparkContext) and INTERVAL (a Duration) are defined elsewhere.
    JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);
    jssc.checkpoint("/tmp/checkpoint_test");
    JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999);
    dStream.foreachRDD(rdd -> {
      if (rdd.isEmpty()) {
        return;
      }
      String first = rdd.first();
      // Created inside the function, so the closure captures nothing
      // that is not serializable.
      NotSerializable notSerializable = new NotSerializable();
      notSerializable.doSomething(first);
    });
    // This code snippet is fine since the NotSerializable object
    // is declared and only used within the foreachRDD function.
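The two code-level fixes can be checked in the same way. This is again a plain-Java sketch, under the same simplifying assumption that the function is written with Java serialization. ClosureFixesDemo, SerializableHelper and SerializableConsumer are hypothetical names, not Spark classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

public class ClosureFixesDemo {

    // Hypothetical helper that still does not implement Serializable.
    public static class NotSerializable {
        public void doSomething(String s) { }
    }

    // Fix "make the class serializable": the same helper, marked Serializable.
    public static class SerializableHelper implements Serializable {
        private static final long serialVersionUID = 1L;
        public void doSomething(String s) { }
    }

    // Hypothetical serializable functional interface, like Spark's Java function types.
    public interface SerializableConsumer extends Consumer<String>, Serializable {}

    // Writes obj with Java serialization; returns false on NotSerializableException.
    public static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The lambda still captures an object, but that object is Serializable.
    public static SerializableConsumer capturesSerializableHelper() {
        SerializableHelper helper = new SerializableHelper();
        return first -> helper.doSomething(first);
    }

    // The object is created inside the lambda body, so nothing is captured.
    public static SerializableConsumer createsHelperInside() {
        return first -> {
            NotSerializable notSerializable = new NotSerializable();
            notSerializable.doSomething(first);
        };
    }

    public static void main(String[] args) {
        System.out.println("captures Serializable helper: "
                + isSerializable(capturesSerializableHelper()));
        System.out.println("creates helper inside lambda: "
                + isSerializable(createsHelperInside()));
    }
}
```

One trade-off worth noting: creating the object inside the function builds a fresh instance every time the function runs (once per batch in the snippet above). This is fine for cheap objects. For expensive ones, making the class Serializable may be the better choice.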
