创建操作

Create

使用一个函数从头开始创建一个Observable

create

你可以使用Create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable—恰当的调用观察者的onNext,onError和onCompleted方法。

一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。

create

RxJava将这个操作符实现为 create 方法。

建议你在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。

示例代码:

  1. Observable.create(new Observable.OnSubscribe<Integer>() {
  2. @Override
  3. public void call(Subscriber<? super Integer> observer) {
  4. try {
  5. if (!observer.isUnsubscribed()) {
  6. for (int i = 1; i < 5; i++) {
  7. observer.onNext(i);
  8. }
  9. observer.onCompleted();
  10. }
  11. } catch (Exception e) {
  12. observer.onError(e);
  13. }
  14. }
  15. } ).subscribe(new Subscriber<Integer>() {
  16. @Override
  17. public void onNext(Integer item) {
  18. System.out.println("Next: " + item);
  19. }
  20. @Override
  21. public void onError(Throwable error) {
  22. System.err.println("Error: " + error.getMessage());
  23. }
  24. @Override
  25. public void onCompleted() {
  26. System.out.println("Sequence complete.");
  27. }
  28. });

输出:

  1. Next: 1
  2. Next: 2
  3. Next: 3
  4. Next: 4
  5. Sequence complete.

create方法默认不在任何特定的调度器上执行。