본문 바로가기

책/코틀린 리액티브 프로그래밍

5. 비동기 데이터 연산자와 변환

 애플리케이션에서 비즈니스 로직과 동작을 구현하려면 블로킹 코드를 작성하거나 명령형 프로그래밍과 리액티브 프로그래밍을 혼합하는 대신 연산자를 사용해야 한다. 연산자를 사용하여 순수하게 반응형으로 유지하면 낮은 메모리 사용, 유연한 동시성 및 일회성(disposablility)들을 쉽게 얻을 수 있는데 명령형 프로그래밍과 섞어서 사용할 경우 효과가 줄어들거나 사라지게 된다. 

필터링 연산자

프로듀서의 원하는 배출만 받고 나머지는 필터링 할 경우 사용한다.

debounce

예를 들면 텍스트 입력 구현시 모든 키눌림에 대해서 연산을 원하지 않을 때 사용하는 연산자이다. 사용자가 실제로 원하는 키워드와 일치하는 쿼리를 얻을 수 있을 때까지 입력을 중단하기를 잠시 기다렸다가 다운스트림 연산자에게 전달한다. 

  • 주석 2 에서 debounce 연산자는 각 배출 이후 200밀리초 동안 대기하고 있다가 각 간격에서 아무런 배출이 발생하지 않았을 때만 아이템을 배출하는 다운스트림을 생성한다.
fun main(args: Array<String>) {
    createObservable()//(1)
            .debounce(200, TimeUnit.MILLISECONDS)//(2)
            .subscribe {
                println(it)//(3)
            }
}

inline fun createObservable():Observable<String> = Observable.create<String> {
    it.onNext("R")//(4)
    runBlocking { delay(100) }//(5)
    it.onNext("Re")
    it.onNext("Reac")
    runBlocking { delay(130) }
    it.onNext("Reactiv")
    runBlocking { delay(140) }
    it.onNext("Reactive")
    runBlocking { delay(250) }//(6)
    it.onNext("Reactive P")
    runBlocking { delay(130) }
    it.onNext("Reactive Pro")
    runBlocking { delay(100) }
    it.onNext("Reactive Progra")
    runBlocking { delay(100) }
    it.onNext("Reactive Programming")
    runBlocking { delay(300) }
    it.onNext("Reactive Programming in")
    runBlocking { delay(100) }
    it.onNext("Reactive Programming in Ko")
    runBlocking { delay(150) }
    it.onNext("Reactive Programming in Kotlin")
    runBlocking { delay(250) }
    it.onComplete()
}

/* 결과 
Reactive
Reactive Programming
Reactive Programming in Kotlin
*/

distinct 연산자 : distinct,  distinctUntilChanged

distinct 연산자는  뒤이어 발생하는 모든 배출을 기억하고 미래에 동일한 아이템을 걸러낸다.

fun main(args: Array<String>) {
    listOf(1,2,2,3,4,5,5,5,6,7,8,9,3,10)//(1)
            .toObservable()//(2)
            .distinct()//(3)
            .subscribe { println("Received $it") }//(4)
}

/* 결과 
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
*/

 distinctUntilChanged 연산자는 모든 중복된 배출을 폐기하는 대신 연속적인 중복 배출만 폐기하고 나머지는 그대로 유지한다.

 distinct 연산자는 onComplete를 받을 때까지 각 아이템을 기억하고 있지만 distinctUntilChanged 연산자는 새로운 아이템을 받기 전까지의 아이템만 기억한다.

fun main(args: Array<String>) {
    listOf(1,2,2,3,4,5,5,5,6,7,8,9,3,10)//(1)
            .toObservable()//(2)
            .distinctUntilChanged()//(3)
            .subscribe { println("Received $it") }//(4)
}
/* 결과 
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 3
Received 10
*/

elementAt

프로듀서로 부터 n번째 요소를 받아서 단독 배출로 내보낸다.

  • Observable의 다섯 번째 요소를 요청하고 결과를 배출한다. (인덱스 0부터 시작)
  • Observable에 존재하지 않는 50번째 요소를 요청했지만 아무것도 배출되지 않았다.
  • 이 연산자는 Maybe 모나드의 도움으로 이 동작을 수행한다.
fun main(args: Array<String>) {
    val observable = listOf(10,1,2,5,8,6,9)
            .toObservable()

    observable.elementAt(5)//(1)
            .subscribe { println("Received $it") }

    observable.elementAt(50)//(2)
            .subscribe { println("Received $it") }
}
/* 결과
Received 6
*/

filter

  • filter 연산자를 사용해 배출에서 홀수를 필터링 했다.
fun main(args: Array<String>) {
    Observable.range(1,20)//(1)
            .filter{//(2)
                it%2==0
            }
            .subscribe {
                println("Received $it")
            }
}

/* 결과
Received 2
Received 4
Received 6
Received 8
Received 10
Received 12
Received 14
Received 16
Received 18
Received 20
*/

first, last

첫 번째 마지막 배출만 유지하고 나머지는 폐기한다.

  1. defaultValue 매개변수가 2로 설정된 first 연산자를 사용하는데, 첫 번째 요소에 액세스할 수 없는 경우 defaultValue 매개변수를 배출한다.
  2. last 연산자를 사용했다.
  3. first 연산자를 빈 옵저버블에 다시 사용했다. 따라서 액세스할 수 없는 2를 배출했다.
fun main(args: Array<String>) {
    val observable = Observable.range(1,10)
    observable.first(2)//(1)
            .subscribeBy { item -> println("Received $item") }

    observable.last(2)//(2)
            .subscribeBy { item -> println("Received $item") }

    Observable.empty<Int>().first(2)//(3)
            .subscribeBy { item -> println("Received $item") }
}

/* 결과
Received 1
Received 10
Received 2
*/

ignoreElements

onComplete 이벤트에만 존재하는 Completable 모나드를 반환한다.

fun main(args: Array<String>) {
    val observable = Observable.range(1,10)
    observable
            .ignoreElements()
            .subscribe { println("Completed") }//(1)
}
/* 결과
Completed
*/

변환연산자

프로듀서가 배출한 아이템을 변형하도록 도와준다.

map

map 연산자는 제공된 Function<T, R> 람다를 적용해 타입 T로 배출된 아이템을 타입 R의 배출로 변환한다.

fun main(args: Array<String>) {
    val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
    observable.map {
        number-> "Transforming Int to String $number"
    }.subscribe {
        item-> println("Received $item")
    }
}

/* 결과
Received Transforming Int to String 10
Received Transforming Int to String 9
Received Transforming Int to String 8
Received Transforming Int to String 7
Received Transforming Int to String 6
Received Transforming Int to String 5
Received Transforming Int to String 4
Received Transforming Int to String 3
Received Transforming Int to String 2
Received Transforming Int to String 1
*/

cast

Observable 에서 람다보다 깔끔하게 데이터 캐스팅을 위해서 사용한다.

  1. 주석 5와 6에 MyItem, MyItemInherit 두가지 클래스를 정의했다.
  2. 주석 1에서 MyItemInherit 리스트를 생성했다.
  3. 주석 2에서 리스트를 옵저버블로 생성한다.
  4. 주석 3 에서 map 연산자에게 MyItem 으로 캐스팅하는 람다를 전달했다.
  5. 주석 4 에서는 cast 연산자를 사용하여 동일한 작업을 하는데 훨씬 깨끗하고 단순해 보인다.
fun main(args: Array<String>) {
    val list = listOf<MyItemInherit>(
            MyItemInherit(1),
            MyItemInherit(2),
            MyItemInherit(3),
            MyItemInherit(4),
            MyItemInherit(5),
            MyItemInherit(6),
            MyItemInherit(7),
            MyItemInherit(8),
            MyItemInherit(9),
            MyItemInherit(10)
    )//(1)

    list.toObservable()//(2)
            .map { it as MyItem }//(3)
            .subscribe {
                println(it)
            }

    println("cast")

    list.toObservable()
            .cast(MyItem::class.java)//(4)
            .subscribe {
                println(it)
            }
}

open class MyItem(val id:Int) {//(5)
    override fun toString(): String {
        return "[MyItem $id]"
    }
}

class MyItemInherit(id:Int):MyItem(id) {//(6)
    override fun toString(): String {
        return "[MyItemInherit $id]"
    }
}

flatMap

map 연산자는 각 배출을 가져와서 변환하지만, flatMap 연산자는 새로운 프로듀서를 만들고 소스 프로듀서에 전달한 함수를 각 배출에 적용한다.

fun main(args: Array<String>) {
    val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
    observable.flatMap {
        number-> Observable.just("Transforming Int to String $number")
    }.subscribe {
        item-> println("Received $item")
    }
}

/* 결과
Received Transforming Int to String 10
Received Transforming Int to String 9
Received Transforming Int to String 8
Received Transforming Int to String 7
Received Transforming Int to String 6
Received Transforming Int to String 5
Received Transforming Int to String 4
Received Transforming Int to String 3
Received Transforming Int to String 2
Received Transforming Int to String 1
*/

각 배출마다 여러 아이템을 생성하는 경우

  1. Observable.create 연산자로 Observable 인스턴스를 생성했다.
  2. Observable.create 연산자에서 3개의 문자열을 배출하는데 그 후에 onComplete 알림을 보낸다.
  3. 출력을 보면 onComplete 알림을 보내기 전에 모든 항목을 배출한다. 그 이유는 모든 옵저버블이 함께 결합된 후 다운스트림으로 구독했기 때문이다. flatMap 연산자는 내부적으로 merge 연산자를 사용해 여러 옵저버블을 결합한다.
    • concatMap은 merge 연산자 대신 concat 연산자를 사용해 두 개의 Observable/Flowable를 결합하는 동일한 연산을 수행한다.
fun main(args: Array<String>) {
    val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
    observable.flatMap {
        number-> Observable.create<String> {    // 1
            it.onNext("The Number $number")
            it.onNext("number/2 ${number/2}")
            it.onNext("number%2 ${number%2}")
            it.onComplete()                     // 2
        }
    }.subscribeBy(
        onNext = {
            item-> println("Received $item")
        },
        onComplete = {
            print("Complete")
        }
    )
}

/* 결과
Received The Number 10
Received number number/2 5
Received number number%2 0
Received The Number 9
Received number number/2 4
Received number number%2 1
Received The Number 8
Received number number/2 4
Received number number%2 0
Received The Number 7
Received number number/2 3
Received number number%2 1
Received The Number 6
Received number number/2 3
Received number number%2 0
Received The Number 5
Received number number/2 2
Received number number%2 1
Received The Number 4
Received number number/2 2
Received number number%2 0
Received The Number 3
Received number number/2 1
Received number number%2 1
Received The Number 2
Received number number/2 1
Received number number%2 0
Received The Number 1
Received number number/2 0
Received number number%2 1
Complete
*/

defaultIfEmpty

연산자를 필터링하거나 복잡한 요구 사항을 다루는 동안 빈 프로듀서가 나타날 수 있다. defaultIfEmpty 연산자는 이런 상황을 처리하는데 도움이 된다.

/* 10의 범위를 벗어났기 때문에 빈 옵저버블을 반환한다. */
fun main(args: Array<String>) {
    Observable.range(0,10)//(1)
            .filter{it>15}//(2)
            .subscribe({
                println("Received $it")
            })
}


/* defaultIfEmpty 연산자로 필터링 하여 비어있을 경우 옵저버블에 15를 추가해준다. */
fun main(args: Array<String>) {
    Observable.range(0,10)//(1)
            .filter{it>15}//(2)
            .defaultIfEmpty(15)//(3)
            .subscribe({
                println("Received $it")
            })
}
/* 결과
Received 15
*/

switchIfEmpty

defaultIfEmpty 연산자는 빈 프로듀서에 배출을 추가하지만 switchIfEmpty 연산자는 source 프로듀서가 비어있을 경우 대체 프로듀서로부터 배출한다. 

아이템을 전달해야 하는 defaultIfEmpty 연산자와는 달리 switchIfEmpty 연산자는 대체프로듀서를 전달해야 한다.

fun main(args: Array<String>) {
    Observable.range(0,10)//(1)
            .filter{it>15}//(2)
            .switchIfEmpty(Observable.range(11,10))//(3)
            .subscribe({
                println("Received $it")
            })
}
/* 결과
Received 11
Received 12
Received 13
Received 14
Received 15
Received 16
Received 17
Received 18
Received 19
Received 20
*/

startWith 연산자

startWith 연산자는 기존 배출 리스트에 접두사를 추가했다.

fun main(args: Array<String>) {
    Observable.range(0,10)//(1)
            .startWith(-1)//(2)
            .subscribe({
                println("Received $it")
            })

    listOf("C","C++","Java","Kotlin","Scala","Groovy")//(3)
            .toObservable()
            .startWith("Programming Languages")//(4)
            .subscribe({
                println("Received $it")
            })
}
/* 결과 
Received -1
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received Programming Language
Received C
Received C++
Received Java
Received Kotlin
Received Scala
Received Groovy
*/

sorted

fun main(args: Array<String>) {
    println("default with integer")
    listOf(2,6,7,1,3,4,5,8,10,9)
            .toObservable()
            .sorted()         //(1)
            .subscribe { println("Received $it") }

    println("default with String")
    listOf("alpha","gamma","beta","theta")
            .toObservable()
            .sorted()          //(2)
            .subscribe { println("Received $it") }

    println("custom sortFunction with integer")
    listOf(2,6,7,1,3,4,5,8,10,9)
            .toObservable()
            .sorted { item1, item2 -> if(item1>item2) -1 else 1 } //(3)
            .subscribe { println("Received $it") }

    println("custom sortFunction with custom class-object")
    listOf(MyItem1(2),MyItem1(6),
            MyItem1(7),MyItem1(1),MyItem1(3),
            MyItem1(4),MyItem1(5),MyItem1(8),
            MyItem1(10),MyItem1(9))
            .toObservable()
            .sorted { item1, item2 -> if(item1.item<item2.item) -1 else 1 }//(4)
            .subscribe { println("Received $it") }
}

data class MyItem1(val item:Int)

/* 결과
default with integer
Received 1
Received 2
...
Received 10
default with String
Received alpha
Received beta
Received gamma
Received theta
custom sortFunction with integer
Received 10
Received 9
...
Received 1
custom sortFunction with custom class-object
Received MyItem1(item=1)
Received MyItem1(item=2)
...
Received MyItem1(item=10)
*/

데이터 모으기: scan 

그래프에서 볼 수 있듯이 스캔 연산자는 제공된 누적 함수를 기반으로 현재 배출 아이템과 모든 이전 배출을 아이템을 모은다.

  1. 주석 1은 모든 정수의 합을 구할 때 scan 을 사용했다.
  2. 주석 2는 연결된 문자열을 구할 때 사용했다.
  3. 주석 3은 이전 누적 값에 10을 곱하고 현재 배출값을 더해서 정수를 연결했다.
fun main(args: Array<String>) {
    Observable.range(1,10)
            .scan { previousAccumulation, newEmission ->  previousAccumulation+newEmission }//(1)
            .subscribe { println("Received $it") }

    listOf("String 1","String 2", "String 3", "String 4")
            .toObservable()
            .scan{ previousAccumulation, newEmission ->  previousAccumulation+" "+newEmission }//(2)
            .subscribe { println("Received $it") }

    Observable.range(1,5)
            .scan { previousAccumulation, newEmission ->  previousAccumulation*10+newEmission }//(3)
            .subscribe { println("Received $it") }

}
/* 결과
Received 1
Received 3
Received 6
Received 10
Received 15
Received 21
Received 28
Received 36
Received 45
Received 55
Received string1
Received string1 string2
Received string1 string2 string3
Received string1 string2 string3 string4
Received 1
Received 12
Received 123
Received 1234
Received 12345
*/

축소 연산자

배출을 축적하고 통합해야 요구사항이 있을 수가 있는데, 이런 요구사항에 따르는 거의 모든 연산자는 유한한 데이터 집합만 통합할 수 있으므로 onComplete를 호출하는 유한한 프로듀서(옵저버블/플로어블) 에서만 작동한다.

배출량 계산 : count

count 연산자는 프로듀서를 구독하고 배출량을 계산하며 프로듀서의 배출량을 담고 있는 Single 객체를 배출한다.

출력에서 볼수 있듯이 카운트 연산자는 프로듀서 배출을 계산하고 onComplete 알림을 받으면 배출량을 내보낸다.

fun main(args: Array<String>) {
    listOf(1,5,9,7,6,4,3,2,4,6,9).toObservable()
            .count()
            .subscribeBy { println("count $it") }
}
/*  결과
count 11
*/

배출누적: reduce

  • reduce는 프로듀서의 모든 배출을 누적해서 onComplete 알림을 받으면 누적된 배출을 내보낸다.
  • scan 연산자와의 차이점
    • scan : 배출이 발생할 때마다 누적했다가 배출
    • reduce : 모든 배출을 누적하다가 onComplete 알림을 수신했을 때 배출
fun main(args: Array<String>) {
    Observable.range(1,10)
            .reduce { previousAccumulation, newEmission ->  previousAccumulation+newEmission  }
            .subscribeBy { println("accumulation $it") }

    Observable.range(1,5)
            .reduce { previousAccumulation, newEmission ->  previousAccumulation*10+newEmission  }
            .subscribeBy { println("accumulation $it") }
}
/* 결과
accumulation 55
accumulation 12345
*/

컬렉션 연산자

RxKotlin은 예외적인 경우를 모두 고려해 컬렉션 연산자를 제공한다. 컬렉션 연산자도 모든 배출을 대기하고 있다가 컬렉션 객체로 축적해서 반환한다.

주요 컬렉션 연산자 목록이다.

  • toList 와 toSortedList
  • toMap
  • toMultiMap
  • collect

오류처리 연산자

onError 이벤트로 오류 처리는 가능하지만 오류가 다운스트림인 컨슈머 체인에 배출되고 구독이 즉시 종료된다는 문제점이 있다. 

아래 예제를 보면 에러처리 후  더 이상 Subscription은 배출을 받지 못한다.

fun main(args: Array<String>) {
    Observavle.just(1, 2, 3, 4, 5, 6, 7, "Errr", 8, 9, 10)
        .map { it.toIntOrError() }
        .subscribeBy(
            onNext = {
                println("Next $it")
            },
            onError = {
                println("Errror $it")
            }
        )
}
/* 결과
Next 1
Next 2
Next 3
Next 4
Next 5
Next 6
Next 7
Error java.lang.NumberFormatException: For input string: "Errr"
*/

오류 이후에는 최소한 대체 소스 프로듀서를 통해 재구동하거나 전환 하는 방법이 있어야 한다. 이는 오류 처리 연산자로 가능하며 주요 목록이다.

  • onErrorResumeNext()
  • onErrorReturn()
  • onExcpeptionResumeNext()
  • retry()
  • entryWhen()

유틸리티 연산자

유틸리티 연산자는 배출물에 대한 특정 행동, 각 항목의 타임스탬프 저장, 캐싱 등과 같은 다양한 유틸리티 작업을 수행하는데 도움을 준다.

 

유틸리티 연산자 목록은 다음과 같다.

  • doOnNext, doOnComplete, doOnError
  • doOnSubscribe, doOnDispose, doOnSuccess
  • serialize
  • cache