Swift Combine — Publisher和Subscriber的交互流程(自定义Publisher、Subscriber、Subscription)

发布于:2024-06-28 ⋅ 阅读:(136) ⋅ 点赞:(0)

之前的文章已经介绍过PublisherSubscriber,对于概念类的东西这里就不多介绍了,在介绍PublisherSubscriber的交互流程之前,先补充一下前面没有提到过的Subscription

Subscription

Subscription是一个协议,实现该协议的对象负责将订阅者链接到发布者。只要它在内存中,订阅者就会继续接收值。它只包含一个方法:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

当订阅者在Publisher中接收到subscription对象后,便开始调用request方法,demand参数决定了订阅者要从发布者那里获取多少个值。

demand参数有几个可选的参数值:

  • none:表示订阅者一个值都不会收到。
  • max(value): 表示订阅者要接收value个值。
  • unlimited:表示订阅者要接受无限个值。

Subscription的实例对象中包含了一个Subscriber的引用,以使其保持最新状态。
Subscription协议有继承了Cancellable协议,所以有了cancel方法,而在自定义Subscription的时候,requestcancel方法都是必须实现的。

Publisher和Subscriber的交互流程

介绍完了Subscription协议,现在看看PublisherSubscriber是如何建立的联系。

  1. Publisher调用subscribe(_:) 方法开启链接申请,同时参数传入Subscriber实例对象。
  2. 在第一步调用subscribe(_:) 方法后,即触发Publisher内部调用receive(subscriber:)方法,在该方法中创建一个连接PublisherSubscriberSubscription对象,然后调用Subscriberreceive(subscription:)方法,将Subscription对象传给Subscriber
  3. Subscriberreceive(subscription:)方法中,使用传进来的subscription对象调用request方法,并设置Subscriber的请求次数。
  4. Subscriptionrequest方法中,知道了Subscriber的请求次数,经过相关的逻辑处理后,在此方法中给Subscriber发送数据。
  5. 通过Subscriberreceive(_:)方法向Subscriber发送数据。
  6. 通过Subscriberreceive(completion:)方法向Subscriber发送结束或者失败信息。

因为Subscription是起了一个桥梁的作用,属于幕后,所以上面第5条、第6条从语义上来说相当于Publisher通过receive(_:)方法或receive(completion:)方法向Subscriber发送数据或者结束信息。实际上SubscriptionPublisher做了向下游发送数据的事情。

自定义Subscriber

首先看一下Subscriber协议的定义:

public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure : Error

    func receive(subscription: any Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

协议中有两个类型,三个方法。自定义的Subscriber需要使用class定义,而非struct,否则会报错,另外struct是值类型,Subscription没有持有最初的那个Subscriber对象。

// 自定义Subscriber
class CustomSubscriber: Subscriber {
  // 确定输入类型,需要和Publisher的输出类型一致。
  typealias Input = Int
  // 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。
  typealias Failure = Never

  /** 交互流程中第3步
   *  接收subscription对象的方法。
   *  方法内subscription对象调用request方法,设置请求次数。
   */
  func receive(subscription: any Subscription) {
    debugPrint("CustomSubscriber subscription.request")
    subscription.request(.max(5))
  }
  
  /** 交互流程中第5步
   *  接收Publisher发送数据的方法。
   *  该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。
   */
  func receive(_ input: Int) -> Subscribers.Demand {
    print("New value \(input)")
    return .none
  }

  /** 交互流程中第6步
   *  接收Publisher发送结束的方法,或者正常结束,或者失败。
   */
  func receive(completion: Subscribers.Completion<Never>) {
    print("Completion: \(completion)")
  }
}

自定义Publisher

在自定义Publisher前,再看一下Publisher协议的定义:

public protocol Publisher<Output, Failure> {

    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

自定义的Publisher需要继承这个协议,比如:

// 自定义Publisher
class CustomPublisher: Publisher {
  // 确定输出类型,需要和Subscriber的输入类型一致。
  typealias Output = Int
  // 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。
  typealias Failure = Never

  /** 交互流程中第2步
   *  接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。
   *  方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。
   */
  func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
    // 创建Subscription对象
    let subscription = CustomSubscription(subscriber: subscriber)
    debugPrint("CustomPublisher subscriber.receive")
    // 将Subscription对象传给Subscriber
    subscriber.receive(subscription: subscription)
  }
}

自定义Subscription

先看一下Subscription协议:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

该协议中规定了要实现request方法,因为继承了Cancellable,所以还需要实现一个cancel方法。

public protocol Cancellable {
    func cancel()
}

Subscriber一样,自定义的Subscription需要使用class定义,而非struct,否则会报错,另外创建的Subscription实例对象需要在内存中保持,否则订阅就失效了。

下面是自定义Subscription

// 自定义Subscription
class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {
  // 持有传入进来的Subscriber对象。
  private var subscriber: S
  private var counter = 0
  private var isCompleted = false

  // 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。
  init(subscriber: S) {
    self.subscriber = subscriber
  }
  
  /** 交互流程中第4步
   *  该方法传入请求数据的次数,并给Subscriber发送数据。
   */
  func request(_ demand: Subscribers.Demand) {
    debugPrint("CustomSubscription request")
    guard !isCompleted else { return }

    for _ in 0..<(demand.max ?? 10) {
      _ = subscriber.receive(counter) // 给Subscriber发送数据
      counter += 1
    }

    if counter >= 5 {
      subscriber.receive(completion: .finished) // 通知Subscriber结束。
      isCompleted = true
    }
  }

  // 该方法中执行一些取消订阅的操作。
  func cancel() {
    isCompleted = true
  }
}

如何使用

定义完了上面的,现在看看怎么使用吧。还是依托SwiftUI的界面,我们在对应的ViewModel中添加方法,使用上面自定义的类。

首先定义一个ViewModel

class CustomCombineViewModel: ObservableObject {

  var subscription: AnyCancellable?

  func testMethod1() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 创建自定义的Subscriber
    let subscriber = CustomSubscriber()

    debugPrint("Begin subscribe")

    /** 交互流程中第1步,申请订阅。
     *  由Publisher对象调用subscribe方法,传入Subscriber对象开始。
     */
    publisher.subscribe(subscriber)
  }

  func testMethod2() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。
    subscription = publisher
      .sink { completion in
        print("sink completion: \(completion)")
      } receiveValue: { value in
        print("sink new value \(value)")
      }
  }
}

在上面代码中的testMethod1方法中,分别创建了PublisherSubscriber,并用Publisher对象调用subscribe方法开启订阅,这也是订阅的开启入口。

当执行testMethod1时候,输出打印:

"Begin subscribe"
"CustomPublisher subscriber.receive"
"CustomSubscriber subscription.request"
"CustomSubscription request"
New value 0
New value 1
New value 2
New value 3
New value 4
Completion: finished

上面的输出也反应了从开始订阅到发送数据结束的过程。打印了5个数据是应为我们在Subscriber类中调用request方法的时候传入了.max(5),最多发送5个数据。

再看一下第二个方法testMethod2(),这个方法中没有明确的Publisher调用subscribe方法呢?

Subscribers有两个内置的Subscriber,分别为Subscribers.SinkSubscribers.Assign。当调用sink或者assign方法的时候,就开启了订阅流程。

当执行testMethod2时候,输出打印:

"CustomPublisher subscriber.receive"
"CustomSubscription request"
sink new value 0
sink new value 1
sink new value 2
sink new value 3
sink new value 4
sink new value 5
sink new value 6
sink new value 7
sink new value 8
sink new value 9
sink completion: finished

因为sink请求的是无限次数数据,所以将我们在Subscription中的数据都打印出来了。

Subscribers.Sink

Sink 创建的时候会立即调用 Subscription 对象的 request(.unlimited)

Publisher 有两个 sink 扩展方法:

  • sink(receiveCompletion:receiveValue:)
  • sink(receiveValue:)

Subscribers.Assign

Assign 会将接收到的值赋值给一个类对象的属性或者一个另一个 @Published publisher 上,它对 publisher 的 demand 也是 .unlimited

Publisher 有两个 assign 扩展方法:

  • assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
  • assign(to published: inout Published<Self.Output>.Publisher)

写在最后

现在我们完全理解了Combine订阅交互流程,是不是对Combine框架有了进一步的认识呢?
在实际开发过程中,不建议我们自己去实现PublisherSubscriberSubscription,因为一个逻辑错误可能会破坏发布者和订阅者之间的所有连接,这可能会导致意想不到的结果。

最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。


网站公告

今日签到

点亮在社区的每一天
去签到