6.3 ItemProcessor

ItemReaderItemWriter 接口对于每个任务来说都是非常必要的, 但如果想要在写出数据之前执行某些业务逻辑操作时要怎么办呢?
一个选择是对读取(reading)和写入(writing)使用组合模式(composite pattern): 创建一个 ItemWriter 的子类实现, 内部包含另一个 ItemWriter 对象的引用(对于 ItemReader 也是类似的). 示例如下:

  1. public class CompositeItemWriter<T> implements ItemWriter<T> {
  2. ItemWriter<T> itemWriter;
  3. public CompositeItemWriter(ItemWriter<T> itemWriter) {
  4. this.itemWriter = itemWriter;
  5. }
  6. public void write(List<? extends T> items) throws Exception {
  7. // ... 此处可以执行某些业务逻辑
  8. itemWriter.write(item);
  9. }
  10. public void setDelegate(ItemWriter<T> itemWriter){
  11. this.itemWriter = itemWriter;
  12. }
  13. }

上面的类中包含了另一个ItemWriter引用,通过代理它来实现某些业务逻辑。 这种模式对于 ItemReader 也是一样的道理, 但也可能持有内部 ItemReader 所拥有的多个数据输入对象的引用。 在ItemWriter中如果我们想要自己控制 write 的调用也可能需要持有其他引用。

但假如我们只想在对象实际被写入之前 “改造” 一下传入的item, 就没必要实现ItemWriter和执行 write 操作: 我们只需要这个将被修改的item对象而已。 对于这种情况, Spring Batch提供了 ItemProcessor 接口:

  1. public interface ItemProcessor<I, O> {
  2. O process(I item) throws Exception;
  3. }

ItemProcessor 非常简单; 传入一个对象,对其进行某些处理/转换,然后返回另一个对象(也可以是同一个)。 传入的对象和返回的对象类型可以一样,也可以不一致。 关键点在于处理过程中可以执行一些业务逻辑操作,当然这完全取决于开发者怎么实现它。 一个ItemProcessor可以被直接关联到某个 Step(步骤),例如,假设ItemReader 的返回类型是 Foo 【译者注: Foo, Bar一类的词 就和BalaBala一样,没什么实际意义】,而在写出之前需要将其转换成类型Bar 的对象。 就可以编写一个 ItemProcessor来执行这种转换:

  1. public class Foo {}
  2. public class Bar {
  3. public Bar(Foo foo) {}
  4. }
  5. public class FooProcessor implements ItemProcessor<Foo,Bar>{
  6. public Bar process(Foo foo) throws Exception {
  7. //执行某些操作,将 Foo 转换为 Bar对象
  8. return new Bar(foo);
  9. }
  10. }
  11. public class BarWriter implements ItemWriter<Bar>{
  12. public void write(List<? extends Bar> bars) throws Exception {
  13. //write bars
  14. }
  15. }

在上面的简单示例中,有两个类: FooBar, 以及实现了 ItemProcessor 接口的FooProcessor类。 因为是demo,所以转换很简单, 在实际使用中可能执行转换为任何类型, 响应的操作请读者根据需要自己编写。 BarWriter将被用于写出Bar对象,如果传入其他类型的对象可能会抛出异常。 同样,如果 FooProcessor 传入的参数不是 Foo 也会抛出异常。FooProcessor可以注入到某个Step中:

  1. <job id="ioSampleJob">
  2. <step name="step1">
  3. <tasklet>
  4. <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
  5. commit-interval="2"/>
  6. </tasklet>
  7. </step>
  8. </job>

6.3.1 Chaining ItemProcessors

在很多情况下执行单个转换就可以了, 但假如想要将多个 ItemProcessors “串联(chain)” 在一起要怎么实现呢? 我们可以使用前面提到的组合模式(composite pattern)来完成。 接着前面单一转换的示例, 我们将Foo转换为Bar,然后再转换为Foobar类型,并执行写出:

  1. public class Foo {}
  2. public class Bar {
  3. public Bar(Foo foo) {}
  4. }
  5. public class Foobar{
  6. public Foobar(Bar bar) {}
  7. }
  8. public class FooProcessor implements ItemProcessor<Foo,Bar>{
  9. public Bar process(Foo foo) throws Exception {
  10. //Perform simple transformation, convert a Foo to a Bar
  11. return new Bar(foo);
  12. }
  13. }
  14. public class BarProcessor implements ItemProcessor<Bar,FooBar>{
  15. public FooBar process(Bar bar) throws Exception {
  16. return new Foobar(bar);
  17. }
  18. }
  19. public class FoobarWriter implements ItemWriter<FooBar>{
  20. public void write(List<? extends FooBar> items) throws Exception {
  21. //write items
  22. }
  23. }

可以将 FooProcessorBarProcessor “串联”在一起来生成 Foobar 对象,如果用 Java代码表示,那就像下面这样:

  1. CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
  2. List itemProcessors = new ArrayList();
  3. itemProcessors.add(new FooTransformer());
  4. itemProcessors.add(new BarTransformer());
  5. compositeProcessor.setDelegates(itemProcessors);

就和前面的示例类似,复合处理器也可以配置到Step中:

  1. <job id="ioSampleJob">
  2. <step name="step1">
  3. <tasklet>
  4. <chunk reader="fooReader" processor="compositeProcessor" writer="foobarWriter"
  5. commit-interval="2"/>
  6. </tasklet>
  7. </step>
  8. </job>
  9. <bean id="compositeItemProcessor"
  10. class="org.springframework.batch.item.support.CompositeItemProcessor">
  11. <property name="delegates">
  12. <list>
  13. <bean class="..FooProcessor" />
  14. <bean class="..BarProcessor" />
  15. </list>
  16. </property>
  17. </bean>

6.3.2 Filtering Records

item processor 的典型应用就是在数据传给ItemWriter之前进行过滤(filter out)。 过滤(Filtering)是一种有别于跳过(skipping)的行为; skipping表明某几行记录是无效的,而 filtering 则只是表明某条记录不应该写入(written)。

例如, 某个批处理作业,从一个文件中读取三种不同类型的记录: 准备 insert 的记录、准备 update 的记录,需要 delete 的记录。 如果系统中不允许删除记录, 那么我们肯定不希望将 “delete” 类型的记录传递给 ItemWriter。 但因为这些记录又不是损坏的信息(bad records), 我们只想将其过滤掉,而不是跳过。 因此,ItemWriter只会收到 “insert” 和 “update”的记录。

要过滤某条记录, 只需要 ItemProcessor 返回“null” 即可. 框架将自动检测结果为“null”的情况, 不会将该item 添加到传给ItemWriter的list中。 像往常一样, 在 ItemProcessor 中抛出异常将会导致跳过(skip)。

6.3.3 容错(Fault Tolerance)

当某一个分块回滚时, 读取后已被缓存的那些item可能会被重新处理。 如果一个step被配置为支持容错(通常使用 skip跳过 或 retry重试处理),使用的所有 ItemProcessor 都应该实现为幂等的(idempotent)。 通常ItemProcessor对已经处理过的输入数据不执行任何修改, 而只更新需要处理的实例。