定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据
Window
和Buffer
类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted
通知。
和Buffer
一样,Window
有很多变体,每一种都以自己的方式将原始Observable分解为多个作为结果的Observable,每一个都包含一个映射原始数据的window
。用Window
操作符的术语描述就是,当一个窗口打开(when a window "opens")意味着一个新的Observable已经发射(产生)了,而且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭(when a window "closes")意味着发射(产生)的Observable停止发射原始Observable的数据,并且发射终止通知onCompleted
给它的观察者们。
在RxJava中有许多种Window
操作符的变体。
window
的这个变体会立即打开它的第一个窗口。每当它观察到closingSelector
返回的Observable发射了一个对象时,它就关闭当前打开的窗口并立即打开一个新窗口。用这个方法,这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。
无论何时,只要window
观察到windowOpenings
这个Observable发射了一个Opening
对象,它就打开一个窗口,并且同时调用closingSelector
生成一个与那个窗口关联的关闭(closing)Observable。当这个关闭(closing)Observable发射了一个对象时,window
操作符就会关闭那个窗口。对这个变体来说,由于当前窗口的关闭和新窗口的打开是由单独的Observable管理的,它创建的窗口可能会存在重叠(重复某些来自原始Observable的数据)或间隙(丢弃某些来自原始Observable的数据)。
这个window
的变体立即打开它的第一个窗口。每当当前窗口发射了count
项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。
这个window
的变体立即打开它的第一个窗口。原始Observable每发射skip
项数据它就打开一个新窗口(例如,如果skip
等于3,每到第三项数据,它会打开一耳光新窗口)。每当当前窗口发射了count
项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。如果skip=count
,它的行为与window(source, count)
相同;如果skip < count
,窗口可会有count - skip
个重叠的数据;如果skip > count
,在两个窗口之间会有skip - count
项数据被丢弃。
这个window
的变体立即打开它的第一个窗口。每当过了timespan
这么长的时间它就关闭当前窗口并打开一个新窗口(时间单位是unit
,可选在调度器scheduler
上执行)。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。
这个window
的变体立即打开它的第一个窗口。这个变体是window(count)
和window(timespan, unit[, scheduler])
的结合,每当过了timespan
的时长或者当前窗口收到了count
项数据,它就关闭当前窗口并打开另一个。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。
buffer(timespan, timeshift, unit)
在每一个timeshift
时期内都创建一个新的List
,然后用原始Observable发射的每一项数据填充这个列表(在把这个List
当做自己的数据发射前,从创建时开始,直到过了timespan
这么长的时间)。如果timespan
长于timeshift
,它发射的数据包将会重叠,因此可能包含重复的数据项。
这个window
的变体立即打开它的第一个窗口。随后每当过了timeshift
的时长就打开一个新窗口(时间单位是unit
,可选在调度器scheduler
上执行),当窗口打开的时长达到timespan
,它就关闭当前打开的窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的timeshift
和timespan
的值。
这个变体的window
默认在computation
调度器上执行它的定时器。
你可以使用Window
操作符实现反压backpressure
(意思是,处理这样一个Observable:它产生数据的数据可能比它的观察者消费数据的数据快)。
Window操作符可以将大量的数据序列缩减为较少的数据窗口序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据窗口。
示例代码
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
你还可以选择每当收到爆发性Observable的N项数据时发射一个新的数据窗口。
示例代码
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);