RxSwift学习之旅 - 被观察者和订阅者

应用场景

在开发中经常会遇到和处理的业务场景有: 利用@IBActionTarget-action响应用户的操作事件;设置Notification监听键盘的位置变化;使用closure异步处理网络请求返回的数据;使用KVO监听属性的变化,以做出相应操作等。

上面的场景都对应一个被观察的对象,当这个对象发生某种变化时,我们根据不同的变化事件去处理不同的业务逻辑。这无疑构成一个观察者模式,RxSwift中,被观察者和订阅者是其中两大重要角色。

Observable(被观察者)

RxSwift中的Observable 是观察者模式中被观察的对象,相当于一个事件序列 (GeneratorType) ,会主动向订阅者发送新产生的事件信息。事件类型分为以下三种:

  • .onNext(element) 表示新的事件数据。
  • .onError(error) 表示事件序列因异常而完结。
  • .onCompleted() 表示事件序列完结。

Subscribe(订阅者)

如果只有一个Observable被创建,而没有被任何的订阅者所订阅的话,那么什么事情都不会发生。所以我们需要创建一个订阅者,来响应事件的触发。

1
2
3
4
5
6
7
8
9
_ = Observable<String>.create { observerOfString in
print("Observable created")
observerOfString.on(.next("😉"))
observerOfString.on(.completed)
return Disposables.create()
}
.subscribe { event in
print(event)
}

通过subscribe订阅之后,会受到两个消息,.onNext(element).onCompleted(),打印出:

1
2
next(😉)
completed

创建并订阅

RxSwift本身提供了多种方式来创建和订阅一个被观察者,下面来一一介绍,以下的代码可以git clone https://github.com/ReactiveX/RxSwift.git,然后打开Rx.playground,新建一个page,importRxSwift即可。

create

常见的两种方式来创建一个Observable对象,一种是通过引入RxCocoa(RxCocoa是对cocoa进行的Rx扩展),它已经包含了我们常用到的Observable流,比如buttontap事件。一般做iOS开发的要使用到RxSwift都要用到RxCocoa的,这两个是相辅相成的。

1
let observable = loginButton.rx.tap.asObservable()

也可以使用提供的create函数来创建一个Observable对象。

1
2
3
4
5
6
7
8
9
10
11
12
let disposeBag = DisposeBag()
let observable = Observable<String>.create { observerOfString -> Disposable in
observerOfString.on(.next("😬"))
observerOfString.on(.completed)
return Disposables.create()
}
observable.subscribe { print($0) }
.disposed(by: disposeBag)

output:
next(😬)
completed

never

创建一个序列,不会终止也不会发出任何事件。

1
2
3
4
5
6
7
8
9
let disposeBag = DisposeBag()
let neverSequence = Observable<String>.never()

let neverSequenceSubscription = neverSequence
.subscribe { _ in
print("This will never be printed")
}

neverSequenceSubscription.disposed(by: disposeBag)

empty

创建一个空的序列,只会发出一个完成事件。

1
2
3
4
5
6
7
8
9
10
let disposeBag = DisposeBag()

Observable<Int>.empty()
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)

output:
completed

just

创建一个单个元素的序列。

1
2
3
4
5
6
7
8
9
10
11
let disposeBag = DisposeBag()

Observable.just("🔴")
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)

output:
next(🔴)
completed

of

使用固定数量的元素创建一个序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
.subscribe(onNext: { element in
print(element)
})
.disposed(by: disposeBag)

output:
🐶
🐱
🐭
🐹

这里使用了subscribe(onNext:)而不是subscribe(_:),可以通过前面一种方式来订阅某个事件,所以有如下的写法:

1
2
3
4
5
6
someObservable.subscribe(
onNext: { print("Element:", $0) },
onError: { print("Error:", $0) },
onCompleted: { print("Completed") },
onDisposed: { print("Disposed") }
)

可以只订阅其中某个,而不用全部订阅。

from

从一个序列创建一个可被观察的序列。

1
2
3
4
5
6
7
8
9
10
11
let disposeBag = DisposeBag()

Observable.from(["🐶", "🐱", "🐭", "🐹"])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

output:
🐶
🐱
🐭
🐹

这里使用$0表示默认的第一个参数

range

创建一个发出一系列顺序整数然后终止的序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
.subscribe { print($0) }
.disposed(by: disposeBag)

output:
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed

repeatElement

创建一个给予元素的无限序列。

1
2
3
4
5
6
7
8
9
10
11
let disposeBag = DisposeBag()

Observable.repeatElement("🔴")
.take(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

output:
🔴
🔴
🔴

这里的take(3)表示只取前3个元素。

generate

创建一个满足条件的序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
let disposeBag = DisposeBag()

Observable.generate(
initialState: 0,
condition: { $0 < 3 },
iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

output:
0
1
2

deferred

只有当有订阅者订阅的时候才会去创建序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
let disposeBag = DisposeBag()
var count = 1

let deferredSequence = Observable<String>.deferred {
print("Creating \(count)")
count += 1

return Observable.create { observer in
print("Emitting...")
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐵")
return Disposables.create()
}
}

deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

output:
Creating 1
Emitting...
🐶
🐱
🐵
Creating 2
Emitting...
🐶
🐱
🐵

如果把后面两个订阅去掉的话,是不会有Creating输出的。

error

创建一个没有元素并以错误终止的序列。

1
2
3
4
5
6
7
8
9
10
11
public enum TestError : Error {
case test
}

let disposeBag = DisposeBag()
Observable<Int>.error(TestError.test)
.subscribe { print($0) }
.disposed(by: disposeBag)

output:
error(test)

doOn

在每个事情发出之后,可以调用其它处理,然后返回原事件,相当于一个拦截器,但是只能拦截不能修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let disposeBag = DisposeBag()

Observable.of("🍎", "🍐", "🍊", "🍋")
.do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed") })
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

output:
Intercepted: 🍎
🍎
Intercepted: 🍐
🍐
Intercepted: 🍊
🍊
Intercepted: 🍋
🍋
Completed

同样可以针对不同的事件类型单独拦截。

Subject

Subject就相当于一个桥梁或者代理,它既可以作为一个observer也可以作为一个Observable

下面来看几种不同的Subject:

PublishSubject

PublishSubject只会发送给订阅者订阅之后的事件,之前发生的事件将不会发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

output:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

订阅者2只能收到订阅后传过来的事件,如图:
image

如果要保证所有事件都能被订阅到,可以使用Create主动创建或使用ReplaySubject
如果被观察者因为错误被终止,PublishSubject只会发出一个错误的通知。

ReplaySubject

不管订阅者什么时候订阅的都可以把所有发生过的事件发送给订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.createUnbounded()

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

output:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐶)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

当然你也指定重发事件的缓冲区大小,比如上面的例子如果这样创建:

1
let subject = ReplaySubject<String>.create(bufferSize: 1)

指定缓冲区大小为1,那么订阅者2就不会收到🐶了。
image

BehaviorSubject

广播所有事件给订阅者,对于新的订阅者,广播最近的一个事件或者默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

subject.addObserver("3").disposed(by: disposeBag)
subject.onNext("🍐")
subject.onNext("🍊")

output:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)

image

PublishSubject, ReplaySubject, and BehaviorSubject不会自动发送完成事件当他们被回收时。

Variable

VariableBehaviorSubject的封装,它和BehaviorSubject不同之处在于,不能向Variable发送.Complete.Error,它会在生命周期结束被释放的时候自动发送.Complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
let disposeBag = DisposeBag()
let variable = Variable("🔴")

variable.asObservable().addObserver("1").disposed(by: disposeBag)
variable.value = "🐶"
variable.value = "🐱"

variable.asObservable().addObserver("2").disposed(by: disposeBag)
variable.value = "🅰️"
variable.value = "🅱️"

output:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 1 Event: completed
Subscription: 2 Event: completed

参考资料

RxSwift

Document

AloneMonkey wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!