RxSwift 学习笔记

文章目录
  1. 1. 学习目的
  2. 2. 主要概念
  3. 3. 如何创建信号
    1. 3.1. 可监听序列
    2. 3.2. 特征序列
      1. 3.2.1. Single
      2. 3.2.2. Completable
      3. 3.2.3. Maybe
      4. 3.2.4. Driver
      5. 3.2.5. Signal
      6. 3.2.6. ControlEvent
    3. 3.3. 观察者 AnyObserver
      1. 3.3.1. 特征观察者 Binder
    4. 3.4. 即使观察序列又是观察者
      1. 3.4.1. AsyncSubject
      2. 3.4.2. PublishSubject
      3. 3.4.3. ReplaySubject
      4. 3.4.4. BehaviorSubject
      5. 3.4.5. ControlProperty
      6. 3.4.6. RxRelay
      7. 3.4.7. PublishRelay
      8. 3.4.8. BehaviorRelay
    5. 3.5. 如何选择operator
    6. 3.6. 动态信号
    7. 3.7. Cocoa 桥接
    8. 3.8. 信号变化(内部是产生一个新信号的)
    9. 3.9. 序列转换
  4. 4. 订阅(绑定)信号的方式
    1. 4.1. 直接订阅方法
    2. 4.2. 绑定
    3. 4.3. Cocoa 桥接
  5. 5. 信号变化&组合
    1. 5.1. 值操作
    2. 5.2. 数量操作
    3. 5.3. 时间操作
    4. 5.4. 多个信号组合
    5. 5.5. 信号的高阶操作(升阶降阶)
  6. 6. 冷信号&热信号
  7. 7. 一些习题

学习目的

  1. 知道 rxswift 都有什么
  2. 知道他们都怎么用
  3. 知道为什么有这些东西
  4. 根类归纳整理
  5. 如何自己扩展
  6. 适当分析源码

主要概念

  1. 可监听序列(信号)
  2. operator(信号变换&组合)
  3. observer(观察者)
  4. 订阅(绑定)
  5. Disposable取消订阅

如何创建信号

可监听序列

event

1
2
3
4
5
public enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}

Observable<Int>.just(one)
Observable.of(one, two, three)
Observable.from([one, two, three])
Observable<Void>.empty()
Observable<Any>.never()
Observable<Int>.range(start: 1, count: 10)
Observable<String>.create

特征序列

Single

1
2
3
4
public enum SingleEvent<Element> { 
case success(Element)
case error(Swift.Error)
}
  • 发出⼀个元素,或⼀个error 事件
  • 不会共享附加作⽤

⼀个⽐较常⻅的例⼦就是执⾏ HTTP 请求,然后返回⼀个应答或错误。不过你也可以⽤ Single 来描述任何只有⼀个元素的序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func getRepo(_ repo: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let url = URL(string: "https://api.github.com/repos/\(repo)")!
let task = URLSession.shared.dataTask(with: url) { data, _, error in
if let error = error {
single(.error(error))
return
}

guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(DataError.cantParseJSON))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}

可以对 Observable 调⽤ .asSingle() ⽅法,将它转换为 Single。

Completable

  • 没有 next 时间
  • 发出 completed 或error 事件
  • 不会共享附加作

Completable 适⽤于那种你只关⼼任务是否完成,⽽不需要在意任务返回值的情况。它和 Observable有点相似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
...
...
guard success else {
completable(.error(CacheError.failedCaching))
return Disposables.create {}
}

completable(.completed)
return Disposables.create {}
}
}

Maybe

  • 发出⼀个next,completed或error 事件
  • 不会共享附加作⽤

如果你遇到那种可能需要发出⼀个元素,⼜可能不需要发出时,就可以使⽤ Maybe。

1
2
3
4
5
6
7
8
9
10
func generateString() -> Maybe<String> { 
return Maybe<String>.create { maybe in
maybe(.success("RxSwift"))
// OR
maybe(.completed)
// OR
maybe(.error(error))
return Disposables.create {}
}
}

以对 Observable 调⽤ .asMaybe() ⽅法,将它转换为 Maybe。

Driver

主要是为了简化 UI 层的代码

  • 不会产⽣ error 事件
  • ⼀定在 MainScheduler 监听(主线程监听)
  • 共享附加作⽤

使用 driver 的原因

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
let results = query.rx.text 
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.observeOn(MainScheduler.instance) // 结果在主线程返回
.catchErrorJustReturn([]) // 错误被处理了,这样⾄少不会终⽌整个序列
}
.share(replay: 1)

// HTTP 请求是被共享的
results
.map { "\($0.count)" }
.bind(to: resultCount.rx.text)
.disposed(by: disposeBag)

results
.bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)

使用 drive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let results = query.rx.text.asDriver() // 将ControlProperty 转换为 Driver
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.asDriver(onErrorJustReturn: []) // 仅仅提供发⽣错误时的备选返回值
}

results
.map { "\($0.count)" }
.drive(resultCount.rx.text) // 这⾥改⽤ `drive` ⽽不是 `bindTo`
.disposed(by: disposeBag) // 这样可以确保必备条件都已经满⾜了

results
.drive(resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)

Signal

Signal 和 Driver 相似,唯⼀的区别是,Driver 会对新观察者回放(重新发送)上⼀个元素,⽽ Signal 不会对新观察者回放上⼀个元素。

  • 不会产⽣ error 事件
  • ⼀定在 MainScheduler 监听(主线程监听)
  • 共享附加作⽤

⼀般情况下状态序列我们会选⽤ Driver 这个类型,事件序列我们会选⽤ Signal 这个类型。

ControlEvent

ControlEvent 专⻔⽤于描述 UI 控件所产⽣的事件,它具有以下特征:

  • 不会产⽣ error 事件
  • ⼀定在 MainScheduler
  • ⼀定在 MainScheduler 订阅(主线程订阅) 监听(主线程监听)
  • 共享附加作⽤

观察者 AnyObserver

特征观察者 Binder

  • 不会处理错误事件
  • 确保绑定都是在给定 Scheduler 上执⾏(默认 MainScheduler)
  • 只处理 next 事件

在介绍 AnyObserver 时,我们举了这样⼀个例⼦:

1
2
3
4
5
6
7
8
9
10
11
12
let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in 
switch event {
case .next(let isHidden):
self?.usernameValidOutlet.isHidden = isHidden
default:
break
}
}

usernameValid
.bind(to: observer)
.disposed(by: disposeBag)

由于这个观察者是⼀个 UI 观察者,所以它在响应事件时,只会处理 next 事件,并且更新 UI 的操作需要在主线程上执⾏。
因此⼀个更好的⽅案就是使⽤ Binder:

1
2
3
4
5
6
let observer: Binder<Bool> = Binder(usernameValidOutlet) { (view, isHidden) in
view.isHidden = isHidden
}
usernameValid
.bind(to: observer)
.disposed(by: disposeBag)

Binder 可以只处理 next 事件,并且保证响应 next 事件的代码一定会在给定 Scheduler 上执⾏,这⾥采⽤默认的 MainScheduler。

  • 复用

由于⻚⾯是否隐藏是⼀个常⽤的观察者,所以应该让所有的 UIView 都提供这种观察者:

1
2
3
4
5
6
7
8
9
10
11
12
extension Reactive where Base: UIView { 
public var isHidden: Binder<Bool> {
return Binder(self.base) { view, hidden in
view.isHidden = hidden
}
}
}

//-----
usernameValid
.bind(to: usernameValidOutlet.rx.isHidden)
.disposed(by: disposeBag)

即使观察序列又是观察者

AsyncSubject

PublishSubject

ReplaySubject

BehaviorSubject

ControlProperty

RxRelay

PublishRelay

PublishRelay 就是 PublishSubject 去掉终⽌事件 onError 或 onCompleted 。

BehaviorRelay

BehaviorRelay 就是 BehaviorSubject 去掉终⽌事件 演示 onError 或 onCompleted 。

如何选择operator

Observable 创建

  • 产⽣特定的⼀个元素:just
    • 经过⼀段延时:timer
  • 从⼀个序列拉取元素:from
  • 重复的产⽣某⼀个元素:repeatElement
  • 存在⾃定义逻辑:create
  • 每次订阅时产⽣:deferred
  • 每隔⼀段时间,发出⼀个元素:interval
    • 在⼀段延时后:timer
  • ⼀个空序列,只有⼀个完成事件:empty
  • ⼀个任何事件都没有产⽣的序列:never

Observable组合

  • 任意⼀个 Observable 产⽣了元素,就发出这个元素:merge
  • 当上⼀个 Observable 才能开始发出元素:concat
  • 组合多个 Observables 的元素
    • 当每⼀个 Observable 都发出⼀个新的元素:zip
    • 当任意⼀个 Observable 发出⼀个新的元素:combineLatest

Observable转换

  • 对每个元素直接转换:map
  • 转换到另⼀个 Observable :flatMap
    • 只接收最新的元素转换的 Observable 所产⽣的元素:flatMapLatest
    • 每⼀个元素转换的 Observable 按顺序产⽣元素:concatMap
  • 基于所有遍历过的元素:scan

基于时间序

  • 拖延⼀段时间后再发出:delay

想要将产⽣的事件封装成元素发送出来

  • 将他们封装成 Event:materialize
  • 然后解封出来:dematerialize

基于数量的操作

  • 忽略掉所有的 next 事件,只接收 completed 和 error 事件:ignoreElements

  • 创建⼀个新的 Observable 在原有的序列前⾯加⼊⼀些元素:startWith

  • 我想从 Observable 中收集元素,缓存这些元素之后在发出:buffer

  • 我想将 Observable 拆分成多个 Observables :window

    • 基于元素的共同特征:groupBy
  • 我想只接收 Observable 中特定的元素

    • 发出唯⼀的元素:single
  • 我想重新从 Observable 中发出某些元素

    • 通过判定条件过滤出⼀些元素:filter
    • 仅发出头⼏个元素:take
    • 仅发出尾部的⼏个元素:takeLast
    • 仅仅发出第 n 个元素:elementAt
  • 跳过头⼏个元素

    • 跳过头 n 个元素:skip
    • 跳过头⼏个满⾜判定的元素:skipWhile,skipWhileWithIndex
    • 跳过某段时间内产⽣的头⼏个元素:skip
    • 跳过头⼏个元素直到另⼀个 Observable 发出⼀个元素:skipUntil
  • 只取头⼏个元素

    • 只取头⼏个满⾜判定的元素:takeWhile,takeWhileWithIndex
    • 只取某段时间内产⽣的头⼏个元素:take
    • 只取头⼏个元素直到另⼀个 Observable 发出⼀个元素:takeUntil
  • 周期性的对 Observable 抽样:sample

  • 发出那些元素,这些元素产⽣后的特定的时间内,没有新的元素产⽣:debounce

  • 直到元素的值发⽣变化,才发出新的元素:distinctUntilChanged

    • 并提供元素是否相等的判定函数:distinctUntilChanged
  • 在开始发出元素时,延时后进⾏订阅:delaySubscription

  • 我想要从⼀些 Observables 中,只取第⼀个产⽣元素的 Observable :amb

  • 我想评估 Observable 的全部元素

    • 并且对每个元素应⽤聚合⽅法,待所有元素都应⽤聚合⽅法后,发出结果:reduce
    • 并且对每个元素应⽤聚合⽅法,每次应⽤聚合⽅法后,发出结果:scan
  • 我想在某个 Scheduler 应⽤操作符:subscribeOn

    • 在某个 Scheduler 监听:observeOn
  • 我想要 Observable 发⽣某个事件时, 采取某个⾏动:do

  • 我想要 Observable 发出⼀个 error 事件:error

  • 如果规定时间内没有产⽣元素:timeout

  • 我想要 Observable 发⽣错误时,优雅的恢复

    • 如果规定时间内没有产⽣元素,就切换到备选 Observable :timeout
    • 如果产⽣错误,将错误替换成某个元素 :catchErrorJustReturn
    • 如果产⽣错误,就切换到备选 Observable :catchError
    • 如果产⽣错误,就重试 :retry
  • 我创建⼀个 Disposable 资源,使它与 Observable 具有相同的寿命:using

  • 我创建⼀个 Observable ,直到我通知它可以产⽣元素后,才能产⽣元素:publish

    • 并且,就算是在产⽣元素后订阅,也要发出全部元素:replay
    • 并且,⼀旦所有观察者取消观察,他就被释放掉:refCount
    • 通知它可以产⽣元素了:connect

动态信号

1
2
3
4
5
6
7
RACSignal *s = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{

}];
}];

Cocoa 桥接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
- (void) bridge {
RACSignal *sO = RACObserver(self, button);

self.sigSetFrame = [self.button rac_signalForSelector:@selector(setFrame:)];
[_sigSetFrame
subscribeNext:^(id x) {
NSLog(@"setFrame:%@", x);
}];

self.sigClick = [self.button rac_signalForControlEvents:UIControlEventTouchUpInside];
[_sigClick
subscribeNext:^(id x) {
NSLog(@"event: %@", x);
}];

[[self rac_liftSelector:@selector(lift:) withSignals:_sigClick, nil]
subscribeNext:^(id x) {
NSLog(@"%@", x);
}];

[[self rac_liftSelector:@selector(lift:) withSignalsFromArray:@[[_sigClick map:^id(id value) {
return @[@3];
}]]]
subscribeNext:^(id x) {
NSLog(@"%@", x);
}];

[self rac_liftSelector:@selector(lift:) withSignalOfArguments:[_sigClick mapReplace:RACTuplePack(@1)]];
}

- (int) lift:(id)value{
printf("lift: %s", __func__);
return 1;
}

信号变化(内部是产生一个新信号的)

1
2
3
[_sigClick map:^id(id value) {
return @[@3];
}];

序列转换

1
self.sigSequence = [[RACSequence return:@3] concat:[RACSequence return:@4]].signal;

订阅(绑定)信号的方式

直接订阅方法

1
2
3
4
[self.sigSample
subscribeNext:^(id x) {
NSLog(@"sample: %@", x);
}];

绑定

1
RAC(self, a) = Signal;

Cocoa 桥接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- (void)signal {
[[self rac_liftSelector:@selector(lift:) withSignals:_sigClick, nil]
subscribeNext:^(id x) {
NSLog(@"%@", x);
}];

[[self rac_liftSelector:@selector(lift:) withSignalsFromArray:@[[_sigClick map:^id(id value) {
return @[@3];
}]]]
subscribeNext:^(id x) {
NSLog(@"%@", x);
}];

[self rac_liftSelector:@selector(lift:) withSignalOfArguments:[_sigClick mapReplace:RACTuplePack(@1)]];
}

- (int) lift:(id)value{
printf("lift: %s", __func__);
return 1;
}

信号变化&组合

  1. 单个信号的变化
  2. 多个信号的组合
  3. 高阶操作

signal 变化

值操作

问题:

  • 为什么会有这样的值操作方法?
  • 自己如何扩展新的值方法

  • transform 这些用的比较多
    1. map
    2. MapReplace
    3. ReduceEach tuple(a, b) -> c
  • 值判断逻辑变换
    1. not
    2. and
    3. or
  • 用的比较少
    1. reduceApply 这个不太清楚为什么要这么设计,用combineLatest: reduceEach: 就可以做了,而且代码看起来更好。
    2. materialize
    3. dematerialize

数量操作

  1. repeat 一直会有值
  • 条件过滤1

    1. ignore
    2. ignoreValues
    3. distinctUntilChanged
  • 条件过滤2

    1. takeUntilBlock:(BOOL (^)(id x))predicate
    2. takeWhileBlock:(BOOL (^)(id x))predicate;
    3. skipUntilBlock:(BOOL (^)(id x))predicate;
    4. skipWhileBlock:(BOOL (^)(id x))predicate;
  • 数量判断,如果有值就发送

    1. any;
    2. any:(BOOL (^)(id object))predicateBlock;
    3. all:(BOOL (^)(id object))predicateBlock;
  • 重试

    1. retry
    2. retry: Count
    3. collect 汇聚 信号必须有返回值

副作用:
– 对于信号值变化以外的一些操作

  • doNext
  • doCompleted
  • doError

折叠函数

不听对一个value 操作,使用折叠函数解决这个问题

1
2
3
[sig10 aggregateWithStart:@0 reduce:^id(id running, id next) {
return @([running intValue] + [next intValue]);
}];
  1. (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock;
  2. (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id running, id next, NSUInteger index))reduceBlock;
  3. (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock;
  4. (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock;
  5. (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id running, id next, NSUInteger index))reduceBlock;

时间操作

  1. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler;
  2. + (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway;
  3. delay
  4. throttle 阀门,在固定时间内没有新值发送的时候,会发送最后的值

多个信号组合

问题:

  1. 受哪个信号终止而终止?
  2. 错误传递?
  3. 各个信号何时开始开始订阅?
  4. 在哪个线程发出?
  • concat
    • 第一个结束后,订阅第二个
    • 第一个error 后,就直接 error
  • merge
  • zip
  • combineLatest
  • sample
  • takeUntil
  • takeUntilReplacement, 当 B 来了直接替换 A,开始订阅 B

信号的高阶操作(升阶降阶)

  1. 升阶 S(v) -> S(s(v))
  2. 降阶 S(s(v)) -> S(v)
1
2
3
4
RACSignal *signal = @[@1, @2, @3, @4].rac_sequence.signal;
RACSignal *signalB = [[signal map:^id(id value) {
return [[RACSignal return:value] delay:1];
}] concat];
  • 降阶操作

  • switchToLatests

switchToLatests

  • if/then/else
    if/then/else

  • switch/cases/default

  • flatten

flatten

flatten 类似 merge 只不过一个是接收的 value是 signal,另一个接收的就是 value

  • flatten:count 按个数展开信号,当信号个数 > count 以后等待,如果有 sig completed,那么把等待中的sig 放入展开数组里面

flatten-count

flatten:1 == concat

  • flattenMap

满足 monad 的部分定义,绝大部分函数都可以使用 flattenMap 实现

  • bind

大部分函数都可以使用 bind 实现

冷信号&热信号

一些习题

  1. 如何获得无限递增的信号
1
2
3
4
5
6
7
8
RACSignal *increment(int inc) {
RACSignal *repeat = [[RACSignal return:@(inc)] repeat];

return [[repeat scanWithStart:0 reduce:^id(id running, id next) {
return @([running intValue] + [next intValue]);
}]
delay:1];
}
  1. fibonacci
1
2
3
4
5
6
7
RACSignal *fibonacci() {
RACSignal *repeat = [[RACSignal return:nil] repeat];
return [repeat scanWithStart:RACTuplePack(@1, @1) reduce:^id(RACTuple *running, id _) {
int next = [running.first intValue] + [running.second intValue];
return RACTuplePack(running.second, @(next));
}];
}