6.13.1 自定义 ItemReader 示例

为了实现这个目的,我们实现一个简单的 ItemReader, 从给定的list中读取数据。 我们将实现最基本的 ItemReader 功能, read:

  1. public class CustomItemReader<T> implements ItemReader<T>{
  2. List<T> items;
  3. public CustomItemReader(List<T> items) {
  4. this.items = items;
  5. }
  6. public T read() throws Exception, UnexpectedInputException,
  7. NoWorkFoundException, ParseException {
  8. if (!items.isEmpty()) {
  9. return items.remove(0);
  10. }
  11. return null;
  12. }
  13. }

这是一个简单的类, 传入一个 items list, 每次读取时删除其中的一条并返回。 如果list里面没有内容,则将返回null, 从而满足 ItemReader 的基本要求, 测试代码如下所示:

  1. List<String> items = new ArrayList<String>();
  2. items.add("1");
  3. items.add("2");
  4. items.add("3");
  5. ItemReader itemReader = new CustomItemReader<String>(items);
  6. assertEquals("1", itemReader.read());
  7. assertEquals("2", itemReader.read());
  8. assertEquals("3", itemReader.read());
  9. assertNull(itemReader.read());

使 ItemReader 支持重启

现在剩下的问题就是让 ItemReader 变为可重启的。到目前这一步,如果发生掉电之类的情况,那么必须重新启动 ItemReader,而且是从头开始。在很多时候这是允许的,但有时侯更好的处理办法是让批处理作业在上次中断的地方重新开始。判断的关键是根据 reader 是有状态的还是无状态的。 无状态的 reader 不需要考虑重启的情况, 但有状态的则需要根据其最后一个已知的状态来重新启动。出于这些原因, 官方建议尽可能地让 reader 成为无状态的,使开发者不需要考虑重新启动的情况。

如果需要保存状态信息,那应该使用 ItemStream 接口:

  1. public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
  2. List<T> items;
  3. int currentIndex = 0;
  4. private static final String CURRENT_INDEX = "current.index";
  5. public CustomItemReader(List<T> items) {
  6. this.items = items;
  7. }
  8. public T read() throws Exception, UnexpectedInputException,
  9. ParseException {
  10. if (currentIndex < items.size()) {
  11. return items.get(currentIndex++);
  12. }
  13. return null;
  14. }
  15. public void open(ExecutionContext executionContext) throws ItemStreamException {
  16. if(executionContext.containsKey(CURRENT_INDEX)){
  17. currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
  18. }
  19. else{
  20. currentIndex = 0;
  21. }
  22. }
  23. public void update(ExecutionContext executionContext) throws ItemStreamException {
  24. executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
  25. }
  26. public void close() throws ItemStreamException {}
  27. }

每次调用 ItemStreamupdate 方法时, ItemReader 的当前 index 都会被保存到给定的 ExecutionContext 中,key 为 ‘current.index‘。 当调用 ItemStreamopen 方法时, ExecutionContext会检查是否包含该 key 对应的条目。 如果找到key, 那么当前索引 index 就好移动到该位置。这是一个相当简单的例子,但它仍然符合通用原则:

  1. ExecutionContext executionContext = new ExecutionContext();
  2. ((ItemStream)itemReader).open(executionContext);
  3. assertEquals("1", itemReader.read());
  4. ((ItemStream)itemReader).update(executionContext);
  5. List<String> items = new ArrayList<String>();
  6. items.add("1");
  7. items.add("2");
  8. items.add("3");
  9. itemReader = new CustomItemReader<String>(items);
  10. ((ItemStream)itemReader).open(executionContext);
  11. assertEquals("2", itemReader.read());

大多数ItemReaders具有更加复杂的重启逻辑。 例如 JdbcCursorItemReader , 存储了游标(Cursor)中最后所处理的行的 row id。

还值得注意的是 ExecutionContext 中使用的 key 不应该过于简单。这是因为 ExecutionContext 被一个 Step中的所有 ItemStreams 共用。在大多数情况下,使用类名加上 key 的方式应该就足以保证唯一性。然而,在极端情况下, 同一个类的多个 ItemStream 被用在同一个Step中时( 如需要输出两个文件的情况),就需要更加具备唯一性的name标识。出于这个原因,Spring Batch 的许多ItemReaderItemWriter 实现都有一个 setName() 方法, 允许覆盖默认的 key name。