📜 ⬆️ ⬇️

About Publish, Connect, RefCount and Share in RxSwift

Hi, Habr. I present to your attention the translation of the article Understanding Publish, Connect, RefCount and RxSwift .

The original article uses Swift second version and the corresponding version of RxSwift. I had the courage to rewrite the pieces of code below under Swift 3.
Just want to note that concepts such as Observable and Sequence, can be considered one and the same. The same goes for Observer and Subscriber.

I also recommend reading about share (), share Replay (), shareReplayLatestWhileConnected () in RxSwift .
')
In this article I will try to explain such operators for working with Connectable Observable in RxSwift as publish, connect, refCount and share . They are used together in various combinations. It is very important to understand the difference between:


Active and Passive Observables


Before getting to the point, I would like to say a few words about hot and cold Observables. As for me, the concepts of hot and cold Observables are a bit blurred.

Let's hot Observable we will call Active Sequence, and cold Passive Sequence.


An example of the Passive Sequence is a request to the network, which begins only when we subscribe to the sequence. Examples of Active Sequence are a web-socket connection, timer events, or text produced by UITextField .

And it's all. Think of active and passive sequences. The concepts of hot / cold / warm / cool Observables are too messy and can be confusing.

Multiple subscriptions to one Observable


If you have ever subscribed twice (or more) to the same Observable, you might be surprised at the results.

Take a look at the following piece of code:

 let url = URL(string: "https://habrahabr.ru/")! let requestObservable = URLSession.shared .rx.data(request: URLRequest(url: url)) requestObservable.subscribe(onNext: { print($0) }) requestObservable.subscribe(onNext: { print($0) }) 

Looking at the console, we will see two HTTP responsions. Observable executed the request twice, although this is contrary to our expectations.



share() as salvation


Obviously, this is not what we want from a regular HTTP request. But we can change this behavior and execute just one request. You just need to apply the share() operator to our Observable.

 let url = URL(string: "https://habrahabr.ru/")! let requestObservable = URLSession.shared .rx.data(request: URLRequest(url: url)) .share() requestObservable.subscribe(onNext: { print($0) }) requestObservable.subscribe(onNext: { print($0) }) 

As expected, only one HTTP request was executed.


In essence, the operator share() is just a wrapper over publish().refcount() .
Stop stop stop! What publish() , what refcount() ?

publish() and his friend connect()


When the publish () operator is applied, the Observable is transformed into a Connectable Observable. The ReactiveX documentation states:
Connectable Observable is similar to regular Observable, except for one thing. It begins to produce elements not when it is subscribed, but only when the connect() operator is called on it .
 let myObservable = Observable.just(1).publish() print("Subscribing") myObservable.subscribe(onNext: { print("first = \($0)") }) myObservable.subscribe(onNext: { print("second = \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Calling connect after 3 seconds") myObservable.connect() } /* Output: Subscribing Calling connect after 3 seconds first = 1 second = 1 */ 

In the example above, Observers subscribe to myObservable immediately after it has been created. But they only work after 3 seconds, when the connect() operator was called. Simply put, connect() activates the Connectable Observable and includes subscribers.

The interesting thing is how resources are cleaned up. Look at this code.

 let myObservable = Observable<Int> .interval(1, scheduler: MainScheduler.instance) .publish() myObservable.connect() print("Starting at 0 seconds") let mySubscription = myObservable.subscribe(onNext: { print("Next: \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Disposing at 3 seconds") mySubscription.dispose() } DispatchQueue.main.asyncAfter(deadline: .now() + 6) { print("Subscribing again at 6 seconds") myObservable.subscribe(onNext: { print("Next: \($0)") }) } // Output: /* Starting at 0 seconds Next: 0 Next: 1 Next: 2 Disposing at 3 seconds Subscribing again at 6 seconds Next: 6 Next: 7 Next: 8 Next: 9 ... */ 

Even if all subscribers have unsubscribed from our Observable, the latter still lives and continues to produce events under the hood.

Translator's Note
The connect() method returns Disposable . Thus, you can stop the production of elements by calling the dispose() method of this Disposable , or you can provide this DisposeBag ' DisposeBag .

Now let's compare this to publish().refcount() .

The difference between publish().connect() and publish().refcount()


You can perceive the refcount() statement as magic, which is processed for you by unsubscribing the Observers. refcount() calls connect() automatically when the first Observer is signed, so there is no need to do it yourself.

 let myObservable = Observable<Int> .interval(1, scheduler: MainScheduler.instance) .publish() .refCount() print("Starting at 0 seconds") let mySubscription = myObservable.subscribe(onNext: { print("Next: \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Disposing at 3 seconds") mySubscription.dispose() } DispatchQueue.main.asyncAfter(deadline: .now() + 6) { print("Subscribing again at 6 seconds") myObservable.subscribe(onNext: { print("Next: \($0)") }) } // Output: /* Starting at 0 seconds Next: 0 Next: 1 Next: 2 Disposing at 3 seconds Subscribing again at 6 seconds Next: 0 Next: 1 Next: 2 Next: 3 ... */ 

Pay attention to that. When we signed again, Observable began to emit items from the beginning.

Conclusion


Feel the difference now? publish().connect() and publish().refcount() (or share() ) control the unsubscribe mechanism from Obervables differently.

When you use publish().connect() , you need to manually control the resource cleanup mechanism of your Observable (described in the note under the spoiler) . Your sequence behaves as active and produces items all the time, regardless of subscriptions.

On the other hand, publish().refcount()/share() keeps track of how many Observers subscribe to the Observable and do not disable the former from the latter as long as at least one subscriber exists. In other words, when the subscriber count drops to zero, the Observable “dies” and stops producing any items.

If something is not completely clear, please let us know in the comments. Thank.

Source: https://habr.com/ru/post/336662/


All Articles