
RxSwift cheat sheet on operators (+ PDF)



Having become interested in functional programming, I found myself at a crossroads: which framework should I pick to study? ReactiveCocoa is a veteran in iOS circles, and there is plenty of information about it. But it grew up with Objective-C, and although that is not a problem in itself, I mostly write in Swift these days and wanted a solution designed from the start with the language's features in mind. RxSwift is a port of Reactive Extensions, which has a long history, but the port itself is fresh and written specifically for Swift. That is what I settled on.
The peculiarity of the RxSwift documentation, however, is that the descriptions of all operators link to reactivex.io, which mostly gives general information; the developers have not yet gotten around to writing documentation specific to RxSwift, which is not always convenient. Some operators have subtleties in their implementation, and for some the general documentation offers nothing beyond a mention.
After reading all the wiki chapters in the RxSwift GitHub repository, I decided to go straight to the official examples, and it quickly became clear that this does not work with Rx: you need a solid grasp of the basics, otherwise you will be like a monkey with a copy-paste grenade. I started with the operators that were hardest to understand, then moved on to the ones that seemed clear, until asking myself questions about them showed that I was only guessing at the answers without being sure.
In the end, I decided to work through all the RxSwift operators. The best way to understand something in programming is to run the code and see how it behaves. Given the specifics of reactive programming, diagrams are also very useful, along with a brief description in Russian. Having finished the work today, I thought it would be a sin not to share the results with those who are only beginning to look at reactive programming.
Lots of pictures and text below the cut, really a lot!


Before starting, I recommend looking through the official documentation; I have conveyed the essence and specifics of the RxSwift operators, not the basics.
You can also play with the marbles in the diagrams using RxMarbles; there is a free version for iPhone / iPad.
So, in this article I will cover all (well, almost all) RxSwift operators. For each one I give a brief description, a diagram (where that makes sense), code, and the execution result; where necessary, I comment on the log output.
Each operator's heading in the article is a link to the official documentation, since I did not set out to translate every nuance of the operators.

Here is a link to the GitHub repository where I cloned the official RxSwift repository and added my own playground (DescribeOperators.playground), which contains almost the same code as the article.
And here is a link to the PDF in which all operators are collected as a mind map, so you can look through them quickly. Pieces of code are attached in the PDF to show how, and with which parameters, each operator is used. In fact, this PDF is why I started the whole thing: I wanted a document on hand in which all the operators and their diagrams are clearly visible. The PDF turned out to be huge (in terms of canvas size, not file size), but I checked that everything looks fine even on an iPad 2.

Please report any errors by private message; the amount of work turned out to be rather large, and after the fourth read-through of the text my eyes were cursing me.
Well, I hope my work is useful to someone. Let's get started.

Content


Notes

Create Observable


asObservable
create
deferred
empty
error
interval
just
never
of
range
repeatElement
timer

Combining Observable


amb
combineLatest
concat
merge
startWith
switchLatest
withLatestFrom
zip

Filtration


distinctUntilChanged
elementAt
filter
ignoreElements
sample
single
skip
skip (duration)
skipUntil
skipWhile
skipWhileWithIndex
take
take (duration)
takeLast
takeUntil
takeWhile
takeWhileWithIndex
throttle

Transformation


buffer
flatMap
flatMapFirst
flatMapLatest
flatMapWithIndex
map
mapWithIndex
window

Math and aggregation operators


reduce
scan
toArray

Work with errors


catchError
catchErrorJustReturn
retry
retryWhen

Operators for working with Connectable Observable


multicast
publish
refCount
replay
replayAll

Auxiliary methods


debug
doOn / doOnNext
delaySubscription
observeOn
subscribe
subscribeOn
timeout
using



In the diagrams, I use Source / SO for the source Observable and RO / Result for the resulting Observable.

As auxiliary code I use the function createSequenceWithWait; it creates an Observable from an array of elements with a specified interval between elements.

    public enum ResultType {
        case Infinite
        case Completed
        case Error
    }

    public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> {
        return Observable<U>.create{ observer in
            // Schedule one Next event per element, waitTime seconds apart
            for (idx, letter) in array.enumerate() {
                let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC))
                dispatch_after(time, dispatch_get_main_queue()) {
                    if let describer = describer {
                        let value = describer(value: letter)
                        observer.on(.Next(value))
                    } else {
                        observer.on(.Next(letter as! U))
                    }
                }
            }
            // Schedule the terminal event one interval after the last element
            if resultType != .Infinite {
                let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC))
                dispatch_after(allTime, dispatch_get_main_queue()) {
                    switch resultType {
                    case .Completed:
                        observer.onCompleted()
                    case .Error:
                        observer.onError(RxError.Unknown)
                    default:
                        break
                    }
                }
            }
            return NopDisposable.instance
        }
    }

The function example simply separates the output in the console; its code is as follows (taken from RxSwift):

    public func example(description: String, action: () -> ()) {
        print("\n--- \(description) example ---")
        action()
    }

In all examples that rely on time delays, the following must be added if the code is run in a playground:

    import XCPlayground
    XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

It is also assumed that the reader has a general idea of what reactive programming is and of RxSwift in particular. I am not sure it makes sense to write yet another introduction.

Create Observable





asObservable


This method is implemented by the RxSwift classes that support conversion to Observable, for example ControlEvent, ControlProperty, Variable, and Driver.

    example("as Observable") {
        let variable = Variable<Int>(0)
        variable.asObservable().subscribe { e in
            print(e)
        }
        variable.value = 1
    }

Console:

    --- as Observable example ---
    Next(0)
    Next(1)
    Completed


In this example, we have converted Variable to Observable and subscribed to its events.




create



This method lets you create an Observable from scratch, with full control over which elements it generates and when.

    example("create") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        let multipleSequence = Observable<Observable<AnyObject>>.create { observer in
            observer.on(.Next(firstSequence))
            observer.on(.Next(secondSequence))
            return NopDisposable.instance
        }
        let concatSequence = multipleSequence.concat()
        concatSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- create example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)

In this example, we created the Observable manually. It generates two Observables, whose elements we then combine with the concat operator, and we subscribe to the resulting Observable.




deferred


This operator postpones the creation of an Observable until the moment of subscription via subscribe.

    example("without deferred") {
        var i = 1
        let justObservable = Observable.just(i)
        i = 2
        _ = justObservable.subscribeNext{ print ("i = \($0)") }
    }

    example("with deferred") {
        var i = 1
        let deferredJustObservable = Observable.deferred{ Observable.just(i) }
        i = 2
        _ = deferredJustObservable.subscribeNext{ print ("i = \($0)") }
    }

Console:

    --- without deferred example ---
    i = 1

    --- with deferred example ---
    i = 2

In the first case, the Observable is created immediately by Observable.just(i), so changing the value of i afterwards does not affect the element the sequence generates. In the second case, we create it with deferred, so the value of i can still change before subscribe.
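The difference comes down to when the value of i is read. Here is a minimal plain-Swift sketch of that timing, written for current Swift and without RxSwift. The names makeEager, makeDeferred and runDeferredDemo are hypothetical, invented for this illustration; they only model the behaviour described above and are not part of any library.

```swift
// Plain-Swift model of the capture timing only; no RxSwift involved.
// All names here are hypothetical and exist only for this sketch.

/// Reads the value right away; the returned "subscribe" closure replays it.
func makeEager(_ value: Int) -> () -> Int {
    return { value }
}

/// Stores a factory and runs it only when the "subscribe" closure is called.
func makeDeferred(_ factory: @escaping () -> Int) -> () -> Int {
    return { factory() }
}

func runDeferredDemo() -> (eager: Int, deferred: Int) {
    var i = 1
    let eager = makeEager(i)           // i is read here, while it is still 1
    let deferred = makeDeferred { i }  // i is not read yet
    i = 2
    // "Subscribing" happens only now, after i has changed.
    return (eager: eager(), deferred: deferred())
}

let demo = runDeferredDemo()
print("eager: \(demo.eager), deferred: \(demo.deferred)")  // eager: 1, deferred: 2
```

The eager variant corresponds to Observable.just(i), the deferred one to wrapping the creation in a closure that runs at subscription time.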




empty


An empty sequence that terminates with Completed.

    example("empty") {
        let sequence = Observable<Int>.empty()
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- empty example ---
    Completed





error


Creates a sequence that consists of a single event, Error.

    example("error") {
        let sequence = Observable<Int>.error(RxError.Unknown)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- error example ---
    Error(Unknown error occured.)





interval


Creates an infinite sequence that increases from 0 in steps of 1, with the specified period.

    example("interval") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- interval example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    ....





just


Creates a sequence from a single value of any type, then terminates with Completed.

    example("just") {
        let sequence = Observable.just(100)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- just example ---
    Next(100)
    Completed





never


An empty sequence whose observers are never called, i.e. no event will be generated



    example("never") {
        let sequence = Observable<Int>.never()
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- never example ---





of


Creates a sequence from a variadic list of elements; after all the elements, Completed is generated.

    example("simple of") {
        let sequence = Observable.of(1, 2)
        sequence.subscribe { e in
            print(e)
        }
    }

    example("of for Observables") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        mergedSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- simple of example ---
    Next(1)
    Next(2)
    Completed

    --- of for Observables example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed

In the first case, we created a sequence of two numbers. In the second, we created a sequence of two Observables and then combined them with the merge operator.




range


Creates a sequence with a finite number of elements, starting from the specified value and increasing in steps of 1 for the specified count; after all the elements, Completed is generated.

    example("range") {
        let sequence = Observable.range(start: 5, count: 3)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- range example ---
    Next(5)
    Next(6)
    Next(7)
    Completed

Three elements were generated, starting from 5, in steps of 1.




repeatElement


Generates the specified element endlessly, without delay. There will never be a Completed or Error event.

    example("repeatElement") {
        let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- repeatElement example ---
    Next(1)
    Next(1)
    Next(1)
    .....





timer


An infinite sequence that increases from 0 in steps of 1, with the specified period and an optional delay before the start. There will never be a Completed or Error event.

    example("timer") {
        let sequence = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- timer example ---
    Next(0)
    Next(1)
    Next(2)

In this example, the sequence starts generating elements after a delay of 2 seconds and then generates one every 3 seconds.

Combining Observable






amb



    SO = [Observable<T>] or SO1, SO2 = Observable<T>
    RO = Observable<T>

Of all the SOs, the one that starts generating elements first is selected; its elements are mirrored in RO, and the remaining SOs are ignored.

    example("amb") {
        let subjectA = PublishSubject<Int>()
        let subjectB = PublishSubject<Int>()
        let subjectC = PublishSubject<Int>()
        let subjectD = PublishSubject<Int>()
        let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb()
        ambSequence.subscribe { e in
            print(e)
        }
        subjectC.onNext(0)
        subjectA.onNext(3)
        subjectB.onNext(102)
        subjectC.onNext(1)
        subjectD.onNext(45)
    }

Console:

    --- amb example ---
    Next(0)
    Next(1)

Since subjectC generated an element first, only its elements are mirrored in RO; the rest are ignored.




combineLatest



    SO = SO1, SO2, ... SON = Observable<T>
    RO = Observable<f(T,T)>

As soon as every Observable has generated at least one element, these elements are passed as parameters to the supplied function, and RO generates the result of that function as an element. From then on, whenever any of the Observables generates an element, a new result is computed from the latest elements of all the combined Observables.

    example("combineLatest") {
        let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in
            "\(element)"
        }.debug("firstSequence")
        let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in
            "\(element)"
        }
        .delaySubscription(3, scheduler: MainScheduler.instance)
        .debug("secondSequence")
        let concatSequence = Observable.combineLatest(firstSequence, secondSequence) { first, second -> String in
            "\(first) - \(second)"
        }
        concatSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- combineLatest example ---
    2016-04-12 16:59:35.421: firstSequence -> subscribed
    2016-04-12 16:59:35.422: secondSequence -> subscribed
    2016-04-12 16:59:35.434: firstSequence -> Event Next(1)
    2016-04-12 16:59:37.423: firstSequence -> Event Next(2)
    2016-04-12 16:59:38.423: secondSequence -> Event Next(A)
    Next(2 - A)
    2016-04-12 16:59:39.423: firstSequence -> Event Next(3)
    Next(3 - A)
    2016-04-12 16:59:39.522: secondSequence -> Event Next(B)
    Next(3 - B)
    2016-04-12 16:59:40.622: secondSequence -> Event Next(C)
    Next(3 - C)
    2016-04-12 16:59:41.722: firstSequence -> Event Completed
    2016-04-12 16:59:41.722: firstSequence -> disposed
    2016-04-12 16:59:41.722: secondSequence -> Event Completed
    2016-04-12 16:59:41.722: secondSequence -> disposed
    Completed

In this example, I created the Observables with createSequenceWithWait so that their elements are generated with different delays and you can see how they are mixed.
firstSequence managed to generate 1 and 2 before secondSequence generated A, so 1 was dropped and the first output was 2 - A.
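To make the "latest value from each side" rule concrete, here is a small plain-Swift model of it, written for current Swift and without RxSwift. CombineInput and combineLatestModel are hypothetical names for this sketch, and the input order is my reading of the log above; real timing and schedulers are not modelled.

```swift
// Plain-Swift model of the combineLatest rule; not the RxSwift implementation.
// `CombineInput` and `combineLatestModel` are hypothetical names.

/// One element arriving from either of the two sources, in arrival order.
enum CombineInput {
    case first(Int)
    case second(String)
}

func combineLatestModel(_ inputs: [CombineInput]) -> [String] {
    var latestFirst: Int?
    var latestSecond: String?
    var output: [String] = []
    for input in inputs {
        // Remember the newest element of whichever source just fired.
        switch input {
        case .first(let value): latestFirst = value
        case .second(let value): latestSecond = value
        }
        // Emit only once both sources have produced at least one element.
        if let a = latestFirst, let b = latestSecond {
            output.append("\(a) - \(b)")
        }
    }
    return output
}

// Arrival order taken from the console log: 1, 2, A, 3, B, C.
let combined = combineLatestModel([
    .first(1), .first(2), .second("A"), .first(3), .second("B"), .second("C")
])
print(combined)  // ["2 - A", "3 - A", "3 - B", "3 - C"]
```

Element 1 never appears in the result because it was overwritten by 2 before the second source produced anything.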




concat



    SO = Observable<Observable<T>> or SO1, SO2 = Observable<T>
    RO = Observable<T>

RO first contains all the elements of the first Observable, and only then those of the next one. This means that if the first Observable never generates Completed, the elements of the second will never reach RO. An Error in the current Observable is forwarded to RO.

    example("concat object method") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        let concatSequence = firstSequence.concat(secondSequence)
        concatSequence.subscribe { e in
            print(e)
        }
    }

    example("concat from array") {
        let firstSequence = Observable.of(1,2,3)
        let secondSequence = Observable.of(4,5,6)
        let concatSequence = Observable.of(firstSequence, secondSequence)
            .concat()
        concatSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- concat object method example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed

    --- concat from array example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(5)
    Next(6)
    Completed

In the first example, we append the second Observable to the first.
In the second, we build an Observable of Observables and concatenate its elements.




merge



    SO = Observable<Observable<T>>
    RO = Observable<T>

RO contains the elements of the source Observables in the order in which the sources emitted them.

    example("simple merge") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        mergedSequence.subscribe { e in
            print(e)
        }
    }

    example("merge with wait") {
        let firstSequence = createSequenceWithWait([1,2,3]) { element in
            "\(element)"
        }
        let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in
            "\(element)"
        }
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        mergedSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- simple merge example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed

    --- merge with wait example ---
    Next(1)
    Next(A)
    Next(2)
    Next(3)
    Next(B)
    Next(C)
    Completed

In the first example, we merge two sequences created without delay; as a result, the first Observable manages to generate all of its elements before the second one starts, and the result is identical to concat.
In the second case, the sequences generate their elements with delays, and you can see that the elements in RO are now interleaved, in the order in which the source Observables generated them.
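The interleaving in the second example can be reproduced on paper by merging two timelines. Below is a plain-Swift sketch of that idea, written for current Swift and without RxSwift; TimedValue and mergeModel are hypothetical names, the timestamps are the nominal delays from createSequenceWithWait (in seconds), and resolving ties in favour of the first source is an assumption chosen to match the log above.

```swift
// Plain-Swift model of merge on two timelines; not the RxSwift implementation.
// `TimedValue` and `mergeModel` are hypothetical names for this sketch.
typealias TimedValue = (time: Int, value: String)

/// Merges two timelines that are each already ordered by time.
func mergeModel(_ a: [TimedValue], _ b: [TimedValue]) -> [String] {
    var i = 0
    var j = 0
    var output: [String] = []
    while i < a.count || j < b.count {
        // Take from `a` when `b` is exhausted, or when `a`'s next element
        // is not later than `b`'s (assumed tie-break: first source wins).
        if j == b.count || (i < a.count && a[i].time <= b[j].time) {
            output.append(a[i].value)
            i += 1
        } else {
            output.append(b[j].value)
            j += 1
        }
    }
    return output
}

// waitTime 1 for the first sequence, waitTime 2 for the second.
let first: [TimedValue] = [(0, "1"), (1, "2"), (2, "3")]
let second: [TimedValue] = [(0, "A"), (2, "B"), (4, "C")]
print(mergeModel(first, second))  // ["1", "A", "2", "3", "B", "C"]
```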




startWith



    SO = Observable<T>
    RO = Observable<T>

The specified elements are added to the beginning of SO.

    example("startWith") {
        let sequence = Observable.of(1, 2, 3).startWith(0)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- startWith example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Completed





switchLatest



    SO = Observable<Observable<T>>
    RO = Observable<T>

Initially, we subscribe to O1, the first Observable generated by SO, and its elements are mirrored in RO. As soon as SO generates the next Observable, further elements of the previous one are discarded: we unsubscribe from O1 and subscribe to O2, and so on. Thus RO only ever contains elements from the most recently generated Observable.

    example("switchLatest") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(100)
        let proxyVar = Variable(varA.asObservable())
        let concatSequence = proxyVar.asObservable().switchLatest()
        concatSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2
        varB.value = 3
        proxyVar.value = varB.asObservable()
        varB.value = 4
        varA.value = 5
    }

    example("switchLatest") {
        let observableA = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(10)) }
            delay(3) { observer.on(.Next(20)) }
            delay(5) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("oA")
        let observableB = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(100)) }
            delay(1) { observer.on(.Next(200)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("oB")
        let observableC = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(1000)) }
            delay(1) { observer.on(.Next(2000)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("oC")
        let subjects = [observableA, observableB, observableC]
        let sequence:Observable<Observable<Int>> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0}
        let switchLatestSequence:Observable<Int> = sequence.switchLatest()
        switchLatestSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- switchLatest example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed

    --- switchLatest example ---
    2016-04-12 17:15:22.710: oA -> subscribed
    2016-04-12 17:15:22.711: oA -> Event Next(10)
    Next(10)
    2016-04-12 17:15:23.797: oA -> disposed // switched to oB
    2016-04-12 17:15:23.797: oB -> subscribed
    2016-04-12 17:15:23.797: oB -> Event Next(100)
    Next(100)
    2016-04-12 17:15:24.703: oB -> disposed // switched to oC
    2016-04-12 17:15:24.703: oC -> subscribed
    2016-04-12 17:15:24.703: oC -> Event Next(1000)
    Next(1000)
    2016-04-12 17:15:25.800: oC -> Event Next(2000)
    Next(2000)
    2016-04-12 17:15:26.703: oC -> Event Completed
    2016-04-12 17:15:26.703: oC -> disposed
    Completed

The first example shows how the operator works in a static setting, when we switch the Observable manually.
In the second, we have a sequence with delays: SO generates observableA, observableB, and observableC one second apart, and their own elements are generated with different delays.




withLatestFrom



    SO1, SO2 = Observable<T>
    RO = Observable<f(T,T)>

As soon as O1 generates an element, we check whether O2 has generated at least one element. If it has, the latest elements from O1 and O2 are passed as arguments to the supplied function, and RO generates its result as an element.

    example("withLatestFrom") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(10)
        let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) { "\($0) - \($1)" }
        withLatestFromSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2
        varB.value = 20
        varB.value = 30
        varA.value = 5
        varA.value = 6
    }

Console:

    --- withLatestFrom example ---
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    Next(5 - 30)
    Next(6 - 30)
    Completed
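The asymmetry between the two sources is the key point here: only O1 triggers output, while O2 is merely remembered. A plain-Swift sketch of that rule follows, written for current Swift and without RxSwift. LatestInput and withLatestFromModel are hypothetical names, and the assumption that the initial 10 from varB is seen before the initial 0 from varA is inferred from the first line of the console output.

```swift
// Plain-Swift model of the withLatestFrom rule; not the RxSwift implementation.
// `LatestInput` and `withLatestFromModel` are hypothetical names.

enum LatestInput {
    case source(Int)  // element from the Observable the operator is called on (O1)
    case other(Int)   // element from the Observable passed as a parameter (O2)
}

func withLatestFromModel(_ inputs: [LatestInput]) -> [String] {
    var latestOther: Int?
    var output: [String] = []
    for input in inputs {
        switch input {
        case .other(let value):
            // O2 is only remembered; it never produces output by itself.
            latestOther = value
        case .source(let value):
            // O1 produces output, but only once O2 has emitted something.
            if let other = latestOther {
                output.append("\(value) - \(other)")
            }
        }
    }
    return output
}

// Assumed arrival order, inferred from the console output above.
let latest = withLatestFromModel([
    .other(10), .source(0), .source(1), .source(2),
    .other(20), .other(30), .source(5), .source(6)
])
print(latest)  // ["0 - 10", "1 - 10", "2 - 10", "5 - 30", "6 - 30"]
```

Note that 20 never shows up in the result: it was replaced by 30 before varA changed again.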





zip



    SO = Observable<Observable<T>>
    RO = Observable<f(T,T)>

RO elements are combinations of the elements generated by the source Observables; elements are paired by their index in each source.

    example("zip with simple Variable") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(10)
        let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "\($0) - \($1)" }
        zippedSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2
        varB.value = 20
        varB.value = 30
        varA.value = 3
        varA.value = 4
    }

    example("zip with PublishSubject") {
        let subjectA = PublishSubject<Int>()
        let subjectB = PublishSubject<Int>()
        let zippedSequence = Observable.zip(subjectA, subjectB) { "\($0) - \($1)" }
        zippedSequence.subscribe { e in
            print(e)
        }
        subjectA.onNext(0)
        subjectA.onNext(1)
        subjectA.onNext(2)
        subjectB.onNext(100)
        subjectB.onNext(101)
        subjectA.onNext(3)
        subjectB.onNext(102)
        subjectA.onNext(4)
    }

Console:

    --- zip with simple Variable example ---
    Next(0 - 10)
    Next(1 - 20)
    Next(2 - 30)
    Completed

    --- zip with PublishSubject example ---
    Next(0 - 100)
    Next(1 - 101)
    Next(2 - 102)

The examples show that elements are combined by position: the Nth element of one source is paired with the Nth element of the other, regardless of when each was generated.
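Pairing by index is exactly what the standard library's zip does for ordinary collections, so the PublishSubject example can be checked with a few lines of plain Swift. This is only an analogy written for current Swift; zipModel is a hypothetical helper, and the arrays stand in for the full history of each subject.

```swift
// Plain-Swift analogy using the standard library's `zip`; no RxSwift involved.
// `zipModel` is a hypothetical helper for this sketch.

/// Pairs elements by index and stops at the end of the shorter input.
func zipModel(_ a: [Int], _ b: [Int]) -> [String] {
    return zip(a, b).map { pair in "\(pair.0) - \(pair.1)" }
}

// Everything subjectA and subjectB emitted in the example above.
let zipped = zipModel([0, 1, 2, 3, 4], [100, 101, 102])
print(zipped)  // ["0 - 100", "1 - 101", "2 - 102"]
```

Elements 3 and 4 from the first source stay unpaired because the second source never produced a fourth element.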

Filtration






distinctUntilChanged


Skips elements that repeat the immediately preceding element.

    example("distinctUntilChanged") {
        let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged()
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- distinctUntilChanged example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(1)
    Completed

There is a subtle point here: the operator does not discard elements that are non-unique across the whole sequence, only those that repeat consecutively.
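The "only consecutive repeats" rule is easy to state as a loop over an array. Here is a plain-Swift sketch written for current Swift, without RxSwift; distinctUntilChangedModel is a hypothetical name and models only the rule, not the operator's implementation.

```swift
// Plain-Swift model of the distinctUntilChanged rule; no RxSwift involved.
// `distinctUntilChangedModel` is a hypothetical name for this sketch.

/// Keeps an element only if it differs from the last element that was kept.
func distinctUntilChangedModel<T: Equatable>(_ input: [T]) -> [T] {
    var output: [T] = []
    for element in input where output.last != element {
        output.append(element)
    }
    return output
}

print(distinctUntilChangedModel([1, 2, 2, 3, 4, 4, 4, 1]))  // [1, 2, 3, 4, 1]
```

The trailing 1 survives even though 1 already appeared at the start, because its immediate predecessor is 4.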




elementAt



Only the element at index N (counting from 0) reaches RO.

    example("elementAt") {
        let sequence = Observable.of(0, 10, 20, 30, 40)
            .elementAt(2)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- elementAt example ---
    Next(20)
    Completed





filter



All elements that do not satisfy the specified conditions are discarded.



    example("filter") {
        let sequence = Observable.of(1, 20, 3, 40)
            .filter{ $0 > 10}
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- filter example ---
    Next(20)
    Next(40)
    Completed





ignoreElements



Drops all elements, transmits only terminal messages Completed and Error



    example("ignoreElements") {
        let sequence = Observable.of(1, 2, 3, 4)
            .ignoreElements()
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- ignoreElements example ---
    Completed





sample



On each element generated by the sampler sequence (which acts as a timer), the last element emitted by the source sequence is taken and mirrored in RO, provided it has not already been generated in RO.

    example("sampler") {
        let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler")
        let sequence:Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- sampler example ---
    2016-04-12 18:28:20.322: sampler -> subscribed
    2016-04-12 18:28:21.323: sampler -> Event Next(0)
    Next(1)
    2016-04-12 18:28:22.324: sampler -> Event Next(1) // nothing is generated in RO, since SO has produced no new element
    2016-04-12 18:28:23.323: sampler -> Event Next(2)
    Next(2)
    2016-04-12 18:28:24.323: sampler -> Event Next(3) // nothing is generated in RO, since SO has produced no new element
    2016-04-12 18:28:25.323: sampler -> Event Next(4) // nothing is generated in RO, since SO has produced no new element
    2016-04-12 18:28:26.323: sampler -> Event Next(5)
    Next(3)
    ...





single



A single element is taken from the source sequence; if there is more than one element, an error is generated. There is also a variant with a predicate.

    example("single generate error") {
        let sequence = Observable.of(1, 2, 3, 4).single()
        sequence.subscribe { e in
            print(e)
        }
    }

    example("single") {
        let sequence = Observable.of(1, 2, 3, 5).single { $0 % 2 == 0 }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- single generate error example ---
    Next(1)
    Error(Sequence contains more than one element.)

    --- single example ---
    Next(2)
    Completed

In the first example, the source sequence contained more than one element, so an error was generated as soon as the second element appeared in SO.
In the second example, only one element satisfied the predicate, so no error was generated.




skip


The first N elements of SO are dropped.

    example("skip") {
        let sequence = Observable.of(1, 2, 3, 4).skip(2)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- skip example ---
    Next(3)
    Next(4)
    Completed





skip (duration)



The elements that SO generates during the first N seconds are dropped.

    example("skip duration with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- skip duration with wait example ---
    Next(3)
    Next(4)
    Completed





skipUntil



Elements from SO are dropped until the sequence passed as a parameter starts generating elements.

    example("skipUntil") {
        let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let secondSequence = Observable.just(1)
            .delaySubscription(1, scheduler: MainScheduler.instance)
        let skippedSequence = firstSequence.skipUntil(secondSequence)
        skippedSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- skipUntil example ---
    Next(3)
    Next(4)
    Completed

The generation of elements in secondSequence was postponed by 1 second with the delaySubscription operator, so elements from firstSequence started to be mirrored in RO only after 1 second.




skipWhile



Drops SO elements while the condition returned by the passed function is true; once it returns false, that element and all subsequent ones pass through.

    example("skipWhile") {
        let firstSequence = [1,2,3,4,0].toObservable()
        let skipSequence = firstSequence.skipWhile { $0 < 3 }
        skipSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- skipWhile example ---
    Next(3)
    Next(4)
    Next(0)
    Completed
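Note the final 0 in the output: the condition is no longer checked once it has failed. For ordinary arrays, the Swift standard library's drop(while:) follows the same rule, which gives a quick way to verify the result without RxSwift. The sketch below is written for current Swift; skipWhileModel is a hypothetical wrapper added for the illustration.

```swift
// Plain-Swift analogy using the standard library's `drop(while:)`; no RxSwift involved.
// `skipWhileModel` is a hypothetical wrapper for this sketch.

/// Drops the leading elements that satisfy the predicate, then keeps everything else.
func skipWhileModel(_ input: [Int], _ predicate: (Int) -> Bool) -> [Int] {
    return Array(input.drop(while: predicate))
}

// 0 is kept even though 0 < 3, because dropping stopped at 3.
print(skipWhileModel([1, 2, 3, 4, 0]) { $0 < 3 })  // [3, 4, 0]
```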





skipWhileWithIndex


Drops SO elements while the condition returned by the passed function is true. The difference from skipWhile is that the function also receives the index of the generated element as a second parameter.

    example("skipWhileWithIndex") {
        let firstSequence = [1,2,5,0,7].toObservable()
        let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in
            value < 4 || idx < 2
        }
        skipSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- skipWhileWithIndex example ---
    Next(5)
    Next(0)
    Next(7)
    Completed





take


From SO, only the first N elements are taken.



    example("take") {
        let sequence = Observable.of(1, 2, 3, 4).take(2)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- take example ---
    Next(1)
    Next(2)
    Completed





take (duration)



From SO only elements generated in the first N seconds are taken.



    example("take duration with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let takeSequence = sequence.take(2, scheduler: MainScheduler.instance)
        takeSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- take duration with wait example ---
    Next(1)
    Next(2)
    Completed





takeLast



Only the last N elements are taken from SO. This means that if SO never finishes generating elements, not a single element will reach RO.

    example("takeLast") {
        let sequence = Observable.of(1, 2, 3, 4).takeLast(2)
        sequence.subscribe { e in
            print(e)
        }
    }

    example("takeLast with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let takeSequence = sequence.takeLast(2)
        takeSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- takeLast example ---
    Next(3)
    Next(4)
    Completed

    --- takeLast with wait example ---
    Next(3)
    Next(4)
    Completed

The second example illustrates that elements appear in RO with a delay, because RO has to wait until SO finishes generating its elements.




takeUntil



Elements are taken from SO only until the sequence passed as a parameter starts generating elements.

    example("takeUntil") {
        let stopSequence = Observable.just(1)
            .delaySubscription(2, scheduler: MainScheduler.instance)
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
            .takeUntil(stopSequence)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- takeUntil example ---
    Next(1)
    Next(2)
    Completed





takeWhile



Elements are taken from SO as long as the condition returned by the passed function is true.

    example("takeWhile") {
        let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- takeWhile example ---
    Next(1)
    Next(2)
    Completed





takeWhileWithIndex



Elements are taken from SO as long as the condition returned by the passed function is true. The difference from takeWhile is that the function also receives the index of the generated element as a second parameter.

    example("takeWhileWithIndex") {
        let sequence = [1,2,3,4,5,6].toObservable()
            .takeWhileWithIndex{ (val, idx) in
                val % 2 == 0 || idx < 3
            }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- takeWhileWithIndex example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed
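It may not be obvious why 4 passes and 6 does not, so here is the same predicate traced in plain Swift, written for current Swift and without RxSwift. takeWhileWithIndexModel is a hypothetical name; it models only the rule "stop at the first element for which the condition is false".

```swift
// Plain-Swift model of the takeWhileWithIndex rule; no RxSwift involved.
// `takeWhileWithIndexModel` is a hypothetical name for this sketch.

/// Keeps elements until the predicate first returns false, then stops for good.
func takeWhileWithIndexModel(_ input: [Int], _ predicate: (Int, Int) -> Bool) -> [Int] {
    var output: [Int] = []
    for (index, value) in input.enumerated() {
        guard predicate(value, index) else { break }
        output.append(value)
    }
    return output
}

// Indexes 0...2 pass because idx < 3; 4 (index 3) passes because it is even;
// 5 (index 4) fails both conditions, so 6 is never examined.
let taken = takeWhileWithIndexModel([1, 2, 3, 4, 5, 6]) { value, index in
    value % 2 == 0 || index < 3
}
print(taken)  // [1, 2, 3, 4]
```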





throttle



Only those elements of SO are taken that were not followed by a new element within N seconds.

    example("throttle") {
        let sequence = Observable.of(1, 2, 3, 4)
            .throttle(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

    example("throttle with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
            .throttle(0.5, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- throttle example ---
    Next(4)
    Completed

    --- throttle with wait example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed

In the first case, SO generates its elements without delay, so only the last element has no new element following it.
In the second example, the elements are generated at intervals longer than the N seconds passed to throttle, so each generated element is followed by a sufficiently long gap.
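Both results follow from a simple check on the gap after each element. The plain-Swift sketch below, written for current Swift and without RxSwift, applies that check to nominal timestamps in seconds. quietPeriodModel is a hypothetical name; treating a gap exactly equal to the interval as long enough, and always emitting the final element on completion, are assumptions chosen to match the console output above.

```swift
// Plain-Swift model of the "no newer element within N seconds" rule described above.
// `quietPeriodModel` is a hypothetical name; this is not the RxSwift implementation.

func quietPeriodModel(_ events: [(time: Double, value: Int)], interval: Double) -> [Int] {
    var output: [Int] = []
    for (index, event) in events.enumerated() {
        let isLast = index == events.count - 1
        // An element survives if it is the final one (assumed to be flushed on
        // completion) or if the next element arrives at least `interval` later.
        if isLast || events[index + 1].time - event.time >= interval {
            output.append(event.value)
        }
    }
    return output
}

// First example: everything arrives at once, interval is 1 second.
print(quietPeriodModel([(0, 1), (0, 2), (0, 3), (0, 4)], interval: 1))    // [4]
// Second example: one element per second, interval is 0.5 seconds.
print(quietPeriodModel([(0, 1), (1, 2), (2, 3), (3, 4)], interval: 0.5))  // [1, 2, 3, 4]
```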

Transformation






buffer



    SO = Observable<T>
    RO = Observable<[T]>

Elements from SO are grouped into arrays according to certain rules, and these arrays are generated in RO. The parameters are count, the maximum number of elements in an array, and timeSpan, the maximum time to wait for the current array to fill up with SO elements. Thus an RO element is an array [T] with a length from 0 to count.

    example("buffer") {
        let varA = Variable<Int>(0)
        let bufferSequence = varA.asObservable()
            .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
        bufferSequence.subscribe { e in
            print("\(NSDate()) - \(e)")
        }
        varA.value = 1
        varA.value = 2
        varA.value = 3
        delay(3) {
            varA.value = 4
            varA.value = 5
            delay(5) {
                varA.value = 6
            }
        }
    }

Console:

    --- buffer example ---
    2016-04-12 16:10:58 +0000 - Next([0, 1, 2])
    2016-04-12 16:11:01 +0000 - Next([3])
    2016-04-12 16:11:04 +0000 - Next([4, 5])
    2016-04-12 16:11:07 +0000 - Next([6])
    2016-04-12 16:11:07 +0000 - Completed

The array length was set to 3, so as soon as 3 elements had been generated, the element [0, 1, 2] was generated in RO.
After element 3 was generated there was a 3-second pause, the timeSpan elapsed, and the array was emitted without being completely filled.
The same applies to the pause after element 5 was generated.




flatMap

Each SO element is transformed into a separate Observable, and all elements from [O1, O2, O3 ...] are combined into RO. The order in which elements are generated in RO depends on when they are generated in the source [O1, O2, O3 ...] (as with the merge operator).

    example("flatMap with wait") {
        let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
        let flatMapSequence:Observable<String> = sequence.flatMap{val in
            createSequenceWithWait([10,11,12], waitTime: 2) { element in
                "\(element) - \(val)"
            }
        }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- flatMap with wait example ---
    Next(10 - 0)
    Next(10 - 1)
    Next(11 - 0)
    Next(10 - 2)
    Next(11 - 1)
    Next(12 - 0)
    Next(11 - 2)
    Next(12 - 1)
    Next(12 - 2)
    Completed
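
To see where this interleaving comes from, the timing can be modeled in plain Swift. It is a sketch under my own assumptions: SO generates 0, 1, 2 one second apart, every inner Observable generates its elements two seconds apart starting the moment it is created, and on a tie the inner Observable that started earlier goes first. `OuterElement` and `mergedLabels` are hypothetical names, not RxSwift API.

```swift
// Hypothetical model: an outer element with the time (in seconds) at which SO generates it.
struct OuterElement {
    let time: Double
    let value: Int
}

// Every inner Observable generates `innerValues` one by one, `innerStep` seconds apart,
// starting the moment its outer element arrives.
func mergedLabels(outer: [OuterElement], innerValues: [Int], innerStep: Double) -> [String] {
    var timeline: [(time: Double, source: Int, label: String)] = []
    for (source, element) in outer.enumerated() {
        for (position, inner) in innerValues.enumerated() {
            let time = element.time + Double(position) * innerStep
            timeline.append((time: time, source: source, label: "\(inner) - \(element.value)"))
        }
    }
    // Order by generation time; on a tie the inner Observable that started earlier wins
    // (an assumption of this sketch).
    timeline.sort { ($0.time, $0.source) < ($1.time, $1.source) }
    return timeline.map { $0.label }
}

let outer = [0, 1, 2].map { OuterElement(time: Double($0), value: $0) }
let merged = mergedLabels(outer: outer, innerValues: [10, 11, 12], innerStep: 2)
print(merged)
```

Under these assumptions the model gives the same order as the console output above.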





flatMapFirst



Each SO element is turned into a separate Observable.
1) First we subscribe to O1, and its elements are mirrored in RO. While O1 is generating elements, all subsequent Observables generated from SO are discarded; we do not subscribe to them.
2) As soon as O1 completes, the next newly generated Observable is subscribed to, and its elements are mirrored in RO.
3) Step 1 is then repeated, with the most recently subscribed Observable in place of O1.

    example("flatMapFirst") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
            .debug("sequence")
        let flatMapSequence:Observable<String> = sequence
            .flatMapFirst{val in
                Observable.of(0, 1, 2)
                    .map{"\($0) - \(val)" }
            }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }

    example("flatMapFirst with delay") {
        let subjectA = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(10)) }
            delay(1) { observer.on(.Next(20)) }
            delay(7) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sA")

        let subjectB = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(100)) }
            delay(1) { observer.on(.Next(200)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sB")

        let subjectC = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(1000)) }
            delay(1) { observer.on(.Next(2000)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sC")

        let subjects = [subjectA, subjectB, subjectC]
        let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:4){$0}
            .debug("sequence")
        let flatMapSequence:Observable<Int> = sequence.flatMapFirst{val in
            return subjects[val].asObservable()
        }.debug("flatMapSequence")

        flatMapSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- flatMapFirst example ---
    2016-04-12 19:19:46.915: sequence -> subscribed
    2016-04-12 19:19:46.916: sequence -> Event Next(10)
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    2016-04-12 19:19:46.918: sequence -> Event Next(20)
    Next(0 - 20)
    Next(1 - 20)
    Next(2 - 20)
    2016-04-12 19:19:46.919: sequence -> Event Next(30)
    Next(0 - 30)
    Next(1 - 30)
    Next(2 - 30)
    2016-04-12 19:19:46.921: sequence -> Event Completed
    Completed
    2016-04-12 19:19:46.921: sequence -> disposed
    --- flatMapFirst with delay example ---
    2016-04-12 19:19:46.925: flatMapSequence -> subscribed
    2016-04-12 19:19:46.926: sequence -> subscribed
    2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO generates its 1st element
    2016-04-12 19:19:46.935: sA -> subscribed // we subscribe to Observable sA
    2016-04-12 19:19:46.936: sA -> Event Next(10)
    2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10)
    Next(10)
    2016-04-12 19:19:47.936: sA -> Event Next(20)
    2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20)
    Next(20)
    2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO generates its 2nd element, but sA has not finished yet, so sB is skipped
    2016-04-12 19:19:53.935: sA -> Event Completed
    2016-04-12 19:19:53.936: sA -> disposed // sA has finished
    2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO generates its 3rd element
    2016-04-12 19:19:55.137: sC -> subscribed // no Observable is active (sA finished, sB was skipped), so we subscribe to sC
    2016-04-12 19:19:55.137: sC -> Event Next(1000)
    2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000)
    Next(1000)
    2016-04-12 19:19:56.236: sC -> Event Next(2000)
    2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000)
    Next(2000)
    2016-04-12 19:19:57.335: sC -> Event Completed
    2016-04-12 19:19:57.336: sC -> disposed
    2016-04-12 19:19:58.926: sequence -> Event Completed
    2016-04-12 19:19:58.926: flatMapSequence -> Event Completed
    Completed
    2016-04-12 19:19:58.926: sequence -> disposed

The first example shows that each Observable has time to generate all its elements before the turn comes to subscribe to the next Observable, so subscribing is allowed every time and the elements of all Observables end up in RO.
The second example is long, but it lets you see in detail how subscribing and unsubscribing happen and how that affects element generation.




flatMapLatest



Each SO element is turned into a separate Observable. First we subscribe to O1, and its elements are mirrored in RO. As soon as SO generates the next element and the next Observable is created from it, the elements of the previous Observable are discarded, because we unsubscribe from it. Thus RO contains only elements from the most recently generated Observable.

    example("flatMapLatest") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
        let flatMapSequence = sequence.flatMapLatest{val in
            Observable.of(0, 1, 2)
                .map{"\($0) - \(val)" }
        }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }

    example("flatMapLatest with delay") {
        let subjectA = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(10)) }
            delay(3) { observer.on(.Next(20)) }
            delay(5) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sA")

        let subjectB = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(100)) }
            delay(1) { observer.on(.Next(200)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sB")

        let subjectC = Observable<Int>.create{ observer in
            delay(0) { observer.on(.Next(1000)) }
            delay(1) { observer.on(.Next(2000)) }
            delay(2) { observer.onCompleted() }
            return NopDisposable.instance
        }.debug("sC")

        let subjects = [subjectA, subjectB, subjectC]
        let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:1) {$0}
            .debug("sequence")
        let flatMapSequence:Observable<Int> = sequence.flatMapLatest{val in
            return subjects[val].asObservable()
        }.debug("flatMapSequence")

        flatMapSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- flatMapLatest example ---
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    Next(0 - 20)
    Next(1 - 20)
    Next(2 - 20)
    Next(0 - 30)
    Next(1 - 30)
    Next(2 - 30)
    Completed
    --- flatMapLatest with delay example ---
    2016-04-12 19:30:50.309: flatMapSequence -> subscribed
    2016-04-12 19:30:50.310: sequence -> subscribed
    2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO generates its 1st element, from which sA is produced
    2016-04-12 19:30:50.319: sA -> subscribed // we subscribe to sA
    2016-04-12 19:30:50.319: sA -> Event Next(10) // sA generates an element
    2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap forwards it
    Next(10) // the element arrives in RO
    2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO generates its 2nd element, so we are done with sA
    2016-04-12 19:30:51.311: sA -> disposed // we unsubscribe from sA even though it has not finished
    2016-04-12 19:30:51.311: sB -> subscribed // we subscribe to the new Observable sB
    2016-04-12 19:30:51.311: sB -> Event Next(100)
    2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100)
    Next(100)
    2016-04-12 19:30:52.310: sequence -> Event Next(2)
    2016-04-12 19:30:52.311: sB -> disposed
    2016-04-12 19:30:52.311: sC -> subscribed
    2016-04-12 19:30:52.311: sC -> Event Next(1000)
    2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000)
    Next(1000)
    2016-04-12 19:30:53.372: sequence -> Event Completed
    2016-04-12 19:30:53.372: sequence -> disposed
    2016-04-12 19:30:53.372: sC -> Event Next(2000)
    2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000)
    Next(2000)
    2016-04-12 19:30:54.501: sC -> Event Completed
    2016-04-12 19:30:54.501: sC -> disposed
    2016-04-12 19:30:54.501: flatMapSequence -> Event Completed
    Completed

The first example shows that each Observable has time to generate all its elements: by the time the turn comes to subscribe to the next Observable, the previous one has already finished, so the elements of all Observables end up in RO.
In the second example, because of the delays in generation, we see that as soon as a new Observable is generated, we unsubscribe from the previous one.




flatMapWithIndex



Each SO element is transformed into a separate Observable, and all elements from [O1, O2, O3 ...] are combined into RO. The order in which elements are generated in RO depends on when they are generated in the source [O1, O2, O3 ...] (as with the merge operator). The difference from flatMap is that the index of the generated element is passed to the function as an additional parameter.

    example("flatMapWithIndex") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
        let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in
            Observable.of("A", "B", "C").map{"index: (\(idx)) - \($0) - \(val)"}
        }
        print(flatMapSequence.dynamicType)
        flatMapSequence.subscribe { e in
            print(e)
        }
    }

    example("flatMapWithIndex with wait") {
        let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
        let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in
            createSequenceWithWait(["A","B","C"], waitTime: 2) { element in
                "index: (\(idx)) - \(element) - \(val)"
            }
        }
        print(flatMapSequence.dynamicType)
        flatMapSequence.subscribe { e in
            print(e)
        }
    }

Console:

    FlatMapWithIndex<Int, Observable<String>>
    Next(index: (0) - A - 10)
    Next(index: (0) - B - 10)
    Next(index: (0) - C - 10)
    Next(index: (1) - A - 20)
    Next(index: (1) - B - 20)
    Next(index: (1) - C - 20)
    Next(index: (2) - A - 30)
    Next(index: (2) - B - 30)
    Next(index: (2) - C - 30)
    Completed
    --- flatMapWithIndex with wait example ---
    FlatMapWithIndex<Int, Observable<String>>
    Next(index: (0) - A - 0)
    Next(index: (1) - A - 1)
    Next(index: (0) - B - 0)
    Next(index: (2) - A - 2)
    Next(index: (1) - B - 1)
    Next(index: (0) - C - 0)
    Next(index: (2) - B - 2)
    Next(index: (1) - C - 1)
    Next(index: (2) - C - 2)
    Completed





map



 Observable<T> -> Observable<U> 


SO elements are converted without changing the order of their generation. You can change not only the value, but also the type of elements.



    example("map") {
        let sequence = Observable.of(1, 2, 3)
            .map{ "\($0 * 5)" }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- map example ---
    Next(5)
    Next(10)
    Next(15)
    Completed





mapWithIndex



 Observable<T> -> Observable<U> 


SO elements are converted without changing the order of their generation. You can change not only the value, but also the type of elements. The difference from map is that the index of the generated element is passed to the function as an additional parameter.

    example("mapWithIndex") {
        let sequence = Observable.of("A", "B", "C")
            .mapWithIndex({ "\($0) / \($1)" })
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- mapWithIndex example ---
    Next(A / 0)
    Next(B / 1)
    Next(C / 2)
    Completed





window



 SO = Observable<T> RO = Observable<Observable<T>> 


Elements from SO are forwarded, according to certain rules, into newly generated Observables. The parameters are count, the maximum number of elements each Observable will generate, and timeSpan, the maximum time to wait for the current Observable to be filled with SO elements. Thus each RO element is an Observable that generates from 0 to count elements. The main difference from buffer is that SO elements are mirrored by the generated Observables immediately, whereas with buffer we have to wait for the maximum time specified as a parameter (unless the buffer fills up earlier).

    example("window") {
        let varA = Variable<Int>(0)
        let bufferSequence:Observable<Observable<Int>> = varA.asObservable()
            .window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
            .debug("bufferSequence")
        bufferSequence.subscribe { e in
            if case .Next(let observable) = e {
                print("\(NSDate()) - Observable")
                observable.subscribe { val in
                    print(val)
                }
            }
        }
        varA.value = 1
        varA.value = 2
        varA.value = 3
        delay(4) {
            varA.value = 4
            varA.value = 5
            delay(4) {
                varA.value = 6
            }
        }
    }

Console:

    --- window example ---
    2016-04-12 19:51:54.372: bufferSequence -> subscribed
    2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:54 +0000 - Observable
    Next(0)
    Next(1)
    Next(2)
    Completed
    2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:54 +0000 - Observable
    Next(3)
    Completed
    2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:57 +0000 - Observable
    Next(4)
    Next(5)
    Completed
    2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:52:00 +0000 - Observable
    Next(6)
    Completed
    2016-04-12 19:52:02.895: bufferSequence -> Event Completed

The example uses time delays, so some of the generated Observables are only partially filled.

Math and aggregation operators






reduce



Each SO element is processed by the passed function, and the result is passed as a parameter to the function on the next step. As soon as SO completes, RO generates the result, i.e. RO generates only one element.

    example("reduce") {
        let sequence = Observable.of(1, 2, 3, 4)
            .reduce(1) { $0 * $1 }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- reduce example ---
    Next(24)
    Completed





scan



Each SO element is processed by the passed function; the result is generated in RO and is also passed as a parameter to the function on the next step. Unlike reduce, the number of elements in RO is equal to the number of elements in SO.

    example("scan") {
        let sequence = Observable.of(1, 2, 3).scan(10) { result, element in
            return result + element
        }
        sequence.subscribe { e in
            print(e)
        }
    }

    example("scan multiply") {
        let sequence = Observable.of(2, 3, 5).scan(10) { result, element in
            return result * element
        }
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- scan example ---
    Next(11)
    Next(13)
    Next(16)
    Completed
    --- scan multiply example ---
    Next(20)
    Next(60)
    Next(300)
    Completed
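
The difference between scan and reduce is easy to check on an ordinary array. This is a plain Swift sketch, not RxSwift code: `runningResults` is a hypothetical helper of mine that plays the role of scan, while `reduce` here is the standard library method on Array.

```swift
// Hypothetical helper: returns every intermediate accumulator value, like scan does.
func runningResults(_ elements: [Int], seed: Int, _ accumulate: (Int, Int) -> Int) -> [Int] {
    var accumulator = seed
    var results: [Int] = []
    for element in elements {
        accumulator = accumulate(accumulator, element)
        results.append(accumulator)   // scan generates an element on every step
    }
    return results
}

let scanned = runningResults([1, 2, 3], seed: 10, +)   // one result per input element
let reduced = [1, 2, 3].reduce(10, +)                  // only the final result

print(scanned, reduced)
```

The last value produced by the scan-like helper is exactly what reduce returns.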





toArray



 SO = Observable<T> RO = Observable<[T]> 


After SO completes, all its elements are combined into an array, which RO generates as a single element.

    example("toArray") {
        let sequence = Observable.of(1, 2, 3)
        let arraySequence = sequence.toArray()
        arraySequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- toArray example ---
    Next([1, 2, 3])
    Completed


Working with errors






catchError



Allows you to intercept an error generated by SO and replace it with a new Observable, which then takes over generating elements.

    example("with catchError") {
        let sequenceWithError = Observable<Int>.create { observer in
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Next(4))
            observer.onError(RxError.Unknown)
            observer.on(.Next(5))
            return NopDisposable.instance
        }
        let sequenceIgnoreError = sequenceWithError.catchError{ error in
            return Observable.of(10, 11, 12)
        }
        sequenceIgnoreError.subscribe { e in
            print(e)
        }
    }

Console:

    --- with catchError example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(10)
    Next(11)
    Next(12)
    Completed


After element 4 was generated, the error RxError.Unknown was generated, but we intercepted it and returned a new Observable instead.




catchErrorJustReturn



Allows you to intercept an error generated by SO and replace it with the specified element, after which RO generates Completed.

    example("with catchErrorJustReturn") {
        let sequenceWithError = Observable.of(1, 2, 3, 4)
            .concat(Observable.error(RxError.Unknown))
            .concat(Observable.just(5))
        let sequenceIgnoreError = sequenceWithError.catchErrorJustReturn(-1)
        sequenceIgnoreError.subscribe { e in
            print(e)
        }
    }

Console:

    --- with catchErrorJustReturn example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(-1)
    Completed

After element 4 was generated, the error RxError.Unknown was generated, but we intercepted it and returned element -1 instead.
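
Both catchError and catchErrorJustReturn follow the same pattern: everything before the error is kept, the error is swapped for fallback elements, and nothing after the error gets through. A plain Swift sketch of that pattern, where `Event` and `recovering` are hypothetical names of mine and not RxSwift types:

```swift
// Hypothetical event type mirroring Next / Error / Completed.
enum Event: Equatable {
    case next(Int)
    case error
    case completed
}

// Copies events until the first terminal event. An error is replaced
// by the fallback elements followed by completion.
func recovering(_ events: [Event], fallback: [Int]) -> [Event] {
    var output: [Event] = []
    for event in events {
        switch event {
        case .next:
            output.append(event)
        case .error:
            output.append(contentsOf: fallback.map { Event.next($0) })
            output.append(.completed)
            return output              // anything after the error is never seen
        case .completed:
            output.append(.completed)
            return output
        }
    }
    return output
}

let source: [Event] = [.next(1), .next(2), .next(3), .next(4), .error, .next(5)]
let justReturn = recovering(source, fallback: [-1])           // like catchErrorJustReturn(-1)
let newSequence = recovering(source, fallback: [10, 11, 12])  // like catchError returning of(10, 11, 12)

print(justReturn)
print(newSequence)
```

Element 5 never appears in either result, which matches both console outputs.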




retry



Allows you to intercept an error generated by SO and, depending on the parameter passed, restart SO from the beginning the required number of times in the hope that the error will not occur again.

    example("retry full sequence") {
        let sequenceWithError = Observable.of(1, 2, 3, 4).concat(Observable.error(RxError.Unknown))
        let wholeSequenceWithErrorRetry = sequenceWithError.retry(2)
        wholeSequenceWithErrorRetry.subscribe { e in
            print(e)
        }
    }

Console:

    --- retry full sequence example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Error(Unknown error occured.)

Since the retry(2) operator was applied, we re-subscribed to SO once, but the error occurred again and was generated in RO.
Thus retry(1) would not make a single repetition.
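
The counting rule is the easiest part to get wrong, so here it is as a plain Swift sketch. `runWithRetry` and `FlakyError` are hypothetical names of mine; the only thing taken from the example above is that the parameter counts all runs, including the very first one.

```swift
struct FlakyError: Error {}

// Hypothetical model of retry(maxAttemptCount): `attempt` is run at most
// `maxAttemptCount` times in total, counting the very first run.
func runWithRetry(maxAttemptCount: Int, attempt: () throws -> Void) -> (runs: Int, failed: Bool) {
    var runs = 0
    while runs < maxAttemptCount {
        runs += 1
        do {
            try attempt()
            return (runs, false)   // success: no more runs are needed
        } catch {
            // the error is swallowed and we go around again while attempts remain
        }
    }
    return (runs, true)            // the last error would be forwarded to RO
}

let twoAttempts = runWithRetry(maxAttemptCount: 2) { throw FlakyError() }
let oneAttempt = runWithRetry(maxAttemptCount: 1) { throw FlakyError() }

print(twoAttempts.runs, oneAttempt.runs)
```

With a limit of 2 the failing work runs twice (one repetition); with a limit of 1 it runs once and is never repeated.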




retryWhen



Allows you to intercept an error generated by SO and, depending on the type of error, either re-generate the error, which is then forwarded to RO and ends the execution, or generate an Observable (tryObservable), each valid element of which triggers a re-subscription to SO in the hope that the error will disappear. If tryObservable ends with an error, that error is forwarded to RO and the execution ends there.

    example("retryWhen") {
        var counter = 0
        let sequenceWithError = Observable<Int>.create { observer in
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Next(4))
            counter += 1
            if counter < 3 {
                observer.onError(RxError.Unknown)
            } /*else {
                observer.onError(RxError.Overflow)
            }*/
            observer.on(.Next(5))
            return NopDisposable.instance
        }.debug("with error")

        let sequenceWithoutError = Observable<Int>.create { observer in
            observer.on(.Next(10))
            //observer.onError(RxError.NoElements)
            return NopDisposable.instance
        }.debug("without error")

        let retrySequence = sequenceWithError.retryWhen{ (error: Observable<RxError>) -> Observable<Int> in
            let seq:Observable<Int> = error.flatMap { (generatedError: RxError) -> Observable<Int> in
                if case .Unknown = generatedError {
                    return sequenceWithoutError
                }
                return Observable<Int>.error(generatedError)
            }
            return seq
        }//.debug()

        retrySequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- retryWhen example ---
    2016-04-12 20:18:04.484: with error -> subscribed
    2016-04-12 20:18:04.485: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.486: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.486: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.487: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.)
    2016-04-12 20:18:04.488: without error -> subscribed
    2016-04-12 20:18:04.488: without error -> Event Next(10)
    2016-04-12 20:18:04.489: with error -> disposed
    2016-04-12 20:18:04.489: with error -> subscribed
    2016-04-12 20:18:04.489: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.490: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.490: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.490: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.)
    2016-04-12 20:18:04.491: without error -> subscribed
    2016-04-12 20:18:04.492: without error -> Event Next(10)
    2016-04-12 20:18:04.492: with error -> disposed
    2016-04-12 20:18:04.492: with error -> subscribed
    2016-04-12 20:18:04.493: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.493: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.493: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.494: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.494: with error -> Event Next(5)
    Next(5)

I built the increment of the counter variable into sequenceWithError so that the error disappears on the 3rd attempt. If you uncomment the generation of the RxError.Overflow error, retryWhen will not intercept it, and it will be forwarded to RO.

Operators for working with Connectable Observable






multicast



Allows you to proxy elements from the source SO to the Subject passed as a parameter. You subscribe to that Subject; it starts generating elements only after the connect operator is called.

    example("multicast") {
        let subject = PublishSubject<Int>()
        let firstSequence = createSequenceWithWait([0,1,2,3,4,5]) { $0 }
            .multicast(subject)
        delay(2) {
            _ = subject.subscribe { e in
                print("first: \(e)")
            }
        }
        delay(3) {
            _ = subject.subscribe { e in
                print("second: \(e)")
            }
        }
        firstSequence.connect()
    }

Console:

    --- multicast example ---
    first: Next(2)
    first: Next(3)
    second: Next(3)
    first: Next(4)
    second: Next(4)
    first: Next(5)
    second: Next(5)
    first: Completed
    second: Completed





publish



publish = multicast + PublishSubject
Allows you to create a Connectable Observable, which does not generate events even after subscribe. To start generation, you must call connect. This lets you subscribe several Observers to one Observable and start generating elements for all of them at the same time, regardless of when each subscribe was performed.

    example("subscribe connectable sequnce with connect") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence").publish()
        var disposable1: Disposable!
        var disposable2: Disposable!
        disposable1 = sequence.subscribe { e in
            print("first: \(e)")
        }
        delay(2) {
            disposable2 = sequence.subscribe { e in
                print("second: \(e)")
            }
        }
        delay(4) {
            sequence.connect()
        }
        delay(8) {
            disposable1.dispose()
            disposable2.dispose()
        }
    }

Console:

    --- subscribe connectable sequnce with connect example ---
    2016-04-12 21:35:32.130: sequence -> subscribed
    2016-04-12 21:35:33.131: sequence -> Event Next(0)
    first: Next(0)
    second: Next(0)
    2016-04-12 21:35:34.131: sequence -> Event Next(1)
    first: Next(1)
    second: Next(1)
    2016-04-12 21:35:35.132: sequence -> Event Next(2)
    first: Next(2)
    second: Next(2)
    2016-04-12 21:35:36.132: sequence -> Event Next(3)
    2016-04-12 21:35:37.132: sequence -> Event Next(4)

As you can see, although the subscriptions were made at different times, element generation did not start until connect was called. And thanks to the debug operator you can see that the sequence kept generating elements even after everyone had unsubscribed.




refCount



Allows you to create a normal Observable from a Connectable one. After the first subscribe to this normal Observable, the Connectable subscribes to SO.
It works out to something like
publishSequence = SO.publish()
refCountSequence = publishSequence.refCount()

SO keeps generating elements as long as there is at least one subscription to refCountSequence. As soon as all subscriptions to refCountSequence are cancelled, publishSequence unsubscribes from SO as well.

    example("with refCount") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence")
        let publishSequence = sequence.publish() // create a Connectable Observable
        let refCountSequence = publishSequence.refCount().debug("refCountSequence")

        let subscription1 = refCountSequence.subscribe{ e in
            print("first: \(e)")
        }
        let subscription2 = refCountSequence.subscribe{ e in
            print("second: \(e)")
        }

        delay(2) {
            subscription1.dispose() // cancel the first subscription
        }
        delay(3) {
            // cancel the second subscription; refCountSequence has no subscribers left,
            // so publishSequence unsubscribes from SO
            subscription2.dispose()
        }
        delay(5) {
            _ = refCountSequence.subscribe { e in
                print("after: \(e)")
            }
        }
    }

Console:

    --- with refCount example ---
    2016-04-12 20:25:24.154: refCountSequence -> subscribed // subscription 1 to refCountSequence
    2016-04-12 20:25:24.155: sequence -> subscribed // publishSequence subscribes to SO
    2016-04-12 20:25:24.156: refCountSequence -> subscribed // subscription 2 to refCountSequence
    2016-04-12 20:25:25.156: sequence -> Event Next(0)
    2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
    first: Next(0)
    2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
    second: Next(0)
    2016-04-12 20:25:26.156: sequence -> Event Next(1)
    2016-04-12 20:25:26.156: refCountSequence -> Event Next(1)
    first: Next(1)
    2016-04-12 20:25:26.157: refCountSequence -> Event Next(1)
    second: Next(1)
    2016-04-12 20:25:26.353: refCountSequence -> disposed // subscription 1 to refCountSequence is cancelled
    2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO keeps generating elements, because refCountSequence still has a subscription
    2016-04-12 20:25:27.157: refCountSequence -> Event Next(2)
    second: Next(2)
    2016-04-12 20:25:27.390: refCountSequence -> disposed // subscription 2 to refCountSequence is cancelled
    2016-04-12 20:25:27.390: sequence -> disposed // refCountSequence has no subscriptions left, so publishSequence unsubscribes from SO
    2016-04-12 20:25:29.157: refCountSequence -> subscribed // a new subscription to refCountSequence
    2016-04-12 20:25:29.157: sequence -> subscribed // so publishSequence subscribes to SO again
    2016-04-12 20:25:30.158: sequence -> Event Next(0)
    2016-04-12 20:25:30.159: refCountSequence -> Event Next(0)
    after: Next(0)
    2016-04-12 20:25:31.158: sequence -> Event Next(1)
    2016-04-12 20:25:31.159: refCountSequence -> Event Next(1)
    after: Next(1)
    2016-04-12 20:25:32.159: sequence -> Event Next(2)
    2016-04-12 20:25:32.159: refCountSequence -> Event Next(2)
    after: Next(2)
    ....
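
The bookkeeping behind refCount fits in a few lines. Below is a minimal sketch in plain Swift; `RefCounter` is a hypothetical class of mine, and its "connect" and "disconnect" log entries merely stand in for publishSequence subscribing to and unsubscribing from SO.

```swift
// Hypothetical model of the refCount bookkeeping.
final class RefCounter {
    private(set) var subscribers = 0
    private(set) var log: [String] = []

    func subscribe() {
        subscribers += 1
        if subscribers == 1 {
            log.append("connect")      // first subscriber: start listening to SO
        }
    }

    func dispose() {
        guard subscribers > 0 else { return }
        subscribers -= 1
        if subscribers == 0 {
            log.append("disconnect")   // last subscriber gone: stop listening to SO
        }
    }
}

let counter = RefCounter()
counter.subscribe()   // "first"
counter.subscribe()   // "second"
counter.dispose()     // one subscriber is still there, nothing happens
counter.dispose()     // nobody left
counter.subscribe()   // "after": SO is started again from scratch

print(counter.log)
```

This matches the log above: SO is subscribed to once for the first two subscribers, released when both are gone, and subscribed to again for the late one, which therefore starts from Next(0).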





replay



If SO is a normal Observable, converts it to a Connectable one. After connect() is called, everyone who subscribes to it immediately receives the last N generated elements first. Even if everyone unsubscribes, the Connectable keeps generating elements.

    example("replay") {
        let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(2)
        let firstDisposable = firstSequence.subscribe { e in
            print("first: \(e)")
        }
        firstSequence.connect()
        var secondDisposable: Disposable!
        delay(3) {
            secondDisposable = firstSequence.subscribe { e in
                print("second: \(e)")
            }
        }
        delay(4) {
            firstDisposable.dispose()
        }
        delay(5) {
            secondDisposable.dispose()
        }
        delay(7) {
            firstSequence.subscribe { e in
                print("third: \(e)")
            }
        }
    }

Console:

    --- replay example ---
    first: Next(0)
    first: Next(1)
    first: Next(2)
    second: Next(1)
    second: Next(2)
    first: Next(3) // subscription 1 is cancelled
    second: Next(3)
    second: Next(4) // subscription 2 is cancelled, but SO keeps generating elements
    // the new subscriber first receives the last 2 elements
    third: Next(5)
    third: Next(6)
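
What a late subscriber receives first is just the tail of everything generated so far. A small plain Swift sketch of that buffer, where `ReplayBuffer` is a hypothetical type of mine and not part of RxSwift:

```swift
// Hypothetical model of replay(bufferSize): remembers only the last `bufferSize` elements.
struct ReplayBuffer {
    let bufferSize: Int
    private(set) var elements: [Int] = []

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    mutating func record(_ element: Int) {
        elements.append(element)
        if elements.count > bufferSize {
            // drop the oldest elements so that only the tail is kept
            elements.removeFirst(elements.count - bufferSize)
        }
    }
}

var replay = ReplayBuffer(bufferSize: 2)

for element in 0...2 { replay.record(element) }
let secondSubscriberStart = replay.elements   // what "second" receives on subscribing

for element in 3...6 { replay.record(element) }
let thirdSubscriberStart = replay.elements    // what "third" receives on subscribing

print(secondSubscriberStart, thirdSubscriberStart)
```

The buffer keeps filling even while nobody is subscribed, which is why the third subscriber starts with 5 and 6.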





replayAll



If SO is a normal Observable, converts it to a Connectable one. After connect() is called, everyone who subscribes to it first receives all the elements that were generated earlier. Even if everyone unsubscribes, the Connectable keeps generating elements.

    example("replayAll") {
        let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replayAll()
        let firstDisposable = firstSequence.subscribe { e in
            print("first: \(e)")
        }
        firstSequence.connect()
        var secondDisposable: Disposable!
        delay(3) {
            secondDisposable = firstSequence.subscribe { e in
                print("second: \(e)")
            }
        }
        delay(4) {
            firstDisposable.dispose()
        }
        delay(5) {
            secondDisposable.dispose()
        }
        delay(7) {
            firstSequence.subscribe { e in
                print("third: \(e)")
            }
        }
    }

Console:

    --- replayAll example ---
    first: Next(0)
    first: Next(1)
    first: Next(2)
    second: Next(0)
    second: Next(1)
    second: Next(2)
    first: Next(3) // subscription 1 is cancelled
    second: Next(3)
    second: Next(4) // subscription 2 is cancelled, but SO keeps generating elements
    // the new subscriber first receives all earlier elements
    third: Next(0)
    third: Next(1)
    third: Next(2)
    third: Next(3)
    third: Next(4)
    third: Next(5)
    third: Next(6)
    third: Next(7)


Auxiliary methods






debug



RO completely duplicates SO, but all events are logged with a timestamp.

    example("debug") {
        let sequence = Observable<AnyObject>.of(1, 2, 3)
            .debug("sequence")
            .subscribe{}
    }

Console:

    --- debug example ---
    2016-04-12 21:41:08.467: sequence -> subscribed
    2016-04-12 21:41:08.469: sequence -> Event Next(1)
    2016-04-12 21:41:08.469: sequence -> Event Next(2)
    2016-04-12 21:41:08.469: sequence -> Event Next(3)
    2016-04-12 21:41:08.469: sequence -> Event Completed





do / doOnNext



RO completely duplicates SO, but we hook an interceptor into all events of the SO life cycle.

    example("simple doOn") {
        let firstSequence = Observable.of(1,2).doOn{e in
            print(e)
        }
        firstSequence.subscribeNext{ e in
            // we do nothing inside subscribe; everything is printed by doOn
        }
    }

Console:

    --- simple doOn example ---
    Next(1)
    Next(2)
    Completed





delaySubscription



Duplicates elements from SO into RO, but the subscription to SO is delayed by the time specified as a parameter.

    example("delaySubscription") {
        let sequence = Observable.of(1, 2, 3).debug("sequence")
            .delaySubscription(3, scheduler: MainScheduler.instance).debug("delayed sequence")
        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- delaySubscription example ---
    2016-04-12 21:44:05.226: delayed sequence -> subscribed // we subscribe to delayed sequence
    2016-04-12 21:44:08.228: sequence -> subscribed // the subscription to SO happens 3 seconds later
    2016-04-12 21:44:08.229: sequence -> Event Next(1)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(1)
    Next(1)
    2016-04-12 21:44:08.229: sequence -> Event Next(2)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(2)
    Next(2)
    2016-04-12 21:44:08.229: sequence -> Event Next(3)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(3)
    Next(3)
    2016-04-12 21:44:08.230: sequence -> Event Completed
    2016-04-12 21:44:08.230: delayed sequence -> Event Completed
    Completed
    2016-04-12 21:44:08.230: sequence -> disposed





observeOn



Specifies on which Scheduler the Observer does its work; this is especially critical when working with a GUI.

    example("without observeOn") {
        let sequence = Observable<AnyObject>.of(1, 2, 3)
        sequence.subscribe { e in
            print("\(NSThread.currentThread())\(e)")
        }
    }

    example("with observeOn") {
        let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let sequence = Observable<AnyObject>.of(1, 2, 3)
        sequence.observeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
            .subscribe { e in
                print("\(NSThread.currentThread())\(e)")
            }
    }

Console:

    --- without observeOn example ---
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed
    --- with observeOn example ---
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed

As you can see, thanks to observeOn we were able to run the code inside subscribe on another thread.




subscribe



The operator that connects an Observable to an Observer; it lets you subscribe to all events from the Observable.

    example("subscribe") {
        let firstSequence = Observable.of(1)
        firstSequence.subscribe { e in
            print(e)
        }
        firstSequence.subscribeCompleted {
            print("!completed")
        }
        firstSequence.subscribeNext{next in
            print("next: \(next)")
        }
    }

    example("subscribeNext") {
        let firstSequence = Observable.of(1)
        firstSequence.subscribeNext{next in
            print("next: \(next)")
        }
    }

    example("subscribeCompleted") {
        let firstSequence = Observable.of(1)
        firstSequence.subscribeCompleted {
            print("!completed")
        }
    }

    example("subscribeError") {
        let firstSequence = Observable<Int>.error(RxError.ArgumentOutOfRange)
        firstSequence.subscribeError {e in
            print("!error \(e)")
        }
    }

Console:

    --- subscribe example ---
    Next(1)
    Completed
    !completed
    next: 1
    --- subscribeNext example ---
    next: 1
    --- subscribeCompleted example ---
    !completed
    --- subscribeError example ---
    !error Argument out of range.

Four forms are shown: subscribe, subscribeNext, subscribeCompleted, subscribeError.




subscribeOn



Specifies on which Scheduler the Observable does its work; this is especially critical when working with a GUI.

    example("with subscribeOn and observeOn") {
        let queue1 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let queue2 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        print("init thread: \(NSThread.currentThread())")
        let sequence = Observable<Int>.create { observer in
            print("observable thread: \(NSThread.currentThread())")
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Completed)
            return NopDisposable.instance
        }
        .subscribeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue1"))
        .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue2"))

        sequence.subscribe { e in
            print("observer thread: \(NSThread.currentThread()) \(e)")
        }
    }

Console:

    --- with subscribeOn and observeOn example ---
    init thread: <NSThread: 0x7ff6914132b0>{number = 1, name = main} // we start on thread #1
    observable thread: <NSThread: 0x7ff6916a0cb0>{number = 4, name = (null)} // thanks to subscribeOn, the Observable code runs on thread #4 instead of #1
    observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(1) // the observer code runs on neither thread #1 nor #4, thanks to observeOn; without observeOn it would run on the subscribeOn thread #4
    observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(2)
    observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Next(3)
    observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Completed





timeout



Duplicates elements from SO into RO, but if SO does not generate an element within the specified time, RO generates an error.

    example("failed timeout ") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let timeoutSequence = sequence.timeout(0.9, scheduler: MainScheduler.instance)
        timeoutSequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- failed timeout example ---
    Next(1)
    Error(Sequence timeout.)





using



Lets you tell the Observable to create a resource that lives only as long as RO is alive. Two factories are passed as parameters: the first creates the resource, the second creates the Observable, and the two share a single lifetime.

    class FakeDisposable: Disposable {
        func dispose() {
            print("disposed")
        }
    }

    example("using") {
        let sequence = Observable.using({
            return FakeDisposable()
        }, observableFactory: { d in
            Observable.just(1)
        }) as Observable<Int>

        sequence.subscribe { e in
            print(e)
        }
    }

Console:

    --- using example ---
    Next(1)
    Completed
    disposed

As you can see, after the Observable finished generating elements, the dispose method of our FakeDisposable resource was called.
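
The "single lifetime" idea can be shown in plain Swift with `defer`. This is only an analogy under my own assumptions: `TrackedResource` and `usingResource` are hypothetical names, and the body runs synchronously, whereas in RxSwift the resource is released when the Observable terminates or the subscription is disposed.

```swift
// Hypothetical resource that remembers whether it has been released.
final class TrackedResource {
    private(set) var isDisposed = false
    func dispose() { isDisposed = true }
}

// Creates the resource, runs `body` with it and releases the resource
// as soon as `body` has finished, whatever it returns.
func usingResource<T>(_ makeResource: () -> TrackedResource,
                      _ body: (TrackedResource) -> T) -> T {
    let resource = makeResource()
    defer { resource.dispose() }   // runs after body, like dispose after Completed
    return body(resource)
}

let resource = TrackedResource()
let elements = usingResource({ resource }, { _ in [1] })

print(elements, resource.isDisposed)
```

The resource is alive while the body produces its elements and is released immediately afterwards, in the same order as in the console output above.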

Source: https://habr.com/ru/post/281292/

