Combine
文中写一些 Swift 方法签名时,会带上 label,如
subscribe(_ subscriber:)
,正常作为 Selector 的写法时会忽略掉 label,只写作
subscribe(_:)
,本文特意带上 label 以使含义更清晰。
Combine Framework
Overview
在 App 运行过程中会发生各种各样的异步事件,如网络请求的返回,Notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。Combine 框架提供了一种声明式的 Swift API,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。
Combine 框架采用 publisher-subscriber 模式:
-
协议
Publisher
表示一种能够随时间产生一系列值的类型。Combine 还为这些 Publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。 -
在这个处理链的最末端,是由协议
Subscriber
表示的订阅者类型,接收并处理发送给它的值。 -
当一个 Subscriber 订阅到一个 Publisher 上时,会接收到一个新生成的由协议
Subscription
表示的类型对象,Subscriber 通过该对象来向 Publisher 请求值,而 Publisher 也只有在接收到 Subscriber 的显式请求时才会分发值。
Publisher 和 Subscriber 的交互流程
-
首先,subscriber 调用 publisher 的
subscribe(_ subscriber:)
方法,将自己作为参数传过去;
subscribe(_ subscriber:)
会接着调用 publisher 的
receive(subscriber:)
方法。 -
方法
receive(subscriber:)
是
Publisher
协议的要求方法,所有遵循 Publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法
subscribe(_ subscriber:)
来发起订阅。 -
Publisher 接受订阅之后,会创建一个新的 Subscription 对象,然后调用 subscriber 的
receive(subscription:)
方法。协议 Subscription 约束了一个方法
request(_ demand:)
,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。 -
Publisher 通过调用 subscriber 的
receive(_ input:)
方法来向其发布值,并在结束时调用 subscriber 的
receive(completion:)
来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。Anyway,实现 Subscriber 协议的类型必须实现这两个方法(以及开头的
receive(subscription:)
方法),在这些方法中处理从 publisher 那里接收到的值。
Combine 中的 Publishers
内置 publishers
Combine 框架提供了许多内置的 publisher 类型供我们使用,如:
- 为 Sequence 类型实现的 publisher 扩展;
-
为 NotificationCenter 实现的
publisher(for:object:)
扩展; -
URLSession 的
dataTaskPublisher(for:)
扩展; - …
Subject
Subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 Combine 提供了一个强大的工具。Subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的
send(_ value:)
方法来发布一个值。Combine 提供了两种 subject:
-
CurrentValueSubject:如其名,会维护一个当前值,初始化时需要传入一个初始值作为当前值,后续通过调用
send(_ value:)
来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。 -
PassthroughSubject:不同于 CurrentValueSubject,内部没有缓存状态,每次调用
send(_ value:)
时才会向下游发布值。
@Published
该 property wrapper 修饰 Class 的某个属性,为其生成一个 publisher,使用方通过
$
加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willSet 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。
Combine 中的 Subscribers
Combine 提供了两个内置的 subscribers:Subscribers.Sink 和 Subscribers.Assign,但一般不直接创建这两个的实例,而是通过 Publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 AnyCancellable 对象。
Sink
Sink subscriber 创建的时候会立即调用 subscription 对象的
request(.unlimited)
,后面会详细介绍 Demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,Sink 就会持续地接收到新值,直至被 cancel。
Publisher 有两个 sink 扩展方法:
-
sink(receiveCompletion:receiveValue:)
:两个闭包的含义和用法不必多解释; -
sink(receiveValue:)
:只有当 Publisher 的 Failure associated type 是 Never 时才可以使用该方法。
Assign
Assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 Published publisher 上,它对 publisher 的 demand 也是 .unlimited。
-
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
:- 因为 keyPath 类型是 ReferenceWritableKeyPath,所以 object 只能是一个类实例;
- 注意该 Assign subscriber 会强持有 object 对象,除非上游发布了一个 completion。
-
assign(to published: inout Published<Self.Output>.Publisher)
:- 使用该 subscriber 可以将上游的值通过一个 @Published 修饰的属性重新发布;
- 该方法没有返回值,当关联的 Published 实例析构时会自动地 cancel 掉订阅。
Publisher 的 operators
Combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 Publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。
具体的 operator 及其用法详见
文档
。
Connectable
常用的 Subject 如
sink(receiveValue:)
会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。
Combine 提供一个
ConnectablePublisher
协议来支持手动控制开始发布值的时机,遵循该协议的 Publisher 只有在显示调用
connect()
方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。
使用 Publisher 的
makeConnectable()
operator 来将一个已有的 publisher 包装成一个
Publishers.MakeConnectable
实例,该实例便是一个 ConnectablePublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。
Combine 中有些 publisher 已经实现了 ConnectablePublisher 协议,如
Publishers.Multicast
,
Timer.TimerPublisher
等,有时在一些使用这些 publisher 的简单场景中,显式调用
connect()
反而显得繁琐,因此 ConnectablePublisher 又提供了一个
autoconnect()
operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。
Demand and Back Pressure
在 Combine 中,一个 Publisher 只有在被 Subscriber 订阅并且发起要求的时候才会产生值。Subscriber 有下面两种方式来发起要求:
-
通过调用 Subscription 对象的
request(_ demand:)
方法,Subscription 对象在发起订阅时由
receive(subscription:)
方法传入; -
每次
receive(_ input:)
调用时,可以返回一个新的 Demand。
Demand 表示 subscriber 需要多少值,Combine 提供的 API 里面,有
.none
,
.unlimited
, 以及指定具体数目的
.max(Int)
。Demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 Demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。
自定义 Subscriber
内置的 Subscriber Sink 和 Assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 Subscriber,如:
class MySubscriber: Subscriber {
typealias Input = Date
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
print("published received")
self.subscription = subscription
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
subscription.request(.max(3))
}
}
func receive(_ input: Date) -> Subscribers.Demand {
print("\(input) \(Date())")
return Subscribers.Demand.none
}
func receive(completion: Subscribers.Completion<Never>) {
print ("--done--")
}
}
自定义 Subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 Subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。
Back-Pressure 操作符
除了自定义 Subscriber,Combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:
-
buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;
-
debounce:设定一个 dueTime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 + dueTime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 + dueTime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:
let bounces:[(Int,TimeInterval)] = [ (0, 0), (1, 0.25), // 0.25s interval since last index (2, 1), // 0.75s interval since last index (3, 1.25), // 0.25s interval since last index (4, 1.5), // 0.25s interval since last index (5, 2) // 0.5s interval since last index ] let subject = PassthroughSubject<Int, Never>() cancellable = subject .debounce(for: .seconds(0.5), scheduler: RunLoop.main) .sink { index in print ("Received index \(index)") } for bounce in bounces { DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) { subject.send(bounce.0) } } // Prints: // Received index 1 // Received index 4 // Received index 5 // Here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator: // Time 0: Send index 0. (republish task0 at: 0.5) // Time 0.25: Send index 1. (task0 is discarded, republish task1 at 0.25 + 0.5 = 0.75) // Time 0.75: Debounce period ends, publish index 1. (execute task1) // Time 1: Send index 2. (republish task2 at: 1 + 0.5 = 1.5) // Time 1.25: Send index 3. (task2 is discarded, republish task3 at 1.25 + 0.5 = 1.75) // Time 1.5: Send index 4. (task3 is discarded, republish task4 at 1.5 + 0.5 = 2.0) // Time 2: Debounce period ends, publish index 4. Also, send index 5. (execute task4. republish task5 at: 2 + 0.5 = 2.5) // Time 2.5: Debounce period ends, publish index 5. (execute task5)
-
throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。
-
collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。
许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。
开源实现:OpenCombine
Apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 Combine 要求 iOS 13 以上才能使用,而且一些 API 更新可能会要求更新的 OS 版本。然而有位大佬
Sergej Jaskiewicz
开发了 Combine 的开源实现:
OpenCombine
,完全兼容 Combine 的 API,可以运行在老的 iOS 和 macOS 版本上,甚至支持 Windows、Linux 和 WASM。
我们来简单看一下内置的 Publisher 之一的 PassthroughSubject 的开源实现。
先从订阅流程看起,可以结合上面的图作为参照。首先是 Publisher 的扩展方法
subscribe(_ subscribe)
,其实就是简单地调用了一下具体 Publisher 类型的协议约束方法,传入参数 subscriber:
receive(subscriber: subscriber)
。PassthroughSubject 的协议方法实现为:
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Failure == Downstream.Failure
{
lock.lock()
if active {
let conduit = Conduit(parent: self, downstream: subscriber) // a.
downstreams.insert(conduit) // b.
lock.unlock()
subscriber.receive(subscription: conduit) // c.
} else {
let completion = self.completion!
lock.unlock()
subscriber.receive(subscription: Subscriptions.empty) // d.
subscriber.receive(completion: completion) // e.
}
}
先看 c 处,最终调用了 subscriber 的
receive(subscription:)
,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。
我们上面多次介绍到 Subscription 类型,但 Combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 Conduit 便是 PassthroughSubject 的 Subscription 实现类。我们接着看 PassthroughSubject 的另一个协议约束方法:
// PassthroughSubject 的 send(_ input:) 实现
public func send(_ input: Output) {
lock.lock()
guard active else {
lock.unlock()
return
}
let downstreams = self.downstreams
lock.unlock()
for conduit in downstreams {
conduit.offer(input)
}
}
// Conduit 的 offer(_ output:) 实现
override func offer(_ output: Output) {
lock.lock()
guard demand > 0, let downstream = downstream else {
lock.unlock()
return
}
demand -= 1 // a.
lock.unlock()
downstreamLock.lock()
let newDemand = downstream.receive(output) // b.
downstreamLock.unlock()
guard newDemand > 0 else { return }
lock.lock()
demand += newDemand // c.
lock.unlock()
}
PassthroughSubject 的
send(_ input:)
会调用 Conduit 的
offer(_ output:)
,在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newDemand 加到现有 demand 上面,和前文描述的逻辑完全一致。
其他方法实现、Subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。
References
- Publisher
- Subscriber
- Processing Published Elements with Subscribers
- Controlling Publishing with Connectable Publishers
- 关于 Backpressure 和 Combine 中的处理
- OpenCombine
未经允许不得转载:大白鲨游戏网 » 深入理解 Swift Combine