你可以实现你自己的Observable操作符,本文展示怎么做。
如果你的操作符是被用于创造一个Observable,而不是变换或者响应一个Observable,使用 create( )
方法,不要试图手动实现 Observable
。另外,你可以按照下面的用法说明创建一个自定义的操作符。
如果你的操作符是用于Observable发射的单独的数据项,按照下面的说明做:Sequence Operators 。如果你的操作符是用于变换Observable发射的整个数据序列,按照这个说明做:Transformational Operators 。
提示: 在一个类似于Groovy的语言Xtend中,你可以以 extension methods 的方式实现你自己的操作符 ,不使用本文的方法,它们也可以链式调用。详情参见 RxJava and Xtend
下面的例子向你展示了怎样使用lift( )
操作符将你的自定义操作符(在这个例子中是 myOperator
)与标准的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与lift()
搭配使用。
将你的自定义操作符定义为实现了 Operator
接口的一个公开类, 就像这样:
public class MyOperator<T> implements Operator<T> {
public MyOperator( /* any necessary params here */ ) {
/* 这里添加必要的初始化代码 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* 这里添加你自己的onCompleted行为,或者仅仅传递完成通知: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* 这里添加你自己的onError行为, 或者仅仅传递错误通知:*/
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 这个例子对结果的每一项执行排序操作,然后返回这个结果 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
下面的例子向你展示了怎样使用 compose( )
操作符将你得自定义操作符(在这个例子中,是一个名叫myTransformer
的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与compose()
搭配使用。
将你的自定义操作符定义为实现了 Transformer
接口的一个公开类,就像这样:
public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
public MyTransformer( /* any necessary params here */ ) {
/* 这里添加必要的初始化代码 */
}
@Override
public Observable<String> call(Observable<Integer> source) {
/*
* 这个简单的例子Transformer应用一个map操作,
* 这个map操作将发射整数变换为发射整数的字符串表示。
*/
return source.map( new Func1<Integer,String>() {
@Override
public String call(Integer t1) {
return String.valueOf(t1);
}
} );
}
}
Subscriber.isUnsubscribed( )
状态,如果没有订阅者了,没必要浪费时间生成数据项。onNext( )
方法任意次,但是这些调用必须是不重叠的。onCompleted( )
或 onError( )
正好一次,但不能都调用,而且不能在这之后调用订阅者的 onNext( )
方法。serialize( )
操作符,它会强制保持正确的行为。first( )
被定义为 take(1).single( )ignoreElements( )
被定义为 filter(alwaysFalse( ))reduce(a)
被定义为 scan(a).last( )onError()
通知订阅者。onError()
毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用 Exceptions.throwIfFatal(throwable)
方法过滤掉这些致命的异常,并重新抛出它们,而不是试图发射关于它们的通知。null
可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非 null
值作为是否发射了数据的替代。一个值为 null
的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。