/// How a sequence built by `createSequenceWithWait` terminates.
public enum ResultType {
    /// No terminal event is ever sent.
    case Infinite
    /// The sequence ends with a `Completed` event.
    case Completed
    /// The sequence ends with `Error(RxError.Unknown)`.
    case Error
}

/// Builds an observable that emits the elements of `array` on the main queue,
/// spaced `waitTime` seconds apart.
///
/// The element at index `idx` is scheduled `waitTime * idx` seconds after
/// subscription. Unless `resultType` is `.Infinite`, the terminal event is
/// scheduled `waitTime * array.count` seconds after subscription.
///
/// - parameter array: Elements to emit, in order.
/// - parameter waitTime: Delay between consecutive elements, in seconds.
/// - parameter resultType: Which terminal event (if any) follows the elements.
/// - parameter describer: Optional transform applied to every element before it
///   is emitted. When it is `nil` the element itself is emitted, which only
///   works if it can be cast to `U`; otherwise the observer receives
///   `Error(RxError.Unknown)` instead of the process crashing on a forced cast.
/// - returns: An observable of the (optionally transformed) elements. Disposing
///   a subscription stops all emissions that have not fired yet.
public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> {
    return Observable<U>.create { observer in
        // Checked by every scheduled block so that work queued with
        // dispatch_after becomes a no-op once the subscription is disposed.
        let subscription = BooleanDisposable()
        let mainQueue = dispatch_get_main_queue()
        let nanosecondsPerStep = waitTime * Int64(NSEC_PER_SEC)

        for (idx, letter) in array.enumerate() {
            let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), Int64(idx) * nanosecondsPerStep)
            dispatch_after(time, mainQueue) {
                if subscription.disposed {
                    return
                }
                if let describer = describer {
                    observer.on(.Next(describer(value: letter)))
                } else if let value = letter as? U {
                    observer.on(.Next(value))
                } else {
                    // No describer and T is not convertible to U.
                    observer.on(.Error(RxError.Unknown))
                }
            }
        }

        // Exhaustive on purpose: a new ResultType case must be handled here.
        let terminalEvent: Event<U>?
        switch resultType {
        case .Infinite:
            terminalEvent = nil
        case .Completed:
            terminalEvent = .Completed
        case .Error:
            terminalEvent = .Error(RxError.Unknown)
        }

        if let terminalEvent = terminalEvent {
            let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), Int64(array.count) * nanosecondsPerStep)
            dispatch_after(allTime, mainQueue) {
                if subscription.disposed {
                    return
                }
                observer.on(terminalEvent)
            }
        }

        return subscription
    }
}
/// Prints a banner naming the example, then runs it synchronously.
///
/// - parameter description: Text placed between "--- " and " example ---".
/// - parameter action: The example body; invoked exactly once.
public func example(description: String, action: () -> ()) {
    let banner = "\n--- " + description + " example ---"
    print(banner)
    action()
}
import XCPlayground XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
// Subscribes to a Variable and then assigns a new value to it.
example("as Observable") {
    let counter = Variable<Int>(0)
    let counterEvents = counter.asObservable()
    _ = counterEvents.subscribe { event in
        print(event)
    }
    counter.value = 1
}
--- as Observable example --- Next(0) Next(1) Completed
// Hand-built sequence of sequences, flattened with concat().
example("create") {
    let numbers = Observable<AnyObject>.of(1, 2, 3)
    let letters = Observable<AnyObject>.of("A", "B", "C")

    // The outer sequence sends two inner sequences and no terminal event.
    let outer = Observable<Observable<AnyObject>>.create { observer in
        observer.on(.Next(numbers))
        observer.on(.Next(letters))
        return NopDisposable.instance
    }

    let flattened = outer.concat()
    _ = flattened.subscribe { event in
        print(event)
    }
}
--- create example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C)
// `just(i)` is built before `i` changes.
example("without deferred") {
    var i = 1
    let eagerSource = Observable.just(i)
    i = 2
    _ = eagerSource.subscribeNext { value in
        print ("i = \(value)")
    }
}

// The factory closure passed to `deferred` reads `i` after it has changed.
example("with deferred") {
    var i = 1
    let deferredSource = Observable.deferred { Observable.just(i) }
    i = 2
    _ = deferredSource.subscribeNext { value in
        print ("i = \(value)")
    }
}
--- without deferred example --- i = 1 --- with deferred example --- i = 2
// Subscribes to an empty sequence and prints whatever events arrive.
example("empty") {
    let nothing = Observable<Int>.empty()
    _ = nothing.subscribe { event in
        print(event)
    }
}
--- empty example --- Completed
// Subscribes to a sequence created from RxError.Unknown.
example("error") {
    let failing = Observable<Int>.error(RxError.Unknown)
    _ = failing.subscribe { event in
        print(event)
    }
}
--- error example --- Error(Unknown error occured.)
// Periodic sequence with a period of 1 on the main scheduler.
example("interval") {
    let ticks = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    _ = ticks.subscribe { event in
        print(event)
    }
}
--- interval example --- Next(0) Next(1) Next(2) Next(3) Next(4) ....
// Single-element sequence.
example("just") {
    let single = Observable.just(100)
    _ = single.subscribe { event in
        print(event)
    }
}
--- just example --- Next(100) Completed
// Subscribes to `never()`; the recorded output shows no events.
example("never") {
    let silent = Observable<Int>.never()
    _ = silent.subscribe { event in
        print(event)
    }
}
--- never example ---
// Sequence from a fixed list of elements.
example("simple of") {
    let pair = Observable.of(1, 2)
    _ = pair.subscribe { event in
        print(event)
    }
}

// `of` applied to observables yields a sequence of sequences; merge() flattens it.
example("of for Observables") {
    let numbers = Observable<AnyObject>.of(1, 2, 3)
    let letters = Observable<AnyObject>.of("A", "B", "C")
    let nested = Observable.of(numbers, letters)
    let merged = nested.merge()
    _ = merged.subscribe { event in
        print(event)
    }
}
--- simple of example --- Next(1) Next(2) Completed --- of for Observables example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed
// Three consecutive integers starting at 5.
example("range") {
    let span = Observable.range(start: 5, count: 3)
    _ = span.subscribe { event in
        print(event)
    }
}
--- range example --- Next(5) Next(6) Next(7) Completed
// Repeats the element 1 on the main scheduler.
example("repeatElement") {
    let repeated = Observable.repeatElement(1, scheduler: MainScheduler.instance)
    _ = repeated.subscribe { event in
        print(event)
    }
}
--- repeatElement example --- Next(1) Next(1) Next(1) .....
// Timer with an initial due time of 2 and a period of 3.
example("timer") {
    let ticks = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance)
    _ = ticks.subscribe { event in
        print(event)
    }
}
--- timer example --- Next(0) Next(1) Next(2)
SO = [Observable<T>] SO1, SO2 = Observable<T> RO = Observable<T>
// Four subjects compete; subjectC is the first one to emit.
example("amb") {
    let subjectA = PublishSubject<Int>()
    let subjectB = PublishSubject<Int>()
    let subjectC = PublishSubject<Int>()
    let subjectD = PublishSubject<Int>()

    let candidates = [subjectA, subjectB, subjectC, subjectD]
    let winner = candidates.amb()
    _ = winner.subscribe { event in
        print(event)
    }

    subjectC.onNext(0)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectC.onNext(1)
    subjectD.onNext(45)
}
--- amb example --- Next(0) Next(1)
SO = SO1, SO2,... SON = Observable<T> RO = Observable<f(T,T)>
// Combines a 2-second-step number sequence with a 1-second-step letter
// sequence whose subscription is delayed by 3.
example("combineLatest") {
    let numbers = createSequenceWithWait([1,2,3], waitTime: 2) { element in "\(element)" }
        .debug("firstSequence")

    let letters = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in "\(element)" }
        .delaySubscription(3, scheduler: MainScheduler.instance)
        .debug("secondSequence")

    let combined = Observable.combineLatest(numbers, letters) { number, letter -> String in
        "\(number) - \(letter)"
    }

    _ = combined.subscribe { event in
        print(event)
    }
}
--- combineLatest example --- 2016-04-12 16:59:35.421: firstSequence -> subscribed 2016-04-12 16:59:35.422: secondSequence -> subscribed 2016-04-12 16:59:35.434: firstSequence -> Event Next(1) 2016-04-12 16:59:37.423: firstSequence -> Event Next(2) 2016-04-12 16:59:38.423: secondSequence -> Event Next(A) Next(2 - A) 2016-04-12 16:59:39.423: firstSequence -> Event Next(3) Next(3 - A) 2016-04-12 16:59:39.522: secondSequence -> Event Next(B) Next(3 - B) 2016-04-12 16:59:40.622: secondSequence -> Event Next(C) Next(3 - C) 2016-04-12 16:59:41.722: firstSequence -> Event Completed 2016-04-12 16:59:41.722: firstSequence -> disposed 2016-04-12 16:59:41.722: secondSequence -> Event Completed 2016-04-12 16:59:41.722: secondSequence -> disposed Completed
SO = Observable<Observable<T>> SO1, SO2 = Observable<T> RO = Observable<T>
// concat as an instance method: second sequence appended to the first.
example("concat object method") {
    let numbers = Observable<AnyObject>.of(1, 2, 3)
    let letters = Observable<AnyObject>.of("A", "B", "C")
    let joined = numbers.concat(letters)
    _ = joined.subscribe { event in
        print(event)
    }
}

// concat applied to a sequence of sequences.
example("concat from array") {
    let head = Observable.of(1,2,3)
    let tail = Observable.of(4,5,6)
    let joined = Observable.of(head, tail).concat()
    _ = joined.subscribe { event in
        print(event)
    }
}
--- concat object method example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed --- concat from array example --- Next(1) Next(2) Next(3) Next(4) Next(5) Next(6) Completed
SO = Observable<Observable<T>> RO = Observable<T>
// Merging two synchronous sequences.
example("simple merge") {
    let numbers = Observable<AnyObject>.of(1, 2, 3)
    let letters = Observable<AnyObject>.of("A", "B", "C")
    let nested = Observable.of(numbers, letters)
    let merged = nested.merge()
    _ = merged.subscribe { event in
        print(event)
    }
}

// Merging two timed sequences: numbers use the default step, letters a step of 2.
example("merge with wait") {
    let numbers = createSequenceWithWait([1,2,3]) { element in "\(element)" }
    let letters = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in "\(element)" }
    let nested = Observable.of(numbers, letters)
    let merged = nested.merge()
    _ = merged.subscribe { event in
        print(event)
    }
}
--- simple merge example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed --- merge with wait example --- Next(1) Next(A) Next(2) Next(3) Next(B) Next(C) Completed
SO = Observable<T> RO = Observable<T>
// Prepends 0 to the sequence 1, 2, 3.
example("startWith") {
    let base = Observable.of(1, 2, 3)
    let prefixed = base.startWith(0)
    _ = prefixed.subscribe { event in
        print(event)
    }
}
--- startWith example --- Next(0) Next(1) Next(2) Next(3) Completed
SO = Observable<Observable<T>> RO = Observable<T>
// switchLatest over a Variable holding observables: after `proxyVar` is
// re-pointed at varB, further varA values are no longer printed (see the
// recorded output).
example("switchLatest") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(100)
    let proxyVar = Variable(varA.asObservable())

    let switched = proxyVar.asObservable().switchLatest()
    _ = switched.subscribe { event in
        print(event)
    }

    varA.value = 1
    varA.value = 2
    varB.value = 3
    proxyVar.value = varB.asObservable()
    varB.value = 4
    varA.value = 5
}

// switchLatest over three timed observables delivered one second apart.
example("switchLatest") {
    let observableA = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(10)) }
        delay(3) { observer.on(.Next(20)) }
        delay(5) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("oA")

    let observableB = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(100)) }
        delay(1) { observer.on(.Next(200)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("oB")

    let observableC = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(1000)) }
        delay(1) { observer.on(.Next(2000)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("oC")

    // Build the list once and feed it to the outer sequence (previously the
    // array was created, left unused, and then spelled out a second time).
    let sources = [observableA, observableB, observableC]
    let sequence: Observable<Observable<Int>> = createSequenceWithWait(sources, waitTime: 1) { $0 }
    let switchLatestSequence: Observable<Int> = sequence.switchLatest()
    _ = switchLatestSequence.subscribe { event in
        print(event)
    }
}
--- switchLatest example --- Next(0) Next(1) Next(2) Next(3) Next(4) Completed --- switchLatest example --- 2016-04-12 17:15:22.710: oA -> subscribed 2016-04-12 17:15:22.711: oA -> Event Next(10) Next(10) 2016-04-12 17:15:23.797: oA -> disposed // oB 2016-04-12 17:15:23.797: oB -> subscribed 2016-04-12 17:15:23.797: oB -> Event Next(100) Next(100) 2016-04-12 17:15:24.703: oB -> disposed // oC 2016-04-12 17:15:24.703: oC -> subscribed 2016-04-12 17:15:24.703: oC -> Event Next(1000) Next(1000) 2016-04-12 17:15:25.800: oC -> Event Next(2000) Next(2000) 2016-04-12 17:15:26.703: oC -> Event Completed 2016-04-12 17:15:26.703: oC -> disposed Completed
SO1, SO2 = Observable<T> RO = Observable<f(T,T)>
// Each varA value is paired with the most recent varB value.
example("withLatestFrom") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(10)

    let paired = varA.asObservable().withLatestFrom(varB.asObservable()) { left, right in
        "\(left) - \(right)"
    }
    _ = paired.subscribe { event in
        print(event)
    }

    varA.value = 1
    varA.value = 2
    varB.value = 20
    varB.value = 30
    varA.value = 5
    varA.value = 6
}
--- withLatestFrom example --- Next(0 - 10) Next(1 - 10) Next(2 - 10) Next(5 - 30) Next(6 - 30) Completed
SO1, SO2 = Observable<T> RO = Observable<f(T,T)>
// zip over two Variables.
example("zip with simple Variable") {
    let varA = Variable<Int>(0)
    let varB = Variable<Int>(10)

    let zipped = Observable.zip(varA.asObservable(), varB.asObservable()) { left, right in
        "\(left) - \(right)"
    }
    _ = zipped.subscribe { event in
        print(event)
    }

    varA.value = 1
    varA.value = 2
    varB.value = 20
    varB.value = 30
    varA.value = 3
    varA.value = 4
}

// zip over two PublishSubjects.
example("zip with PublishSubject") {
    let subjectA = PublishSubject<Int>()
    let subjectB = PublishSubject<Int>()

    let zipped = Observable.zip(subjectA, subjectB) { left, right in
        "\(left) - \(right)"
    }
    _ = zipped.subscribe { event in
        print(event)
    }

    subjectA.onNext(0)
    subjectA.onNext(1)
    subjectA.onNext(2)
    subjectB.onNext(100)
    subjectB.onNext(101)
    subjectA.onNext(3)
    subjectB.onNext(102)
    subjectA.onNext(4)
}
--- zip with simple Variable example --- Next(0 - 10) Next(1 - 20) Next(2 - 30) Completed --- zip with PublishSubject example --- Next(0 - 100) Next(1 - 101) Next(2 - 102)
// Drops consecutive duplicates.
example("distinctUntilChanged") {
    let raw = Observable.of(1, 2, 2, 3, 4, 4, 4, 1)
    let deduplicated = raw.distinctUntilChanged()
    _ = deduplicated.subscribe { event in
        print(event)
    }
}
--- distinctUntilChanged example --- Next(1) Next(2) Next(3) Next(4) Next(1) Completed
// Picks the element at index 2.
example("elementAt") {
    let source = Observable.of(0, 10, 20, 30, 40)
    let picked = source.elementAt(2)
    _ = picked.subscribe { event in
        print(event)
    }
}
--- elementAt example --- Next(20) Completed
// Keeps only elements greater than 10.
example("filter") {
    let source = Observable.of(1, 20, 3, 40)
    let large = source.filter { element in element > 10 }
    _ = large.subscribe { event in
        print(event)
    }
}
--- filter example --- Next(20) Next(40) Completed
// Applies ignoreElements() to a four-element sequence.
example("ignoreElements") {
    let source = Observable.of(1, 2, 3, 4)
    let terminalOnly = source.ignoreElements()
    _ = terminalOnly.subscribe { event in
        print(event)
    }
}
--- ignoreElements example --- Completed
// Samples a 3-second-step sequence with a 1-second interval.
example("sampler") {
    let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler")
    let source: Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3)
    let sampled: Observable<Int> = source.sample(sampler)
    _ = sampled.subscribe { event in
        print(event)
    }
}
--- sampler example --- 2016-04-12 18:28:20.322: sampler -> subscribed 2016-04-12 18:28:21.323: sampler -> Event Next(0) Next(1) 2016-04-12 18:28:22.324: sampler -> Event Next(1) // RO , .. 2016-04-12 18:28:23.323: sampler -> Event Next(2) Next(2) 2016-04-12 18:28:24.323: sampler -> Event Next(3) // RO , .. 2016-04-12 18:28:25.323: sampler -> Event Next(4) // RO , .. 2016-04-12 18:28:26.323: sampler -> Event Next(5) Next(3) ...
// single() on a sequence with more than one element.
example("single generate error") {
    let source = Observable.of(1, 2, 3, 4)
    let only = source.single()
    _ = only.subscribe { event in
        print(event)
    }
}

// single(predicate): exactly one element (2) is even.
example("single") {
    let source = Observable.of(1, 2, 3, 5)
    let onlyEven = source.single { element in element % 2 == 0 }
    _ = onlyEven.subscribe { event in
        print(event)
    }
}
--- single generate error example --- Next(1) Error(Sequence contains more than one element.) --- single example --- Next(2) Completed
// Skips the first two elements.
example("skip") {
    let source = Observable.of(1, 2, 3, 4)
    let remainder = source.skip(2)
    _ = remainder.subscribe { event in
        print(event)
    }
}
--- skip example --- Next(3) Next(4) Completed
// Time-based skip: the argument 2 is a duration, not a count.
example("skip duration with wait") {
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let remainder = timed.skip(2, scheduler: MainScheduler.instance)
    _ = remainder.subscribe { event in
        print(event)
    }
}
--- skip duration with wait example --- Next(3) Next(4) Completed
// skipUntil with a trigger whose subscription is delayed by 1.
example("skipUntil") {
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let trigger = Observable.just(1)
        .delaySubscription(1, scheduler: MainScheduler.instance)
    let remainder = timed.skipUntil(trigger)
    _ = remainder.subscribe { event in
        print(event)
    }
}
--- skipUntil example --- Next(3) Next(4) Completed
// skipWhile with the predicate `element < 3`.
example("skipWhile") {
    let source = [1,2,3,4,0].toObservable()
    let remainder = source.skipWhile { element in element < 3 }
    _ = remainder.subscribe { event in
        print(event)
    }
}
--- skipWhile example --- Next(3) Next(4) Next(0) Completed
// skipWhileWithIndex with the predicate `value < 4 || index < 2`.
example("skipWhileWithIndex") {
    let source = [1,2,5,0,7].toObservable()
    let remainder = source.skipWhileWithIndex { value, index in
        value < 4 || index < 2
    }
    _ = remainder.subscribe { event in
        print(event)
    }
}
--- skipWhileWithIndex example --- Next(5) Next(0) Next(7) Completed
// Takes the first two elements.
example("take") {
    let source = Observable.of(1, 2, 3, 4)
    let leading = source.take(2)
    _ = leading.subscribe { event in
        print(event)
    }
}
--- take example --- Next(1) Next(2) Completed
// Time-based take: the argument 2 is a duration, not a count.
example("take duration with wait") {
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let leading = timed.take(2, scheduler: MainScheduler.instance)
    _ = leading.subscribe { event in
        print(event)
    }
}
--- take duration with wait example --- Next(1) Next(2) Completed
// Last two elements of a synchronous sequence.
example("takeLast") {
    let source = Observable.of(1, 2, 3, 4)
    let trailing = source.takeLast(2)
    _ = trailing.subscribe { event in
        print(event)
    }
}

// Last two elements of a timed sequence.
example("takeLast with wait") {
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let trailing = timed.takeLast(2)
    _ = trailing.subscribe { event in
        print(event)
    }
}
--- takeLast example --- Next(3) Next(4) Completed --- takeLast with wait example --- Next(3) Next(4) Completed
// takeUntil with a stop signal whose subscription is delayed by 2.
example("takeUntil") {
    let stopSignal = Observable.just(1)
        .delaySubscription(2, scheduler: MainScheduler.instance)
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let leading = timed.takeUntil(stopSignal)
    _ = leading.subscribe { event in
        print(event)
    }
}
--- takeUntil example --- Next(1) Next(2) Completed
// takeWhile with the predicate `element < 3`.
example("takeWhile") {
    let source = [1,2,3,4].toObservable()
    let leading = source.takeWhile { element in element < 3 }
    _ = leading.subscribe { event in
        print(event)
    }
}
--- takeWhile example --- Next(1) Next(2) Completed
// takeWhileWithIndex with the predicate `value % 2 == 0 || index < 3`.
example("takeWhileWithIndex") {
    let source = [1,2,3,4,5,6].toObservable()
    let leading = source.takeWhileWithIndex { (value, index) in
        value % 2 == 0 || index < 3
    }
    _ = leading.subscribe { event in
        print(event)
    }
}
--- takeWhileWithIndex example --- Next(1) Next(2) Next(3) Next(4) Completed
// throttle on a synchronous sequence.
example("throttle") {
    let source = Observable.of(1, 2, 3, 4)
    let throttled = source.throttle(1, scheduler: MainScheduler.instance)
    _ = throttled.subscribe { event in
        print(event)
    }
}

// throttle (0.5) on a sequence whose elements are one second apart.
example("throttle with wait") {
    let timed = createSequenceWithWait([1,2,3,4]) { element in element }
    let throttled = timed.throttle(0.5, scheduler: MainScheduler.instance)
    _ = throttled.subscribe { event in
        print(event)
    }
}
--- throttle example --- Next(4) Completed --- throttle with wait example --- Next(1) Next(2) Next(3) Next(4) Completed
SO = Observable<T> RO = Observable<[T]>
// Buffers Variable values with timeSpan 3 and count 3; each printed line is
// prefixed with the current date.
example("buffer") {
    let source = Variable<Int>(0)

    let batches = source.asObservable()
        .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
    _ = batches.subscribe { event in
        print("\(NSDate()) - \(event)")
    }

    source.value = 1
    source.value = 2
    source.value = 3

    delay(3) {
        source.value = 4
        source.value = 5
        delay(5) {
            source.value = 6
        }
    }
}
--- buffer example --- 2016-04-12 16:10:58 +0000 - Next([0, 1, 2]) 2016-04-12 16:11:01 +0000 - Next([3]) 2016-04-12 16:11:04 +0000 - Next([4, 5]) 2016-04-12 16:11:07 +0000 - Next([6]) 2016-04-12 16:11:07 +0000 - Completed
// Every outer value starts its own inner timed sequence (step 2).
example("flatMap with wait") {
    let outer: Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }

    let flattened: Observable<String> = outer.flatMap { outerValue in
        createSequenceWithWait([10,11,12], waitTime: 2) { element in
            "\(element) - \(outerValue)"
        }
    }

    _ = flattened.subscribe { event in
        print(event)
    }
}
--- flatMap with wait example --- Next(10 - 0) Next(10 - 1) Next(11 - 0) Next(10 - 2) Next(11 - 1) Next(12 - 0) Next(11 - 2) Next(12 - 1) Next(12 - 2) Completed
// flatMapFirst over synchronous inner sequences.
example("flatMapFirst") {
    let outer: Observable<Int> = Observable.of(10, 20, 30)
        .debug("sequence")

    let flattened: Observable<String> = outer.flatMapFirst { outerValue in
        Observable.of(0, 1, 2).map { "\($0) - \(outerValue)" }
    }

    _ = flattened.subscribe { event in
        print(event)
    }
}

// flatMapFirst over timed inner sequences; outer values arrive 4 seconds apart.
example("flatMapFirst with delay") {
    let subjectA = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(10)) }
        delay(1) { observer.on(.Next(20)) }
        delay(7) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sA")

    let subjectB = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(100)) }
        delay(1) { observer.on(.Next(200)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sB")

    let subjectC = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(1000)) }
        delay(1) { observer.on(.Next(2000)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sC")

    let subjects = [subjectA, subjectB, subjectC]

    let indexes: Observable<Int> = createSequenceWithWait([0, 1, 2], waitTime: 4) { $0 }
        .debug("sequence")

    // The outer value is used as an index into `subjects`.
    let flattened: Observable<Int> = indexes.flatMapFirst { index in
        return subjects[index].asObservable()
    }.debug("flatMapSequence")

    _ = flattened.subscribe { event in
        print(event)
    }
}
--- flatMapFirst example --- 2016-04-12 19:19:46.915: sequence -> subscribed 2016-04-12 19:19:46.916: sequence -> Event Next(10) Next(0 - 10) Next(1 - 10) Next(2 - 10) 2016-04-12 19:19:46.918: sequence -> Event Next(20) Next(0 - 20) Next(1 - 20) Next(2 - 20) 2016-04-12 19:19:46.919: sequence -> Event Next(30) Next(0 - 30) Next(1 - 30) Next(2 - 30) 2016-04-12 19:19:46.921: sequence -> Event Completed Completed 2016-04-12 19:19:46.921: sequence -> disposed --- flatMapFirst with delay example --- 2016-04-12 19:19:46.925: flatMapSequence -> subscribed 2016-04-12 19:19:46.926: sequence -> subscribed 2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO 1 2016-04-12 19:19:46.935: sA -> subscribed // Observable sA, 2016-04-12 19:19:46.936: sA -> Event Next(10) 2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10) Next(10) 2016-04-12 19:19:47.936: sA -> Event Next(20) 2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20) Next(20) 2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO 2 , sA , sB , 2016-04-12 19:19:53.935: sA -> Event Completed 2016-04-12 19:19:53.936: sA -> disposed // sA , 2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO 3 2016-04-12 19:19:55.137: sC -> subscribed // .. Observable ( sA , sB - ) - 2016-04-12 19:19:55.137: sC -> Event Next(1000) 2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000) Next(1000) 2016-04-12 19:19:56.236: sC -> Event Next(2000) 2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000) Next(2000) 2016-04-12 19:19:57.335: sC -> Event Completed 2016-04-12 19:19:57.336: sC -> disposed 2016-04-12 19:19:58.926: sequence -> Event Completed 2016-04-12 19:19:58.926: flatMapSequence -> Event Completed Completed 2016-04-12 19:19:58.926: sequence -> disposed
// flatMapLatest over synchronous inner sequences.
example("flatMapLatest") {
    let outer: Observable<Int> = Observable.of(10, 20, 30)

    let flattened = outer.flatMapLatest { outerValue in
        Observable.of(0, 1, 2).map { "\($0) - \(outerValue)" }
    }

    _ = flattened.subscribe { event in
        print(event)
    }
}

// flatMapLatest over timed inner sequences; outer values arrive 1 second apart.
example("flatMapLatest with delay") {
    let subjectA = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(10)) }
        delay(3) { observer.on(.Next(20)) }
        delay(5) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sA")

    let subjectB = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(100)) }
        delay(1) { observer.on(.Next(200)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sB")

    let subjectC = Observable<Int>.create { observer in
        delay(0) { observer.on(.Next(1000)) }
        delay(1) { observer.on(.Next(2000)) }
        delay(2) { observer.onCompleted() }
        return NopDisposable.instance
    }.debug("sC")

    let subjects = [subjectA, subjectB, subjectC]

    let indexes: Observable<Int> = createSequenceWithWait([0, 1, 2], waitTime: 1) { $0 }
        .debug("sequence")

    // The outer value is used as an index into `subjects`.
    let flattened: Observable<Int> = indexes.flatMapLatest { index in
        return subjects[index].asObservable()
    }.debug("flatMapSequence")

    _ = flattened.subscribe { event in
        print(event)
    }
}
--- flatMapLatest example --- Next(0 - 10) Next(1 - 10) Next(2 - 10) Next(0 - 20) Next(1 - 20) Next(2 - 20) Next(0 - 30) Next(1 - 30) Next(2 - 30) Completed --- flatMapLatest with delay example --- 2016-04-12 19:30:50.309: flatMapSequence -> subscribed 2016-04-12 19:30:50.310: sequence -> subscribed 2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO 1 , sA 2016-04-12 19:30:50.319: sA -> subscribed // sA 2016-04-12 19:30:50.319: sA -> Event Next(10) // 2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap Next(10) // RO 2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO 2 , sA 2016-04-12 19:30:51.311: sA -> disposed // sA , 2016-04-12 19:30:51.311: sB -> subscribed // Observable sB 2016-04-12 19:30:51.311: sB -> Event Next(100) 2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100) Next(100) 2016-04-12 19:30:52.310: sequence -> Event Next(2) 2016-04-12 19:30:52.311: sB -> disposed 2016-04-12 19:30:52.311: sC -> subscribed 2016-04-12 19:30:52.311: sC -> Event Next(1000) 2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000) Next(1000) 2016-04-12 19:30:53.372: sequence -> Event Completed 2016-04-12 19:30:53.372: sequence -> disposed 2016-04-12 19:30:53.372: sC -> Event Next(2000) 2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000) Next(2000) 2016-04-12 19:30:54.501: sC -> Event Completed 2016-04-12 19:30:54.501: sC -> disposed 2016-04-12 19:30:54.501: flatMapSequence -> Event Completed Completed
// flatMapWithIndex over synchronous inner sequences; also prints the
// dynamic type of the resulting observable.
example("flatMapWithIndex") {
    let outer: Observable<Int> = Observable.of(10, 20, 30)

    let flattened: Observable<String> = outer.flatMapWithIndex { outerValue, index in
        Observable.of("A", "B", "C").map { "index: (\(index)) - \($0) - \(outerValue)" }
    }

    print(flattened.dynamicType)
    _ = flattened.subscribe { event in
        print(event)
    }
}

// flatMapWithIndex over timed inner sequences (step 2); outer step is 1.
example("flatMapWithIndex with wait") {
    let outer: Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }

    let flattened: Observable<String> = outer.flatMapWithIndex { outerValue, index in
        createSequenceWithWait(["A","B","C"], waitTime: 2) { element in
            "index: (\(index)) - \(element) - \(outerValue)"
        }
    }

    print(flattened.dynamicType)
    _ = flattened.subscribe { event in
        print(event)
    }
}
--- flatMapWithIndex example --- FlatMapWithIndex<Int, Observable<String>> Next(index: (0) - A - 10) Next(index: (0) - B - 10) Next(index: (0) - C - 10) Next(index: (1) - A - 20) Next(index: (1) - B - 20) Next(index: (1) - C - 20) Next(index: (2) - A - 30) Next(index: (2) - B - 30) Next(index: (2) - C - 30) Completed --- flatMapWithIndex with wait example --- FlatMapWithIndex<Int, Observable<String>> Next(index: (0) - A - 0) Next(index: (1) - A - 1) Next(index: (0) - B - 0) Next(index: (2) - A - 2) Next(index: (1) - B - 1) Next(index: (0) - C - 0) Next(index: (2) - B - 2) Next(index: (1) - C - 1) Next(index: (2) - C - 2) Completed
Observable<T> -> Observable<U>
// Maps each element to the string form of element * 5.
example("map") {
    let source = Observable.of(1, 2, 3)
    let mapped = source.map { element in "\(element * 5)" }
    _ = mapped.subscribe { event in
        print(event)
    }
}
--- map example --- Next(5) Next(10) Next(15) Completed
Observable<T> -> Observable<U>
// Maps each element together with its index.
example("mapWithIndex") {
    let source = Observable.of("A", "B", "C")
    let mapped = source.mapWithIndex { element, index in
        "\(element) / \(index)"
    }
    _ = mapped.subscribe { event in
        print(event)
    }
}
--- mapWithIndex example --- Next(A / 0) Next(B / 1) Next(C / 2) Completed
SO = Observable<T> RO = Observable<Observable<T>>
// Splits Variable values into windows (timeSpan 3, count 3). Each window is
// itself an observable and gets its own subscription.
example("window") {
    let source = Variable<Int>(0)

    // The debug label is kept as "bufferSequence" because it appears in the output.
    let windows: Observable<Observable<Int>> = source.asObservable()
        .window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
        .debug("bufferSequence")

    _ = windows.subscribe { event in
        if case .Next(let window) = event {
            print("\(NSDate()) - Observable")
            _ = window.subscribe { inner in
                print(inner)
            }
        }
    }

    source.value = 1
    source.value = 2
    source.value = 3

    delay(4) {
        source.value = 4
        source.value = 5
        delay(4) {
            source.value = 6
        }
    }
}
--- window example --- 2016-04-12 19:51:54.372: bufferSequence -> subscribed 2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:54 +0000 - Observable Next(0) Next(1) Next(2) Completed 2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:54 +0000 - Observable Next(3) Completed 2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:57 +0000 - Observable Next(4) Next(5) Completed 2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:52:00 +0000 - Observable Next(6) Completed 2016-04-12 19:52:02.895: bufferSequence -> Event Completed
// Multiplies all elements together, starting from the seed 1.
example("reduce") {
    let source = Observable.of(1, 2, 3, 4)
    let product = source.reduce(1) { accumulated, element in
        accumulated * element
    }
    _ = product.subscribe { event in
        print(event)
    }
}
--- reduce example --- Next(24) Completed
// Running sum with seed 10.
example("scan") {
    let source = Observable.of(1, 2, 3)
    let runningSum = source.scan(10) { accumulated, element in
        accumulated + element
    }
    _ = runningSum.subscribe { event in
        print(event)
    }
}

// Running product with seed 10.
example("scan multiply") {
    let source = Observable.of(2, 3, 5)
    let runningProduct = source.scan(10) { accumulated, element in
        accumulated * element
    }
    _ = runningProduct.subscribe { event in
        print(event)
    }
}
--- scan example --- Next(11) Next(13) Next(16) Completed --- scan multiply example --- Next(20) Next(60) Next(300) Completed
SO = Observable<T> RO = Observable<[T]>
// Collects the whole sequence into one array element.
example("toArray") {
    let source = Observable.of(1, 2, 3)
    let collected = source.toArray()
    _ = collected.subscribe { event in
        print(event)
    }
}
--- toArray example --- Next([1, 2, 3]) Completed
// The source fails after four elements; catchError switches to a fallback sequence.
example("with catchError") {
    let failingSource = Observable<Int>.create { observer in
        for value in 1...4 {
            observer.on(.Next(value))
        }
        observer.onError(RxError.Unknown)
        // Sent after the error on purpose; it does not show up in the recorded output.
        observer.on(.Next(5))
        return NopDisposable.instance
    }

    let recovered = failingSource.catchError { _ in
        return Observable.of(10, 11, 12)
    }

    _ = recovered.subscribe { event in
        print(event)
    }
}
--- with catchError example --- Next(1) Next(2) Next(3) Next(4) Next(10) Next(11) Next(12) Completed
// The source is 1...4, then an error, then 5; the error is replaced by -1.
example("with catchErrorJustReturn") {
    let failingSource = Observable.of(1, 2, 3, 4)
        .concat(Observable.error(RxError.Unknown))
        .concat(Observable.just(5))

    let recovered = failingSource.catchErrorJustReturn(-1)
    _ = recovered.subscribe { event in
        print(event)
    }
}
--- with catchErrorJustReturn example --- Next(1) Next(2) Next(3) Next(4) Next(-1) Completed
// retry(2) on a sequence that always ends with an error.
example("retry full sequence") {
    let failingSource = Observable.of(1, 2, 3, 4)
        .concat(Observable.error(RxError.Unknown))

    let retried = failingSource.retry(2)
    _ = retried.subscribe { event in
        print(event)
    }
}
--- retry full sequence example --- Next(1) Next(2) Next(3) Next(4) Next(1) Next(2) Next(3) Next(4) Error(Unknown error occured.)
// The source fails on its first two subscriptions (counter < 3); retryWhen
// maps each Unknown error to a notifier sequence and any other error to a failure.
example("retryWhen") {
    var counter = 0

    let sequenceWithError = Observable<Int>.create { observer in
        for value in 1...4 {
            observer.on(.Next(value))
        }
        counter += 1
        if counter < 3 {
            observer.onError(RxError.Unknown)
        }
        // Variant to experiment with: else { observer.onError(RxError.Overflow) }
        observer.on(.Next(5))
        return NopDisposable.instance
    }.debug("with error")

    let sequenceWithoutError = Observable<Int>.create { observer in
        observer.on(.Next(10))
        // Variant to experiment with: observer.onError(RxError.NoElements)
        return NopDisposable.instance
    }.debug("without error")

    let retrySequence = sequenceWithError.retryWhen { (errors: Observable<RxError>) -> Observable<Int> in
        return errors.flatMap { (generatedError: RxError) -> Observable<Int> in
            guard case .Unknown = generatedError else {
                return Observable<Int>.error(generatedError)
            }
            return sequenceWithoutError
        }
    } // append .debug() here to trace the combined sequence

    _ = retrySequence.subscribe { event in
        print(event)
    }
}
--- retryWhen example --- 2016-04-12 20:18:04.484: with error -> subscribed 2016-04-12 20:18:04.485: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.486: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.486: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.487: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.) 2016-04-12 20:18:04.488: without error -> subscribed 2016-04-12 20:18:04.488: without error -> Event Next(10) 2016-04-12 20:18:04.489: with error -> disposed 2016-04-12 20:18:04.489: with error -> subscribed 2016-04-12 20:18:04.489: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.490: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.490: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.490: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.) 2016-04-12 20:18:04.491: without error -> subscribed 2016-04-12 20:18:04.492: without error -> Event Next(10) 2016-04-12 20:18:04.492: with error -> disposed 2016-04-12 20:18:04.492: with error -> subscribed 2016-04-12 20:18:04.493: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.493: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.493: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.494: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.494: with error -> Event Next(5) Next(5)
// Multicasts a timed sequence through a PublishSubject; two observers
// subscribe to the subject after 2 and 3 seconds.
example("multicast") {
    let subject = PublishSubject<Int>()

    let connectable = createSequenceWithWait([0,1,2,3,4,5]) { element in element }
        .multicast(subject)

    delay(2) {
        _ = subject.subscribe { event in
            print("first: \(event)")
        }
    }

    delay(3) {
        _ = subject.subscribe { event in
            print("second: \(event)")
        }
    }

    _ = connectable.connect()
}
--- multicast example --- first: Next(2) first: Next(3) second: Next(3) first: Next(4) second: Next(4) first: Next(5) second: Next(5) first: Completed second: Completed
// Published interval: two observers subscribe (at 0 and 2 seconds) before
// connect() is called at 4 seconds; both are disposed at 8 seconds.
example("subscribe connectable sequnce with connect") {
    let connectable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        .debug("sequence")
        .publish()

    var firstSubscription: Disposable!
    var secondSubscription: Disposable!

    firstSubscription = connectable.subscribe { event in
        print("first: \(event)")
    }

    delay(2) {
        secondSubscription = connectable.subscribe { event in
            print("second: \(event)")
        }
    }

    delay(4) {
        _ = connectable.connect()
    }

    delay(8) {
        firstSubscription.dispose()
        secondSubscription.dispose()
    }
}
--- subscribe connectable sequnce with connect example --- 2016-04-12 21:35:32.130: sequence -> subscribed 2016-04-12 21:35:33.131: sequence -> Event Next(0) first: Next(0) second: Next(0) 2016-04-12 21:35:34.131: sequence -> Event Next(1) first: Next(1) second: Next(1) 2016-04-12 21:35:35.132: sequence -> Event Next(2) first: Next(2) second: Next(2) 2016-04-12 21:35:36.132: sequence -> Event Next(3) 2016-04-12 21:35:37.132: sequence -> Event Next(4)
// publish() + refCount(): two observers subscribe immediately and are
// disposed after 2 and 3 seconds; a third subscribes after 5 seconds.
example("with refCount") {
    let source = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence")
    let published = source.publish() // connectable observable
    let shared = published.refCount().debug("refCountSequence")

    let firstSubscription = shared.subscribe { event in
        print("first: \(event)")
    }
    let secondSubscription = shared.subscribe { event in
        print("second: \(event)")
    }

    delay(2) {
        firstSubscription.dispose()
    }

    delay(3) {
        // Last remaining subscriber; the recorded output shows "sequence -> disposed" right after.
        secondSubscription.dispose()
    }

    delay(5) {
        _ = shared.subscribe { event in
            print("after: \(event)")
        }
    }
}
--- with refCount example ---
2016-04-12 20:25:24.154: refCountSequence -> subscribed // first subscriber attaches to refCountSequence
2016-04-12 20:25:24.155: sequence -> subscribed // publishSequence subscribes to the source Observable (SO)
2016-04-12 20:25:24.156: refCountSequence -> subscribed // second subscriber attaches to refCountSequence
2016-04-12 20:25:25.156: sequence -> Event Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
first: Next(0)
2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
second: Next(0)
2016-04-12 20:25:26.156: sequence -> Event Next(1)
2016-04-12 20:25:26.156: refCountSequence -> Event Next(1)
first: Next(1)
2016-04-12 20:25:26.157: refCountSequence -> Event Next(1)
second: Next(1)
2016-04-12 20:25:26.353: refCountSequence -> disposed // first subscriber unsubscribes from refCountSequence
2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO keeps emitting, because refCountSequence still has a subscriber
2016-04-12 20:25:27.157: refCountSequence -> Event Next(2)
second: Next(2)
2016-04-12 20:25:27.390: refCountSequence -> disposed // second subscriber unsubscribes from refCountSequence
2016-04-12 20:25:27.390: sequence -> disposed // refCountSequence has no subscribers left, so publishSequence unsubscribes from SO
2016-04-12 20:25:29.157: refCountSequence -> subscribed // a new subscriber attaches to refCountSequence
2016-04-12 20:25:29.157: sequence -> subscribed // there is a subscriber again, so publishSequence re-subscribes to SO
2016-04-12 20:25:30.158: sequence -> Event Next(0)
2016-04-12 20:25:30.159: refCountSequence -> Event Next(0)
after: Next(0)
2016-04-12 20:25:31.158: sequence -> Event Next(1)
2016-04-12 20:25:31.159: refCountSequence -> Event Next(1)
after: Next(1)
2016-04-12 20:25:32.159: sequence -> Event Next(2)
2016-04-12 20:25:32.159: refCountSequence -> Event Next(2)
after: Next(2)
....
// Demonstrates `replay(2)`: a connectable sequence created with a replay
// argument of 2, observed by three subscribers that join at different times.
example("replay") {
    let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    let replayed = timer.replay(2)

    let firstSubscription = replayed.subscribe { event in
        print("first: \(event)")
    }
    replayed.connect()

    // Assigned later from a delayed closure, hence the implicitly unwrapped var.
    var secondSubscription: Disposable!
    delay(3) {
        secondSubscription = replayed.subscribe { event in
            print("second: \(event)")
        }
    }
    delay(4) { firstSubscription.dispose() }
    delay(5) { secondSubscription.dispose() }

    // Joins after both earlier subscribers are gone; the connection is still
    // alive because the disposable returned by connect() was never disposed.
    delay(7) {
        replayed.subscribe { event in
            print("third: \(event)")
        }
    }
}
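--- replay example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(1)
second: Next(2)
first: Next(3) // subscriber 1 is disposed after this element (second 4)
second: Next(3)
second: Next(4) // subscriber 2 is disposed (second 5), but the source Observable (SO) keeps emitting
// subscriber 3 joins at second 7 and first receives the 2 replayed elements
third: Next(5)
third: Next(6)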
// Demonstrates `replayAll()`: same scenario as the bounded replay example,
// but using the unbounded replay variant of the connectable sequence.
example("replayAll") {
    let source = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    let connectable = source.replayAll()

    let subscriptionOne = connectable.subscribe { event in
        print("first: \(event)")
    }
    connectable.connect()

    // Assigned later from a delayed closure, hence the implicitly unwrapped var.
    var subscriptionTwo: Disposable!
    delay(3) {
        subscriptionTwo = connectable.subscribe { event in
            print("second: \(event)")
        }
    }
    delay(4) { subscriptionOne.dispose() }
    delay(5) { subscriptionTwo.dispose() }

    // Joins after both earlier subscribers have been disposed.
    delay(7) {
        connectable.subscribe { event in
            print("third: \(event)")
        }
    }
}
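--- replayAll example ---
first: Next(0)
first: Next(1)
first: Next(2)
second: Next(0)
second: Next(1)
second: Next(2)
first: Next(3) // subscriber 1 is disposed after this element (second 4)
second: Next(3)
second: Next(4) // subscriber 2 is disposed (second 5), but the source Observable (SO) keeps emitting
// subscriber 3 joins at second 7 and first receives every element emitted so far
third: Next(0)
third: Next(1)
third: Next(2)
third: Next(3)
third: Next(4)
third: Next(5)
third: Next(6)
third: Next(7)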
// Demonstrates `debug`: every subscription, event and disposal of the tagged
// sequence is written to the console, so the observer itself can stay empty.
example("debug") {
    // The value returned by `subscribe` is a Disposable, not a sequence, and
    // nothing reads it afterwards, so it is discarded explicitly instead of
    // being bound to an unused local.
    _ = Observable<AnyObject>.of(1, 2, 3)
        .debug("sequence")
        .subscribe { _ in }
}
--- debug example --- 2016-04-12 21:41:08.467: sequence -> subscribed 2016-04-12 21:41:08.469: sequence -> Event Next(1) 2016-04-12 21:41:08.469: sequence -> Event Next(2) 2016-04-12 21:41:08.469: sequence -> Event Next(3) 2016-04-12 21:41:08.469: sequence -> Event Completed
// Demonstrates `doOn`: the handler attached with doOn prints each event as it
// passes through the sequence.
example("simple doOn") {
    let tapped = Observable.of(1,2).doOn { event in
        print(event)
    }
    tapped.subscribeNext { _ in
        // Intentionally empty: a subscription is still required, all printing
        // is done by the doOn handler above.
    }
}
--- simple doOn example --- Next(1) Next(2) Completed
// Demonstrates `delaySubscription`: both the inner sequence and the delayed
// wrapper are tagged with `debug`, so the log shows when each one is
// actually subscribed to.
example("delaySubscription") {
    let source = Observable.of(1, 2, 3).debug("sequence")
    let delayed = source
        .delaySubscription(3, scheduler: MainScheduler.instance)
        .debug("delayed sequence")

    delayed.subscribe { event in
        print(event)
    }
}
--- delaySubscription example ---
2016-04-12 21:44:05.226: delayed sequence -> subscribed // subscribed to the delayed sequence at second :05
2016-04-12 21:44:08.228: sequence -> subscribed // the subscription to the source Observable (SO) happens 3 seconds later
2016-04-12 21:44:08.229: sequence -> Event Next(1)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(1)
Next(1)
2016-04-12 21:44:08.229: sequence -> Event Next(2)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(2)
Next(2)
2016-04-12 21:44:08.229: sequence -> Event Next(3)
2016-04-12 21:44:08.229: delayed sequence -> Event Next(3)
Next(3)
2016-04-12 21:44:08.230: sequence -> Event Completed
2016-04-12 21:44:08.230: delayed sequence -> Event Completed
Completed
2016-04-12 21:44:08.230: sequence -> disposed
// Baseline: no scheduler is specified, each event is printed together with
// the thread the observer closure runs on.
example("without observeOn") {
    let numbers = Observable<AnyObject>.of(1, 2, 3)
    numbers.subscribe { event in
        let thread = NSThread.currentThread()
        print("\(thread)\(event)")
    }
}

// Same sequence, but observed through a scheduler backed by a global
// dispatch queue, so the printed thread can be compared with the baseline.
example("with observeOn") {
    let backgroundQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
    let scheduler = ConcurrentDispatchQueueScheduler(queue: backgroundQueue)
    let numbers = Observable<AnyObject>.of(1, 2, 3)

    numbers
        .observeOn(scheduler)
        .subscribe { event in
            let thread = NSThread.currentThread()
            print("\(thread)\(event)")
        }
}
--- without observeOn example --- <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed --- with observeOn example --- <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed
// All three subscription flavours on the same single-element sequence:
// full events, completion only, and elements only.
example("subscribe") {
    let single = Observable.of(1)
    single.subscribe { event in
        print(event)
    }
    single.subscribeCompleted {
        print("!completed")
    }
    single.subscribeNext { value in
        print("next: \(value)")
    }
}

// Receives only the elements of the sequence.
example("subscribeNext") {
    let single = Observable.of(1)
    single.subscribeNext { value in
        print("next: \(value)")
    }
}

// Receives only the completion notification.
example("subscribeCompleted") {
    let single = Observable.of(1)
    single.subscribeCompleted {
        print("!completed")
    }
}

// Receives only the error the sequence terminates with.
example("subscribeError") {
    let failing = Observable<Int>.error(RxError.ArgumentOutOfRange)
    failing.subscribeError { error in
        print("!error \(error)")
    }
}
--- subscribe example --- Next(1) Completed !completed next: 1 --- subscribeNext example --- next: 1 --- subscribeCompleted example --- !completed --- subscribeError example --- !error Argument out of range.
// Demonstrates `subscribeOn` combined with `observeOn`: the thread is printed
// at three points (where the example starts, inside the Observable's
// subscription closure, and inside the observer) so they can be compared.
//
// The schedulers create their own serial queues from the given names; the
// two global dispatch queues the example used to fetch were never passed to
// anything and have been removed.
example("with subscribeOn and observeOn") {
    print("init thread: \(NSThread.currentThread())")

    let sequence = Observable<Int>.create { observer in
        // Runs when the sequence is subscribed to.
        print("observable thread: \(NSThread.currentThread())")
        observer.on(.Next(1))
        observer.on(.Next(2))
        observer.on(.Next(3))
        observer.on(.Completed)
        return NopDisposable.instance
    }
    .subscribeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue1"))
    .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue2"))

    sequence.subscribe { e in
        print("observer thread: \(NSThread.currentThread()) \(e)")
    }
}
--- with subscribeOn and observeOn example ---
init thread: <NSThread: 0x7ff6914132b0>{number = 1, name = main} // the example starts on thread #1
observable thread: <NSThread: 0x7ff6916a0cb0>{number = 4, name = (null)} // because of subscribeOn, the Observable's code runs on thread #4 instead of #1
observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(1) // the observer's code runs on neither #1 nor #4, but on the thread chosen by observeOn. Without observeOn it would run on the subscribeOn thread #4
observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(2)
observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Next(3)
observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Completed
// Demonstrates `timeout`: the helper sequence spaces its elements by its
// default wait time, and a 0.9 timeout is applied on the main scheduler.
example("failed timeout ") {
    let source: Observable<Int> = createSequenceWithWait([1,2,3,4]) { $0 }

    source
        .timeout(0.9, scheduler: MainScheduler.instance)
        .subscribe { event in
            print(event)
        }
}
--- failed timeout example --- Next(1) Error(Sequence timeout.)
/// Stand-in resource whose only job is to make its disposal visible by
/// printing a message.
class FakeDisposable: Disposable {
    func dispose() {
        print("disposed")
    }
}

// Demonstrates `using`: a resource is created by the first factory and the
// sequence by the second one.
example("using") {
    let sequence: Observable<Int> = Observable.using(
        { FakeDisposable() },
        observableFactory: { _ in Observable.just(1) }
    )

    sequence.subscribe { event in
        print(event)
    }
}
--- using example --- Next(1) Completed disposed
Source: https://habr.com/ru/post/281292/
All Articles