애플리케이션에서 비즈니스 로직과 동작을 구현하려면 블로킹 코드를 작성하거나 명령형 프로그래밍과 리액티브 프로그래밍을 혼합하는 대신 연산자를 사용해야 한다. 연산자를 사용하여 순수하게 반응형으로 유지하면 낮은 메모리 사용, 유연한 동시성 및 일회성(disposablility)들을 쉽게 얻을 수 있는데 명령형 프로그래밍과 섞어서 사용할 경우 효과가 줄어들거나 사라지게 된다.
필터링 연산자
프로듀서의 원하는 배출만 받고 나머지는 필터링 할 경우 사용한다.
debounce
예를 들면 텍스트 입력 구현시 모든 키눌림에 대해서 연산을 원하지 않을 때 사용하는 연산자이다. 사용자가 실제로 원하는 키워드와 일치하는 쿼리를 얻을 수 있을 때까지 입력을 중단하기를 잠시 기다렸다가 다운스트림 연산자에게 전달한다.
- 주석 2 에서 debounce 연산자는 각 배출 이후 200밀리초 동안 대기하고 있다가 각 간격에서 아무런 배출이 발생하지 않았을 때만 아이템을 배출하는 다운스트림을 생성한다.
fun main(args: Array<String>) {
createObservable()//(1)
.debounce(200, TimeUnit.MILLISECONDS)//(2)
.subscribe {
println(it)//(3)
}
}
inline fun createObservable():Observable<String> = Observable.create<String> {
it.onNext("R")//(4)
runBlocking { delay(100) }//(5)
it.onNext("Re")
it.onNext("Reac")
runBlocking { delay(130) }
it.onNext("Reactiv")
runBlocking { delay(140) }
it.onNext("Reactive")
runBlocking { delay(250) }//(6)
it.onNext("Reactive P")
runBlocking { delay(130) }
it.onNext("Reactive Pro")
runBlocking { delay(100) }
it.onNext("Reactive Progra")
runBlocking { delay(100) }
it.onNext("Reactive Programming")
runBlocking { delay(300) }
it.onNext("Reactive Programming in")
runBlocking { delay(100) }
it.onNext("Reactive Programming in Ko")
runBlocking { delay(150) }
it.onNext("Reactive Programming in Kotlin")
runBlocking { delay(250) }
it.onComplete()
}
/* 결과
Reactive
Reactive Programming
Reactive Programming in Kotlin
*/
distinct 연산자 : distinct, distinctUntilChanged
distinct 연산자는 뒤이어 발생하는 모든 배출을 기억하고 미래에 동일한 아이템을 걸러낸다.
fun main(args: Array<String>) {
listOf(1,2,2,3,4,5,5,5,6,7,8,9,3,10)//(1)
.toObservable()//(2)
.distinct()//(3)
.subscribe { println("Received $it") }//(4)
}
/* 결과
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
*/
distinctUntilChanged 연산자는 모든 중복된 배출을 폐기하는 대신 연속적인 중복 배출만 폐기하고 나머지는 그대로 유지한다.
distinct 연산자는 onComplete를 받을 때까지 각 아이템을 기억하고 있지만 distinctUntilChanged 연산자는 새로운 아이템을 받기 전까지의 아이템만 기억한다.
fun main(args: Array<String>) {
listOf(1,2,2,3,4,5,5,5,6,7,8,9,3,10)//(1)
.toObservable()//(2)
.distinctUntilChanged()//(3)
.subscribe { println("Received $it") }//(4)
}
/* 결과
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 3
Received 10
*/
elementAt
프로듀서로 부터 n번째 요소를 받아서 단독 배출로 내보낸다.
- Observable의 다섯 번째 요소를 요청하고 결과를 배출한다. (인덱스 0부터 시작)
- Observable에 존재하지 않는 50번째 요소를 요청했지만 아무것도 배출되지 않았다.
- 이 연산자는 Maybe 모나드의 도움으로 이 동작을 수행한다.
fun main(args: Array<String>) {
val observable = listOf(10,1,2,5,8,6,9)
.toObservable()
observable.elementAt(5)//(1)
.subscribe { println("Received $it") }
observable.elementAt(50)//(2)
.subscribe { println("Received $it") }
}
/* 결과
Received 6
*/
filter
- filter 연산자를 사용해 배출에서 홀수를 필터링 했다.
fun main(args: Array<String>) {
Observable.range(1,20)//(1)
.filter{//(2)
it%2==0
}
.subscribe {
println("Received $it")
}
}
/* 결과
Received 2
Received 4
Received 6
Received 8
Received 10
Received 12
Received 14
Received 16
Received 18
Received 20
*/
first, last
첫 번째 마지막 배출만 유지하고 나머지는 폐기한다.
- defaultValue 매개변수가 2로 설정된 first 연산자를 사용하는데, 첫 번째 요소에 액세스할 수 없는 경우 defaultValue 매개변수를 배출한다.
- last 연산자를 사용했다.
- first 연산자를 빈 옵저버블에 다시 사용했다. 따라서 액세스할 수 없는 2를 배출했다.
fun main(args: Array<String>) {
val observable = Observable.range(1,10)
observable.first(2)//(1)
.subscribeBy { item -> println("Received $item") }
observable.last(2)//(2)
.subscribeBy { item -> println("Received $item") }
Observable.empty<Int>().first(2)//(3)
.subscribeBy { item -> println("Received $item") }
}
/* 결과
Received 1
Received 10
Received 2
*/
ignoreElements
onComplete 이벤트에만 존재하는 Completable 모나드를 반환한다.
fun main(args: Array<String>) {
val observable = Observable.range(1,10)
observable
.ignoreElements()
.subscribe { println("Completed") }//(1)
}
/* 결과
Completed
*/
변환연산자
프로듀서가 배출한 아이템을 변형하도록 도와준다.
map
map 연산자는 제공된 Function<T, R> 람다를 적용해 타입 T로 배출된 아이템을 타입 R의 배출로 변환한다.
fun main(args: Array<String>) {
val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
observable.map {
number-> "Transforming Int to String $number"
}.subscribe {
item-> println("Received $item")
}
}
/* 결과
Received Transforming Int to String 10
Received Transforming Int to String 9
Received Transforming Int to String 8
Received Transforming Int to String 7
Received Transforming Int to String 6
Received Transforming Int to String 5
Received Transforming Int to String 4
Received Transforming Int to String 3
Received Transforming Int to String 2
Received Transforming Int to String 1
*/
cast
Observable 에서 람다보다 깔끔하게 데이터 캐스팅을 위해서 사용한다.
- 주석 5와 6에 MyItem, MyItemInherit 두가지 클래스를 정의했다.
- 주석 1에서 MyItemInherit 리스트를 생성했다.
- 주석 2에서 리스트를 옵저버블로 생성한다.
- 주석 3 에서 map 연산자에게 MyItem 으로 캐스팅하는 람다를 전달했다.
- 주석 4 에서는 cast 연산자를 사용하여 동일한 작업을 하는데 훨씬 깨끗하고 단순해 보인다.
fun main(args: Array<String>) {
val list = listOf<MyItemInherit>(
MyItemInherit(1),
MyItemInherit(2),
MyItemInherit(3),
MyItemInherit(4),
MyItemInherit(5),
MyItemInherit(6),
MyItemInherit(7),
MyItemInherit(8),
MyItemInherit(9),
MyItemInherit(10)
)//(1)
list.toObservable()//(2)
.map { it as MyItem }//(3)
.subscribe {
println(it)
}
println("cast")
list.toObservable()
.cast(MyItem::class.java)//(4)
.subscribe {
println(it)
}
}
open class MyItem(val id:Int) {//(5)
override fun toString(): String {
return "[MyItem $id]"
}
}
class MyItemInherit(id:Int):MyItem(id) {//(6)
override fun toString(): String {
return "[MyItemInherit $id]"
}
}
flatMap
map 연산자는 각 배출을 가져와서 변환하지만, flatMap 연산자는 새로운 프로듀서를 만들고 소스 프로듀서에 전달한 함수를 각 배출에 적용한다.
fun main(args: Array<String>) {
val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
observable.flatMap {
number-> Observable.just("Transforming Int to String $number")
}.subscribe {
item-> println("Received $item")
}
}
/* 결과
Received Transforming Int to String 10
Received Transforming Int to String 9
Received Transforming Int to String 8
Received Transforming Int to String 7
Received Transforming Int to String 6
Received Transforming Int to String 5
Received Transforming Int to String 4
Received Transforming Int to String 3
Received Transforming Int to String 2
Received Transforming Int to String 1
*/
각 배출마다 여러 아이템을 생성하는 경우
- Observable.create 연산자로 Observable 인스턴스를 생성했다.
- Observable.create 연산자에서 3개의 문자열을 배출하는데 그 후에 onComplete 알림을 보낸다.
- 출력을 보면 onComplete 알림을 보내기 전에 모든 항목을 배출한다. 그 이유는 모든 옵저버블이 함께 결합된 후 다운스트림으로 구독했기 때문이다. flatMap 연산자는 내부적으로 merge 연산자를 사용해 여러 옵저버블을 결합한다.
- concatMap은 merge 연산자 대신 concat 연산자를 사용해 두 개의 Observable/Flowable를 결합하는 동일한 연산을 수행한다.
fun main(args: Array<String>) {
val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
observable.flatMap {
number-> Observable.create<String> { // 1
it.onNext("The Number $number")
it.onNext("number/2 ${number/2}")
it.onNext("number%2 ${number%2}")
it.onComplete() // 2
}
}.subscribeBy(
onNext = {
item-> println("Received $item")
},
onComplete = {
print("Complete")
}
)
}
/* 결과
Received The Number 10
Received number number/2 5
Received number number%2 0
Received The Number 9
Received number number/2 4
Received number number%2 1
Received The Number 8
Received number number/2 4
Received number number%2 0
Received The Number 7
Received number number/2 3
Received number number%2 1
Received The Number 6
Received number number/2 3
Received number number%2 0
Received The Number 5
Received number number/2 2
Received number number%2 1
Received The Number 4
Received number number/2 2
Received number number%2 0
Received The Number 3
Received number number/2 1
Received number number%2 1
Received The Number 2
Received number number/2 1
Received number number%2 0
Received The Number 1
Received number number/2 0
Received number number%2 1
Complete
*/
defaultIfEmpty
연산자를 필터링하거나 복잡한 요구 사항을 다루는 동안 빈 프로듀서가 나타날 수 있다. defaultIfEmpty 연산자는 이런 상황을 처리하는데 도움이 된다.
/* 10의 범위를 벗어났기 때문에 빈 옵저버블을 반환한다. */
fun main(args: Array<String>) {
Observable.range(0,10)//(1)
.filter{it>15}//(2)
.subscribe({
println("Received $it")
})
}
/* defaultIfEmpty 연산자로 필터링 하여 비어있을 경우 옵저버블에 15를 추가해준다. */
fun main(args: Array<String>) {
Observable.range(0,10)//(1)
.filter{it>15}//(2)
.defaultIfEmpty(15)//(3)
.subscribe({
println("Received $it")
})
}
/* 결과
Received 15
*/
switchIfEmpty
defaultIfEmpty 연산자는 빈 프로듀서에 배출을 추가하지만 switchIfEmpty 연산자는 source 프로듀서가 비어있을 경우 대체 프로듀서로부터 배출한다.
아이템을 전달해야 하는 defaultIfEmpty 연산자와는 달리 switchIfEmpty 연산자는 대체프로듀서를 전달해야 한다.
fun main(args: Array<String>) {
Observable.range(0,10)//(1)
.filter{it>15}//(2)
.switchIfEmpty(Observable.range(11,10))//(3)
.subscribe({
println("Received $it")
})
}
/* 결과
Received 11
Received 12
Received 13
Received 14
Received 15
Received 16
Received 17
Received 18
Received 19
Received 20
*/
startWith 연산자
startWith 연산자는 기존 배출 리스트에 접두사를 추가했다.
fun main(args: Array<String>) {
Observable.range(0,10)//(1)
.startWith(-1)//(2)
.subscribe({
println("Received $it")
})
listOf("C","C++","Java","Kotlin","Scala","Groovy")//(3)
.toObservable()
.startWith("Programming Languages")//(4)
.subscribe({
println("Received $it")
})
}
/* 결과
Received -1
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received Programming Language
Received C
Received C++
Received Java
Received Kotlin
Received Scala
Received Groovy
*/
sorted
fun main(args: Array<String>) {
println("default with integer")
listOf(2,6,7,1,3,4,5,8,10,9)
.toObservable()
.sorted() //(1)
.subscribe { println("Received $it") }
println("default with String")
listOf("alpha","gamma","beta","theta")
.toObservable()
.sorted() //(2)
.subscribe { println("Received $it") }
println("custom sortFunction with integer")
listOf(2,6,7,1,3,4,5,8,10,9)
.toObservable()
.sorted { item1, item2 -> if(item1>item2) -1 else 1 } //(3)
.subscribe { println("Received $it") }
println("custom sortFunction with custom class-object")
listOf(MyItem1(2),MyItem1(6),
MyItem1(7),MyItem1(1),MyItem1(3),
MyItem1(4),MyItem1(5),MyItem1(8),
MyItem1(10),MyItem1(9))
.toObservable()
.sorted { item1, item2 -> if(item1.item<item2.item) -1 else 1 }//(4)
.subscribe { println("Received $it") }
}
data class MyItem1(val item:Int)
/* 결과
default with integer
Received 1
Received 2
...
Received 10
default with String
Received alpha
Received beta
Received gamma
Received theta
custom sortFunction with integer
Received 10
Received 9
...
Received 1
custom sortFunction with custom class-object
Received MyItem1(item=1)
Received MyItem1(item=2)
...
Received MyItem1(item=10)
*/
데이터 모으기: scan
그래프에서 볼 수 있듯이 스캔 연산자는 제공된 누적 함수를 기반으로 현재 배출 아이템과 모든 이전 배출을 아이템을 모은다.
- 주석 1은 모든 정수의 합을 구할 때 scan 을 사용했다.
- 주석 2는 연결된 문자열을 구할 때 사용했다.
- 주석 3은 이전 누적 값에 10을 곱하고 현재 배출값을 더해서 정수를 연결했다.
fun main(args: Array<String>) {
Observable.range(1,10)
.scan { previousAccumulation, newEmission -> previousAccumulation+newEmission }//(1)
.subscribe { println("Received $it") }
listOf("String 1","String 2", "String 3", "String 4")
.toObservable()
.scan{ previousAccumulation, newEmission -> previousAccumulation+" "+newEmission }//(2)
.subscribe { println("Received $it") }
Observable.range(1,5)
.scan { previousAccumulation, newEmission -> previousAccumulation*10+newEmission }//(3)
.subscribe { println("Received $it") }
}
/* 결과
Received 1
Received 3
Received 6
Received 10
Received 15
Received 21
Received 28
Received 36
Received 45
Received 55
Received string1
Received string1 string2
Received string1 string2 string3
Received string1 string2 string3 string4
Received 1
Received 12
Received 123
Received 1234
Received 12345
*/
축소 연산자
배출을 축적하고 통합해야 요구사항이 있을 수가 있는데, 이런 요구사항에 따르는 거의 모든 연산자는 유한한 데이터 집합만 통합할 수 있으므로 onComplete를 호출하는 유한한 프로듀서(옵저버블/플로어블) 에서만 작동한다.
배출량 계산 : count
count 연산자는 프로듀서를 구독하고 배출량을 계산하며 프로듀서의 배출량을 담고 있는 Single 객체를 배출한다.
출력에서 볼수 있듯이 카운트 연산자는 프로듀서 배출을 계산하고 onComplete 알림을 받으면 배출량을 내보낸다.
fun main(args: Array<String>) {
listOf(1,5,9,7,6,4,3,2,4,6,9).toObservable()
.count()
.subscribeBy { println("count $it") }
}
/* 결과
count 11
*/
배출누적: reduce
- reduce는 프로듀서의 모든 배출을 누적해서 onComplete 알림을 받으면 누적된 배출을 내보낸다.
- scan 연산자와의 차이점
- scan : 배출이 발생할 때마다 누적했다가 배출
- reduce : 모든 배출을 누적하다가 onComplete 알림을 수신했을 때 배출
fun main(args: Array<String>) {
Observable.range(1,10)
.reduce { previousAccumulation, newEmission -> previousAccumulation+newEmission }
.subscribeBy { println("accumulation $it") }
Observable.range(1,5)
.reduce { previousAccumulation, newEmission -> previousAccumulation*10+newEmission }
.subscribeBy { println("accumulation $it") }
}
/* 결과
accumulation 55
accumulation 12345
*/
컬렉션 연산자
RxKotlin은 예외적인 경우를 모두 고려해 컬렉션 연산자를 제공한다. 컬렉션 연산자도 모든 배출을 대기하고 있다가 컬렉션 객체로 축적해서 반환한다.
주요 컬렉션 연산자 목록이다.
- toList 와 toSortedList
- toMap
- toMultiMap
- collect
오류처리 연산자
onError 이벤트로 오류 처리는 가능하지만 오류가 다운스트림인 컨슈머 체인에 배출되고 구독이 즉시 종료된다는 문제점이 있다.
아래 예제를 보면 에러처리 후 더 이상 Subscription은 배출을 받지 못한다.
fun main(args: Array<String>) {
Observavle.just(1, 2, 3, 4, 5, 6, 7, "Errr", 8, 9, 10)
.map { it.toIntOrError() }
.subscribeBy(
onNext = {
println("Next $it")
},
onError = {
println("Errror $it")
}
)
}
/* 결과
Next 1
Next 2
Next 3
Next 4
Next 5
Next 6
Next 7
Error java.lang.NumberFormatException: For input string: "Errr"
*/
오류 이후에는 최소한 대체 소스 프로듀서를 통해 재구동하거나 전환 하는 방법이 있어야 한다. 이는 오류 처리 연산자로 가능하며 주요 목록이다.
- onErrorResumeNext()
- onErrorReturn()
- onExcpeptionResumeNext()
- retry()
- entryWhen()
유틸리티 연산자
유틸리티 연산자는 배출물에 대한 특정 행동, 각 항목의 타임스탬프 저장, 캐싱 등과 같은 다양한 유틸리티 작업을 수행하는데 도움을 준다.
유틸리티 연산자 목록은 다음과 같다.
- doOnNext, doOnComplete, doOnError
- doOnSubscribe, doOnDispose, doOnSuccess
- serialize
- cache
'책 > 코틀린 리액티브 프로그래밍' 카테고리의 다른 글
7. RxKotlin의 스케줄러를 사용한 동시성과 병렬 처리 (0) | 2020.05.29 |
---|---|
6. 연산자 및 오류처리 (0) | 2020.05.28 |
4. 백프레셔와 플로어블 소개 (0) | 2020.05.26 |
3. 옵저버블과 옵저버 구독자 (0) | 2020.05.26 |
2. 코틀린과 RxKotlin을 사용한 함수형 프로그래밍 (0) | 2020.05.25 |