본문 바로가기

책/코틀린 리액티브 프로그래밍

6. 연산자 및 오류처리

  1. 프로듀서(옵저버블/플로어블) 결합

 데이터를 사용하기에 앞서 여러 원천의 데이터를 결합하는 일은 일반적이다. 예를 들면 오프라인 우선방식에 따라 오프라인 애플리케이션을 구축하고 HTTP 호출에서 가져온 결과를 로컬 데이터베이스의 데이터와 결합하려는 상황이다.

 

 기본적으로 프로듀서(옵저버블/플로어블)를 결합하는 몇 가지 메커니즘이 있는데 다음과 같다.

  • 프로듀서 병합(Merging)
  • 프로듀서 이어붙이기(Concattenation)
  • 프로듀서 사이의 임의 결합
  • 집핑
  • 가장 최근 항목 결합

startWith 연산자

startWith는 다른 원천이 되는 옵저버블이나 Interator 인스턴스를 전달해 연산자가 구독하기 시작한 원천 옵저버블이 배출하기 전에 추가 할 수 있다.

  1. 주석 1 에서 startWith 연산자를 사용해 Iterator 인스턴스를 전달했다. startWith 연산자는 전달된 Iterator 인스턴스를 Observable 인스턴스로 내부 변환한다. (플로어블사용 시 플로어블인스턴스로 내부변환)
fun main(args: Array<String>) {
    println("startWith Iterator")
    Observable.range(5,10)
            .startWith(listOf(1,2,3,4))//(1)
            .subscribe {
                println("Received $it")
            }
    println("startWith another source Producer")
    Observable.range(5,10)
            .startWith(Observable.just(1,2,3,4))//(2)
            .subscribe {
                println("Received $it")
            }
}
/* 결과
Received 1
Received 2
.....
Received 13
Received 14
startWith another source Producuer
Received 1
Received 2
.....
Received 13
Received 14
*/

 startWith 의 구현을 보면 concatArray를 내부적으로 사용한다는 것을 알 수 있다.

fun startWith(items: Iterable<T>): Observable<T> {
    return concatArray<T>(fromIterable<out T>(items), this)
}

배출을 집핑하기: zip 연산자

  • zip 연산자는 여러 프로듀서의 배출을 하나의 배출로 누적시킨다. 또한 scan 또는 reduce 연산자의 배출에 적용되는 함수를 취하지만 다른 프로듀서의 배출에도 적용한다.
  • zip 연산자는 옵저버블 클래스의 companion object(자바의 정적메서드에 해당)에 정의되 있으므로 Observable.zip을 입력해 직접 사용할 수 있다.
  • zip 연산자의 유의사항
    • zip 연산자는 제공된 프로듀서가 배출될 때마다 작동한다. 
      1. x, y. z 프로듀서 전달
      2. zip 연산자는 x의 n번째 배출을 y와 z의 n번째 배출로 누적
    • zip 연산자는 함수를 적용하기 전에 각 프로듀서가 배출할 때까지 대기한다.
      1. zip 연산자의 프로듀서 Observable.interval사용 
      2. zip 연산자는 각 배출을 기다렸다 지정된 간격으로 누적값 배출
    • 어떤 프로듀서가 기다리는 아이탬을 배출하지 않고 onComplete 또는 onError를 알리면 다른 프로듀서의 배출을 포함해 이후 모든 배출을 폐기한다.
      1. 프로듀서 x가 10개 아이템,  y가 11개 아이템, z가 8개의 아이템을 배출
      2. zip 연산자는 모든 프로듀서의 첫 8개의 배출을 누적하고 그 이후의 프로듀서 x와 y의 나머지 모든 배출을 폐기한다.
fun main(args: Array<String>) {
    val observable1 = Observable.range(1,10)
    val observable2 = Observable.range(11,10)
    /*
      여기에서는 Observable 두개만 사용했지만 
      zip연산자는 최대 9개의 Observables/Flowables를 지원한다.
    */
    Observable.zip(
        observable1
        , observable2
        , io.reactivex.functions.BiFunction<Int, Int, Int> { 
            emissionO1, emissionO2 -> emissionO1+emissionO2
        }
    ).subscribe {
        println("Received $it")
    }
}
/* 결과 
Received 12
Received 14
Received 16
Received 18
Received 20
Received 22
Received 24
Received 26
Received 28
Received 30
*/

zipWith 연산자

zip 연산자의 인스턴스 버젼이다. 세 가지 이상의 옵저버블 인스턴스를 작업하고 싶으면 zipWith 대신 zip 연산자를 사용하는 것이 좋다.

  • 주석 1에서 인스턴스인 observable1 의 zipWith 연산자의 인자로 다른 Observable 인스턴스인 observable2와 람다를 사용해 배출에 적용했다.
  • 결과에서는 제공한 프로듀서와 구독한 프로두서의 배출을 누적한 것을 볼 수 있다.
fun main(args: Array<String>) {
    val observable1 = Observable.range(1,10)
    val observable2 = listOf(
        "String 1","String 2","String 3","String 4","String 5"
        ,"String 6","String 7","String 8","String 9","String 10"
    ).toObservable()

    observable1.zipWith(
        observable2
        , {e1:Int,e2:String -> "$e2 $e1"}
    )//(1)
    .subscribe {
        println("Received $it")
    }
}
/* 결과
Received String 1 1
Received String 2 2
Received String 3 3
Received String 4 4
Received String 5 5
Received String 6 6
Received String 7 7
Received String 8 8
Received String 9 9
Received String 10 10
*/

combineLatest 연산자

combileLatest 연산자는 zip 연산자와 비슷하지만 다음과 같은 차이점이 있다.

  • zip : 원천 프로듀서 각각이 배출하기를 기다림
  • combineLatest : 원천 프로듀서에서 배출을 받자마자 처리를 시작

zip 연산자의 처리 예

  1. 주석 1 에서는 100밀리초 간격으로 옵저버블을 생성
  2. 주석 2 에서는 250밀리초 간격으로 옵저버블을 생성
  3. zip 연산 후 총 간격은 350밀리초가 되고 지연 시간은 1100밀리초이기 때문에 결과에서 3개의 배출을 확인할 수 있다.
fun main(args: Array<String>) {
    val observable1 = Observable.interval(100,TimeUnit.MILLISECONDS)//(1)
    val observable2 = Observable.interval(250,TimeUnit.MILLISECONDS)//(2)

    Observable.zip(observable1,observable2,
            BiFunction { t1:Long, t2:Long -> "t1: $t1, t2: $t2" })//(3)
            .subscribe{
                println("Received $it")
            }

    runBlocking { delay(1100) }
}
/* 결과
Received t1: 0, t2: 0
Received t1: 1, t2: 1
Received t1: 2, t2: 2
Received t1: 3, t2: 3
*/

combineLatest를 사용한 동일한 예제

  1. 출력 결과에서 알 수 있듯이 combineLatest 연산자는 다른 모든 원천 프로듀서에 대해 마지막으로 생성된 값을 사용해 배출된 값을 즉시 처리하고 출력한다.
fun main(args: Array<String>) {
    val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observable2 = Observable.interval(250, TimeUnit.MILLISECONDS)

    Observable.combineLatest(observable1,observable2,
            BiFunction { t1:Long, t2:Long -> "t1: $t1, t2: $t2" })
            .subscribe{
                println("Received $it")
            }

    runBlocking { delay(1100) }
}
/* 결과
Received t1: 0, t2: 0
Received t1: 0, t2: 0
Received t1: 0, t2: 0
Received t1: 1, t2: 1
Received t1: 1, t2: 1
Received t1: 1, t2: 1
Received t1: 1, t2: 1
Received t1: 2, t2: 2
Received t1: 2, t2: 2
Received t1: 2, t2: 2
Received t1: 2, t2: 2
Received t1: 3, t2: 3
Received t1: 3, t2: 3
Received t1: 3, t2: 3
Received t1: 3, t2: 3
*/

옵저버블/플로어블 병합: merge 연산자

  1. 주석 1 에서 옵저버블 두개를 병합한 뒤 하나로 구독한다.
fun main(args: Array<String>) {
    val observable1 = listOf("Kotlin", "Scala", "Groovy").toObservable()
    val observable2 = listOf("Python", "Java", "C++", "C").toObservable()

    Observable
            .merge(observable1,observable2)//(1)
            .subscribe {
                println("Received $it")
            }
}
/* 결과
Received Kotlin
Received Scala
Received Groovy
Received Python
Received Java
Received C++
Received C
*/

병합 작업은 지정된 순서를 유지하지 않는다. 아래 예제를 보면

  1. 주석 1, 2 에서 Observable.interval 연산자로 Observable<Long> 인스턴스 두개를 만든다
  2. Observable 문자열에 번호를 더해서 매핑하고 Observable<String> 인스턴스를 가져왔다.
  3. map 연산자의 목적은 옵저버블의 ID를 출력에 삽입해 병합된 출력에서 원천을 쉽게 식별할수 있게 하는 것이다.
  4. 출력에서는 observable1이 먼저 병합연산자에 입력됬음에도 observable2가 먼저 배출되는 것으로 나타난다.
fun main(args: Array<String>) {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS).map { "Observable 1 $it" }  // 1
    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS).map { "Observable 2 $it" }  // 2

    Observable
            .merge(observable1,observable2)
            .subscribe {
                println("Received $it")
            }
    runBlocking { delay(1500) }
}
/* 결과
Received Observable 2 0
Received Observable 2 1
Received Observable 2 2
Received Observable 2 3
Received Observable 1 0
Received Observable 2 4
Received Observable 2 5
Received Observable 2 6
Received Observable 2 7
Received Observable 2 8
Received Observable 2 9
Received Observable 1 1
Received Observable 2 10
Received Observable 2 11
Received Observable 2 12
Received Observable 2 13
Received Observable 2 14
Received Observable 1 2
*/

merge 연산자는 최대 4개의 매개변수만 지원한다. 그 이상을 사용하려면 mergeArray를 사용한다.

 

fun main(args: Array<String>) {
    val observable1 = listOf("A", "B", "C").toObservable()
    val observable2 = listOf("D", "E", "F", "G").toObservable()
    val observable3 = listOf("I", "J", "K", "L").toObservable()
    val observable4 = listOf("M", "N", "O", "P").toObservable()
    val observable5 = listOf("Q", "R", "S", "T").toObservable()
    val observable6 = listOf("U", "V", "W", "X").toObservable()
    val observable7 = listOf("Y", "Z").toObservable()

    Observable.mergeArray(
        observable1, observable2, observable3
        ,observable4, observable5, observable6
        , observable7
    )
    .subscribe {
                println("Received $it")
    }
}
/* 결과
Received A
Received B
Received C
Received D
Received E
Received F
Received G
Received I
Received J
Received K
Received L
Received M
Received N
Received O
Received P
Received Q
Received R
Received S
Received T
Received U
Received V
Received W
Received X
Received Y
Received Z
*/

mergeWith

merge 의 인스턴스 버젼이다. 

fun main(args: Array<String>) {
    val observable1 = listOf("Kotlin", "Scala", "Groovy").toObservable()
    val observable2 = listOf("Python", "Java", "C++", "C").toObservable()

    observable1
            .mergeWith(observable2)
            .subscribe {
                println("Received $it")
            }
}
/* 결과
Received Kotlin
Received Scala
Received Groovy
Received Python
Received Java
Received C++
Received C
*/

프로듀서 이어 붙이기(옵저버블/플로어블) : concat

병합의 사전적 의미는 순서와 관계없이 두가지를 결합해 새로운 것을 만든 것이다. 순서를 유지하려면 하나씩 이어 붙이기를 한다. 

concat 연산자는 현재 원천 옵저버블에서 onComplete를 수신한 후에만 큐에 존재하는 다음 원천 옵저버블을 구독한다. 

  1. 주석 1 에서는 Observable.interval 로 생성된 옵저버블이 Long.MAX_VALUE 가 될때 까지 onComplete 를 배출하지 않기 때문에 take(2) 연산자를 추가로 사용했다.
  2. 결과를 보면 observable1 에서 onComplete 알림을 받은 후 observable2 가 구독된다.

merge 연산자와 마찬가지로 concatArray, concatWith 연산자가 존재하고 이어 붙인다는 점 빼곤 동일한 방식으로 동작한다.

fun main(args: Array<String>) {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
            .take(2)//(1)
            .map { "Observable 1 $it" }//(2)
    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS).map { "Observable 2 $it" }//(3)

    Observable
            .concat(observable1,observable2)
            .subscribe {
                println("Received $it")
            }

    runBlocking { delay(1500) }
}
/* 결과 
Received Observable 1 0
Received Observable 1 1
Received Observable 2 0
Received Observable 2 1
Received Observable 2 2
Received Observable 2 3
Received Observable 2 4
*/

프로듀서 임의 결합

amb 연산자는 Observable(Iterable<Observable> 인스턴스)의 목록을 매개 변수로 사용한다. Iterable 인스턴스에 있는 모든 옵저버블을 구독하고 첫 번째로 배출한 옵저버블로부터 수신한 아이템을 배출한 뒤 나머지 옵저버블의 결과는 전부 폐기한다.

  1. 주석 1, 2 에서 Observable.interval 옵저버블을 500밀리초, 100밀리초 간격으로 생성했다.
  2. 주석 3 에서 listOf 함수를 사용해 두 옵저버블로부터 List<Observable>을 생성하고 amb 연산자로 전달했다.
  3. 결과에서 amb 연산자가 observable2의 배출을 먼저 입력받았기(100밀리초로 먼저 받음)  때문에 observable1을 무시하고 있는 것을 알 수 있다.

amb 에는 마찬가지로 ambArray, ambWith 같은 연산자도 존재한다.

fun main(args: Array<String>) {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS).map { "Observable 1 $it" }//(1)
    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS).map { "Observable 2 $it" }//(2)

    Observable
            .amb(listOf(observable1,observable2))//(3)
            .subscribe {
                println("Received $it")
            }

    runBlocking { delay(1500) }
}
/* 결과
Observable 2 0
Observable 2 1
Observable 2 2
...
Observable 2 14
Observable 2 15
*/

그룹핑

groupBy 연산자를 사용하면 특정 속성을 기준으로 배출을 분류할 수 있다.

  1. 주석 1에서는 groupBy 연산자를 사용해 그룹화를 이용해 실행되는 Predicate를 전달했다. groupBy 연산자는 Predicate의 결과를 통해 배출을 그룹화 한다.
  2. 주석2 에서는 blockingSubscribe 연산자를 사용해 새로 생성된 Observable<GroupedObservable<K, T>>인스턴스에 구독했다. 간단히 subscribe 연산자를 사용할 수도 있지만 콘솔에 출력을 인쇠할 것이기 때문에 subscribe를 사용하면 엉망으로 보일 것이다. subscribe 연산자가 다음 배출 전에 현재 배출에서 주어진 연산을 기다리지 않기 때문이다. 반면에 blockingSubscribe는 새로운 처리를 진행하기 전에 배출에서 처리가 완료될 때까지 프로그램을 대기 상태로 만든다.
  3. 주석 3에서는 배출된 Grouped 인스턴스의 키를 인쇄한 후에도 동일한 작업을 수행했다.
    • groupBy 연산자는 그룹을 포함하는 GroupedObservable을 배출하는 옵저버블을 반환한다. 그래서 blockingSubscribe 안에서는 배출된 GroupedObservable 인스턴스를 구독해야한다.
fun main(args: Array<String>) {
    val observable = Observable.range(1,30)

    observable.groupBy {            // 1
        it%5
    }.blockingSubscribe {           // 2
        println("Key ${it.key} ")
        it.subscribe {              // 3
            println("Received $it")
        }
    }
}
/* 결과
Key 1
Recieved 1
Recieved 6
...
Recieved 21
Recieved 26
Key 2
Recieved 2
Recieved 7
...
Recieved 22
Recieved 27
Key 3
Recieved 3
Recieved 8
...
Recieved 23
Recieved 28
Key 4
Recieved 4
Recieved 9
...
Recieved 24
Recieved 29
Key 0
Recieved 5
Recieved 10
...
Recieved 25
Recieved 30
*/

flatMap, concatMap 세부사항

 

아래 예제 에서는 옵저버블을 생성하고 delay 연산과 함께 flatMap 연산자를 사용해 임의의 지연을 배출에 추가했다.

  • 결과에서 보듯이 merge 연산자가 한번에 모두 비동기적으로 배출을 구독하고 다시 배출하기 때문데 순서가 유지되지 않는다.
fun main(args: Array<String>) {
    Observable.range(1,10)
            .flatMap {
                val randDelay = Random().nextInt(10)
                return@flatMap Observable.just(it)
                        .delay(randDelay.toLong(),TimeUnit.MILLISECONDS)//(1)
            }
            .blockingSubscribe {
                println("Received $it")
            }
}
/* 결과
Received 2
Received 5
Received 6
Received 1
Received 4
Received 10
Received 9
Received 8
Received 3
Received 7
*/

concatMap 연산자는 내부적으로 concat을 사용하기 때문에 규정된 배출 순서를 유지한다.

fun main(args: Array<String>) {
    Observable.range(1,10)
            .concatMap {
                val randDelay = Random().nextInt(10)
                return@concatMap Observable.just(it)
                        .delay(randDelay.toLong(),TimeUnit.MILLISECONDS)//(1)
            }
            .blockingSubscribe {
                println("Received $it")
            }
}
/* 결과
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
*/

flatMap 연산자가 적합한 경우

  • 페이지나 액티비티 또는 프레그먼트에서 데이터의 목록을 다룰 때 해당 목록의 아이템 별로 데이터베이스나 서버에 전송하고 싶을 경우 적합하다. 비동기적으로 동작하기 때문에 순서가 중요하지 않은 경우 빠르게 동작한다.
  • 목록의 아이템에 대한 작업을 비동기적으로 비교적 짧은 기간에 수행하려는 경우 적합하다.

concatMap 연산자가 적합한 경우

  • 사용자에게 다운로드하고 있는 목록을 표시할 때 적합하다. 이 때는 순서가 매우 중요하다.
  • 정렬된 목록의 순서를 그대로 유지하고 작업하고 싶은 경우 적합하다.

switchMap 연산자

switchMap 연산자는 원천 프로듀서(Observable/Flowable)의 모든 배출을 비동기로 대기하지만 정해진 시간 이내에 최신 아이템만 배출한다. 

  1. 주석 1에서는 delay연산자를 사용하지 않고 switchMap 연산자로 배출했다. 
    • 결과는 추가 배출을 받기 전에 모든 아이템을 다시 내보냈다.
  2. 주석 2에서는 delay연산자를 사용하고 switchMap 연산자로 배출했다.
    • 결과는 switchMap이 배출 이전에 연속적으로 값을 전달 받았지만 마지막 아이템만을 배출했음을 알 수 있다.

 

fun main(args: Array<String>) {
    println("Without delay")
    Observable.range(1,10)
            .switchMap {
                val randDelay = Random().nextInt(10)
                return@switchMap Observable.just(it)//(1)
            }
            .blockingSubscribe {
                println("Received $it")
            }
    println("With delay")
    Observable.range(1,10)
            .switchMap {
                val randDelay = Random().nextInt(10)
                return@switchMap Observable.just(it)
                        .delay(randDelay.toLong(), TimeUnit.MILLISECONDS)//(2)
            }
            .blockingSubscribe {
                println("Received $it")
            }
}
/* 결과
Without delay
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
With delay
Received 10
*/

아래 예제는 3으로 나눌 수 있는 모든 수는 그대로 배출하고 나머지는 지연을 추가했다. 

  • switchMap 연산자는 지연 없이 배출된 숫자들만 다시 배출시킨다.
  • 이유는 switchMap 연산자가 다음 아이템을 받기 이전에 그것들을 배출하는 것이 가능했기 때문이다.
fun main(args: Array<String>) {
    Observable.range(1,10)
            .switchMap {
                val randDelay = Random().nextInt(10)
                if(it%3 == 0)
                    Observable.just(it)
                else
                    Observable.just(it)
                            .delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
            }
            .blockingSubscribe {
                println("Received $it")
            }
}
/* 결과
Received 3
Received 6
Received 9
Received 10
*/

배출 건너뛰거나 취하기

배출 건너뛰기

특정 조건이 충족된 경우 또는 무조건 선두의 배출 일부를 건너뛰고자 하는 경우에 사용하는 연산자 모음이다. 주요 연산자 들이다.

  • skip
  • skipLast
  • skipWhile
  • skipUntil

skip

  • skip(count: Long) : count를 입력받아 n개의 배출량을 무시한다.
  • skip(time: Long, unit: TimeUnit) : 지정된 시간동안 발생한 모든 배출을 무시한다.
  1. 주석 1에서는 skip(count) 연산자를 사용해 5개의 배출을 건너뛴다.
  2. 주석 2에서는 skip(time, unit) 연산자를 사용해 구독의 처음 400밀리초 동안 발생한 모든 배출을 건너뛴다.
fun main(args: Array<String>) {
    val observable1 = Observable.range(1,20)
    observable1
            .skip(5)//(1)
            .subscribe(object:Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skip(count)")
                }

            })

    val observable2 = Observable.interval(100,TimeUnit.MILLISECONDS)
    observable2
            .skip(400,TimeUnit.MILLISECONDS)//(2)
            .subscribe(
                    object:Observer<Long> {
                        override fun onError(e: Throwable) {
                            println("Error $e")
                        }

                        override fun onComplete() {
                            println("Complete")
                        }

                        override fun onNext(t: Long) {
                            println("Received $t")
                        }

                        override fun onSubscribe(d: Disposable) {
                            println("starting skip(time)")
                        }

                    }
            )

    runBlocking {
        delay(1000)
    }

}
/* 결과
starting skip(count)
Received 6
Received 7
...
Received 19
Received 20
Complete
starting skip(time)
Received 4
Received 5
...
Received 8
Received 9
*/

skipLast

skipLast 연산자는 skip 과 같지만 뒤에서부터 배출을 제거한다.

  1. 주석 1에서는 skipList(count) 연산자를 사용해 마지막 5개 배출을 제거했다.
fun main(args: Array<String>) {
    val observable = Observable.range(1,20)
    observable
            .skipLast(5)//(1)
            .subscribe(object: Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skipLast(count)")
                }

            })
}
/* 결과
starting skipLast(count)
Received 1
Received 2
...
Received 14
Received 15
Complete
*/

skipWhile

skipWhile 은 카운트, 시간이 아닌 Predicate(논리 표현식)를 기반으로 배출을 건너뛴다. 

  • 주석 1에서 값이 10보다 작은 값들은 참이 되어 배출을 건너 뛰게된다.
fun main(args: Array<String>) {
    val observable = Observable.range(1,20)
    observable
            .skipWhile {item->item<10}//(1)
            .subscribe(object: Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skipWhile")
                }

            })
}
/* 결과
strting skipWhile
Received 10
Received 11
...
Received 20
Complete
*/

skipUntil

  1. 주석 1에서 500밀리초 뒤에 배출을 시작 하는 Observable.timer 로 observable2를 생성했다.
  2. 주석 2에서 skipUntil 연산자를 이용하여 observable2가 배출을 시작하기 전까지 observable1의 모든 배출을 무시하도록 했다.

 

fun main(args: Array<String>) {
    val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)

    val observable2 = Observable.timer(500,TimeUnit.MILLISECONDS)//(1)

    observable1
            .skipUntil(observable2)//(2)
            .subscribe(
                    object: Observer<Long> {
                        override fun onError(e: Throwable) {
                            println("Error $e")
                        }

                        override fun onComplete() {
                            println("Complete")
                        }

                        override fun onNext(t: Long) {
                            println("Received $t")
                        }

                        override fun onSubscribe(d: Disposable) {
                            println("starting skipUntil")
                        }

                    }
            )

    runBlocking { delay(1500) }
}
/* 결과
strting skipWhile
Received 5
Received 6
...
Received 14
*/

take 연산자(take, takeList, takeWhile, takeUntil)

take 연산자는 skip 연산자와 정확히 반대로 동작한다.

 

fun main(args: Array<String>) {
    val observable1 = Observable.range(1,20)
    observable1
            .take(5)//(1)
            .subscribe(object:Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skip(count)")
                }

            })

    val observable2 = Observable.interval(100,TimeUnit.MILLISECONDS)
    observable2
            .take(400,TimeUnit.MILLISECONDS)//(2)
            .subscribe(
                    object:Observer<Long> {
                        override fun onError(e: Throwable) {
                            println("Error $e")
                        }

                        override fun onComplete() {
                            println("Complete")
                        }

                        override fun onNext(t: Long) {
                            println("Received $t")
                        }

                        override fun onSubscribe(d: Disposable) {
                            println("starting skip(time)")
                        }

                    }
            )

    runBlocking {
        delay(1000)
    }

}
/* 결과
starting take(5)
Received 1
Received 2
...
Received 5
Complete
starting take(400mills)
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Complete
*/

takeLast

fun main(args: Array<String>) {
    val observable = Observable.range(1,20)
    observable
            .takeLast(5)//(1)
            .subscribe(object: Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skipLast(count)")
                }

            })
}
/* 결과
starting takeLast(5)
Received 16
Received 17
Received 18
Received 19
Received 20
Complete
*/

takeWhile도 정 반대이다. 10보다 작은 숫자일 때 출력한다.

fun main(args: Array<String>) {
    val observable = Observable.range(1,20)
    observable
            .takeWhile{item->item<10}//(1)
            .subscribe(object: Observer<Int> {
                override fun onError(e: Throwable) {
                    println("Error $e")
                }

                override fun onComplete() {
                    println("Complete")
                }

                override fun onNext(t: Int) {
                    println("Received $t")
                }

                override fun onSubscribe(d: Disposable) {
                    println("starting skipWhile")
                }

            })
}
/* 결과
starting takeLast(5)
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Complete
Complete
*/

에러 연산자

예외가 발생할 경우 핸들링을 할 수가 없다. 에러 연산자는 이러한 예외에 핸들링을 가능하게한다.

  • onErrorReturn : 주석 1 과같이 하게 되면 예외가 아닌 -1을 배출하고 예외는 발생하지 않는다.
  • onErrorResumeNext : 주석 2와 같이 예외가 발생하면 지정된 프로듀서(옵저버블/플로어블)로 계속 배출이 진행된다.

 

fun main(args: Array<String>) {
    Observable.just(1,2,3,4,5)
            .map { it/(3-it) }
            //.onErrorReturn { -1 }                       // 1
            //.onErrorResumeNext(Observable.range(10, 5)) // 2
            .subscribe {
                println("Received $it")
            }
}

/* onErrorReturn 결과
Received 0
Received 2
Received -1
*/

/* onErrorREsumeNext
Received 0
Received 2
Received 10
Received 11
Received 12
Received 13
Received 14
*/

에러 발생 시 재시도하기

retry 연산자는 에러가 발생했을  때 동일한 프로듀서에 연산을 재시도하거나 다시 구독 할 수 있도록 해주는 에러 처리 연산자이다. 

  1. 주석 1 에서는 retry 연산자를 사용해 재시도를 세번으로 제한하고
  2. 주석 2 에서는 Predicate와 함께 retry 연산자를 사용했다. try 연산자는 Predicate가 참을 반환하는 한 계속해서 재시도하고 거짓을 반환하면 바로 다운스트림에 에러를 전달한다.
fun main(args: Array<String>) {
    Observable.just(1,2,3,4,5)
            .map { it/(3-it) }
            .retry(3)                   //(1)
            .subscribeBy (
                    onNext  = {println("Received $it")},
                    onError = {println("Error")}
            )
    println("\n With Predicate \n")
    var retryCount = 0
    Observable.just(1,2,3,4,5)
            .map { it/(3-it) }
            .retry {                    //(2)
                _, _->
                (++retryCount)<3
            }
            .subscribeBy (
                    onNext  = {println("Received $it")},
                    onError = {println("Error")}
            )
}
/* 결과
Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Error

 With Predicate 

Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Error
*/

HTTP 예제

예제를 실행하기 전 추가 플러그인 RxJava-Apache-HTTP를 사용한다. gradle에 다음 의존성을 추가한다.

//RxJava - Apache - HTTP
compile "com.netflex.rxjava:rxjava-apapche-http:0.20.7"

 

  1. HTTPAsyncClients.createDefault()로 CloseableHttpAsyncClient 인스턴스를 가져왔다.
  2. httpClient.startI()를 사용하여 클라이언트를 시작한다.
  3. GET 요청을 생성하고 그것을 옵저블 타입의 ObservableHttpResponse로 변경했는데, 응답 본문에 접근하기 위해 flatMap 연산자를 사용했다.
  4. flatMap 내부에서는 map 연산자를 사용해 byte를 string타입으로 변환했다.
  5. onErrorReturn 연산자를 사용해 에러가 발생했을 시에 기본 string을 반환하도록 했다.
  6. 최종적으로 onErrorReturn 연산자 이후에는 구독을 메서드 체인에 추가했는데 주석 6에서 응답을 콘솔에 출력한다.
  7. 응답을 처리하고 난 후에는 바로 httpClient를 종료시켜 준다.
fun main(args: Array<String>) {
    val httpClient = HttpAsyncClients.createDefault()  // (1)
    httpClient.start()                                 // (2)
    ObservableHttp.createGet("http://rivuchk.com/feed/json", httpClient).toObservable()  //(3)
            .flatMap{ response ->
                response.content.map{ bytes ->
                    String(bytes)
                }                                      // (4)
            }
            .onErrorReturn {                           // (5)
                "Error Parsing data "
            }
            .subscribe {
                println(it)                             // (6)
                httpClient.close()                      // (7)
            }
}