学习目的
- 知道 rxswift 都有什么
- 知道他们都怎么用
- 知道为什么有这些东西
- 根类归纳整理
- 如何自己扩展
- 适当分析源码
主要概念
- 可监听序列(信号)
- operator(信号变换&组合)
- observer(观察者)
- 订阅(绑定)
- Disposable取消订阅
如何创建信号
可监听序列
event
1 | public enum Event<Element> { |
1 | let numbers: Observable<Int> = Observable.create { observer -> Disposable in |
特征序列
Single
1 | public enum SingleEvent<Element> { |
- 发出⼀个元素,或⼀个error 事件
- 不会共享附加作⽤
⼀个⽐较常⻅的例⼦就是执⾏ HTTP 请求,然后返回⼀个应答或错误。不过你也可以⽤ Single 来描述任何只有⼀个元素的序列。
1 | func getRepo(_ repo: String) -> Single<[String: Any]> { |
可以对 Observable 调⽤ .asSingle() ⽅法,将它转换为 Single。
Completable
- 没有 next 时间
- 发出 completed 或error 事件
- 不会共享附加作
Completable 适⽤于那种你只关⼼任务是否完成,⽽不需要在意任务返回值的情况。它和 Observable
1 | func cacheLocally() -> Completable { |
Maybe
- 发出⼀个next,completed或error 事件
- 不会共享附加作⽤
如果你遇到那种可能需要发出⼀个元素,⼜可能不需要发出时,就可以使⽤ Maybe。
1 | func generateString() -> Maybe<String> { |
以对 Observable 调⽤ .asMaybe() ⽅法,将它转换为 Maybe。
Driver
主要是为了简化 UI 层的代码
- 不会产⽣ error 事件
- ⼀定在 MainScheduler 监听(主线程监听)
- 共享附加作⽤
使用 driver 的原因
1 | let results = query.rx.text |
使用 drive
1 | let results = query.rx.text.asDriver() // 将ControlProperty 转换为 Driver |
Signal
Signal 和 Driver 相似,唯⼀的区别是,Driver 会对新观察者回放(重新发送)上⼀个元素,⽽ Signal 不会对新观察者回放上⼀个元素。
- 不会产⽣ error 事件
- ⼀定在 MainScheduler 监听(主线程监听)
- 共享附加作⽤
⼀般情况下状态序列我们会选⽤ Driver 这个类型,事件序列我们会选⽤ Signal 这个类型。
ControlEvent
ControlEvent 专⻔⽤于描述 UI 控件所产⽣的事件,它具有以下特征:
- 不会产⽣ error 事件
- ⼀定在 MainScheduler
- ⼀定在 MainScheduler 订阅(主线程订阅) 监听(主线程监听)
- 共享附加作⽤
观察者 AnyObserver
特征观察者 Binder
- 不会处理错误事件
- 确保绑定都是在给定 Scheduler 上执⾏(默认 MainScheduler)
- 只处理 next 事件
在介绍 AnyObserver 时,我们举了这样⼀个例⼦:
1 | let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in |
由于这个观察者是⼀个 UI 观察者,所以它在响应事件时,只会处理 next 事件,并且更新 UI 的操作需要在主线程上执⾏。
因此⼀个更好的⽅案就是使⽤ Binder:
1 | let observer: Binder<Bool> = Binder(usernameValidOutlet) { (view, isHidden) in |
Binder 可以只处理 next 事件,并且保证响应 next 事件的代码一定会在给定 Scheduler 上执⾏,这⾥采⽤默认的 MainScheduler。
- 复用
由于⻚⾯是否隐藏是⼀个常⽤的观察者,所以应该让所有的 UIView 都提供这种观察者:
1 | extension Reactive where Base: UIView { |
即使观察序列又是观察者
AsyncSubject
PublishSubject
ReplaySubject
BehaviorSubject
ControlProperty
RxRelay
PublishRelay
PublishRelay 就是 PublishSubject 去掉终⽌事件 onError 或 onCompleted 。
BehaviorRelay
BehaviorRelay 就是 BehaviorSubject 去掉终⽌事件 演示 onError 或 onCompleted 。
如何选择operator
Observable 创建
- 产⽣特定的⼀个元素:just
- 经过⼀段延时:timer
- 从⼀个序列拉取元素:from
- 重复的产⽣某⼀个元素:repeatElement
- 存在⾃定义逻辑:create
- 每次订阅时产⽣:deferred
- 每隔⼀段时间,发出⼀个元素:interval
- 在⼀段延时后:timer
- ⼀个空序列,只有⼀个完成事件:empty
- ⼀个任何事件都没有产⽣的序列:never
Observable组合
- 任意⼀个 Observable 产⽣了元素,就发出这个元素:merge
- 当上⼀个 Observable 才能开始发出元素:concat
- 组合多个 Observables 的元素
- 当每⼀个 Observable 都发出⼀个新的元素:zip
- 当任意⼀个 Observable 发出⼀个新的元素:combineLatest
Observable转换
- 对每个元素直接转换:map
- 转换到另⼀个 Observable :flatMap
- 只接收最新的元素转换的 Observable 所产⽣的元素:flatMapLatest
- 每⼀个元素转换的 Observable 按顺序产⽣元素:concatMap
- 基于所有遍历过的元素:scan
基于时间序
- 拖延⼀段时间后再发出:delay
想要将产⽣的事件封装成元素发送出来
- 将他们封装成 Event
:materialize - 然后解封出来:dematerialize
基于数量的操作
忽略掉所有的 next 事件,只接收 completed 和 error 事件:ignoreElements
创建⼀个新的 Observable 在原有的序列前⾯加⼊⼀些元素:startWith
我想从 Observable 中收集元素,缓存这些元素之后在发出:buffer
我想将 Observable 拆分成多个 Observables :window
- 基于元素的共同特征:groupBy
我想只接收 Observable 中特定的元素
- 发出唯⼀的元素:single
我想重新从 Observable 中发出某些元素
- 通过判定条件过滤出⼀些元素:filter
- 仅发出头⼏个元素:take
- 仅发出尾部的⼏个元素:takeLast
- 仅仅发出第 n 个元素:elementAt
跳过头⼏个元素
- 跳过头 n 个元素:skip
- 跳过头⼏个满⾜判定的元素:skipWhile,skipWhileWithIndex
- 跳过某段时间内产⽣的头⼏个元素:skip
- 跳过头⼏个元素直到另⼀个 Observable 发出⼀个元素:skipUntil
只取头⼏个元素
- 只取头⼏个满⾜判定的元素:takeWhile,takeWhileWithIndex
- 只取某段时间内产⽣的头⼏个元素:take
- 只取头⼏个元素直到另⼀个 Observable 发出⼀个元素:takeUntil
周期性的对 Observable 抽样:sample
发出那些元素,这些元素产⽣后的特定的时间内,没有新的元素产⽣:debounce
直到元素的值发⽣变化,才发出新的元素:distinctUntilChanged
- 并提供元素是否相等的判定函数:distinctUntilChanged
在开始发出元素时,延时后进⾏订阅:delaySubscription
我想要从⼀些 Observables 中,只取第⼀个产⽣元素的 Observable :amb
我想评估 Observable 的全部元素
- 并且对每个元素应⽤聚合⽅法,待所有元素都应⽤聚合⽅法后,发出结果:reduce
- 并且对每个元素应⽤聚合⽅法,每次应⽤聚合⽅法后,发出结果:scan
我想在某个 Scheduler 应⽤操作符:subscribeOn
- 在某个 Scheduler 监听:observeOn
我想要 Observable 发⽣某个事件时, 采取某个⾏动:do
我想要 Observable 发出⼀个 error 事件:error
如果规定时间内没有产⽣元素:timeout
我想要 Observable 发⽣错误时,优雅的恢复
- 如果规定时间内没有产⽣元素,就切换到备选 Observable :timeout
- 如果产⽣错误,将错误替换成某个元素 :catchErrorJustReturn
- 如果产⽣错误,就切换到备选 Observable :catchError
- 如果产⽣错误,就重试 :retry
我创建⼀个 Disposable 资源,使它与 Observable 具有相同的寿命:using
我创建⼀个 Observable ,直到我通知它可以产⽣元素后,才能产⽣元素:publish
- 并且,就算是在产⽣元素后订阅,也要发出全部元素:replay
- 并且,⼀旦所有观察者取消观察,他就被释放掉:refCount
- 通知它可以产⽣元素了:connect
动态信号
1 | RACSignal *s = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { |
Cocoa 桥接
1 | - (void) bridge { |
信号变化(内部是产生一个新信号的)
1 | [_sigClick map:^id(id value) { |
序列转换
1 | self.sigSequence = [[RACSequence return:@3] concat:[RACSequence return:@4]].signal; |
订阅(绑定)信号的方式
直接订阅方法
1 | [self.sigSample |
绑定
1 | RAC(self, a) = Signal; |
Cocoa 桥接
1 | - (void)signal { |
信号变化&组合
- 单个信号的变化
- 多个信号的组合
- 高阶操作
值操作
问题:
- 为什么会有这样的值操作方法?
- 自己如何扩展新的值方法
- transform 这些用的比较多
- map
- MapReplace
- ReduceEach tuple(a, b) -> c
- 值判断逻辑变换
- not
- and
- or
- 用的比较少
- reduceApply 这个不太清楚为什么要这么设计,用combineLatest: reduceEach: 就可以做了,而且代码看起来更好。
- materialize
- dematerialize
数量操作
- repeat 一直会有值
条件过滤1
- ignore
- ignoreValues
- distinctUntilChanged
条件过滤2
- takeUntilBlock:(BOOL (^)(id x))predicate
- takeWhileBlock:(BOOL (^)(id x))predicate;
- skipUntilBlock:(BOOL (^)(id x))predicate;
- skipWhileBlock:(BOOL (^)(id x))predicate;
数量判断,如果有值就发送
- any;
- any:(BOOL (^)(id object))predicateBlock;
- all:(BOOL (^)(id object))predicateBlock;
重试
- retry
- retry: Count
- collect
汇聚
信号必须有返回值
副作用:
– 对于信号值变化以外的一些操作
- doNext
- doCompleted
- doError
折叠函数
不听对一个value 操作,使用折叠函数解决这个问题
1 | [sig10 aggregateWithStart:@0 reduce:^id(id running, id next) { |
- (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock;
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id running, id next, NSUInteger index))reduceBlock;
- (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock;
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock;
- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id running, id next, NSUInteger index))reduceBlock;
时间操作
- + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler;
- + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway;
- delay
- throttle 阀门,在固定时间内没有新值发送的时候,会发送最后的值
多个信号组合
问题:
- 受哪个信号终止而终止?
- 错误传递?
- 各个信号何时开始开始订阅?
- 在哪个线程发出?
- concat
- 第一个结束后,订阅第二个
- 第一个error 后,就直接 error
- merge
- zip
- combineLatest
- sample
- takeUntil
- takeUntilReplacement, 当 B 来了直接替换 A,开始订阅 B
信号的高阶操作(升阶降阶)
- 升阶 S(v) -> S(s(v))
- 降阶 S(s(v)) -> S(v)
1 | RACSignal *signal = @[@1, @2, @3, @4].rac_sequence.signal; |
降阶操作
switchToLatests
flatten 类似 merge 只不过一个是接收的 value是 signal,另一个接收的就是 value
- flatten:count 按个数展开信号,当信号个数 > count 以后等待,如果有 sig completed,那么把等待中的sig 放入展开数组里面
flatten:1 == concat
- flattenMap
满足 monad 的部分定义,绝大部分函数都可以使用 flattenMap 实现
- bind
大部分函数都可以使用 bind 实现
冷信号&热信号
一些习题
- 如何获得无限递增的信号
1 | RACSignal *increment(int inc) { |
- fibonacci
1 | RACSignal *fibonacci() { |