옵저버블
일련의 연산자를 거친 아이템을 최종 옵저버로 내보내는 푸시 기반의 조합 가능한 이터레이터이다. 여기서 옵저버는 아이템들을 소비한다.
- 옵저버는 옵저버블을 구독한다.
- 옵저버블이 그 내부의 아이템들을 내보내기 시작한다.
- 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응한다.
옵저버블이 동작하는 방법
- onNext : 옵저버블은 모든 아이템을 하나씩 이 메서드에 전달한다.
- onComplete : 모든 아이템이 onNext 메서드를 통과하면 옵저버블은 onComplete 메서드를 호출한다.
- onError : 옵저버블에서 에러가 발생하면 onError 메서드가 호출돼 정의된 대로 에러를 처리한다. onError와 onComplete는 터미널 이벤트이다. onError가 호출됬을 경우 onComplete가 호출되지 않으면, 반대의 경우도 마찬가지다.
fun main(args: Array<String>) {
val observer: Observer<Any> = object: Observer<Any> { // 1
override fun onComplete() { // 2
println("All Complete")
}
override fun onNext(item: Any) { // 3
println("Next $item")
}
override fun onError(e: Throwable) { // 4
println("Error Occured $e")
}
// 옵저버가 옵저버블을 구독할 때마다 호출된다.
override fun onSubscribe(d: Disposable) { // 5
println ("Subscribed to $d")
}
}
val observable: Observable<Any>
= listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f ).toObservable () // 6
observable.subscribe(observer) // 7
val observableOnList: Observable<List<Any>> = Observable.just(
listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f),
listOf("List with Single Item"),
listOf(1, 2, 3, 4, 5, 6 ) // 8
)
observableList.subscribe(observer) // 9
}
Observable.create 메서드의 이해
Observable.create 메서드는 특히 사용자가 지정한 데이터 구조를 사용하거나 내보내는 값을 제어하려고 할때 유용하다. 다른 스레드에서 옵저버로 값을 내보낼 수도 있다.
fun main(args: Array<String>) {
val observer: Observer<Any> = object: Observer<String> {
override fun onComplete() {
println ("All Complete")
}
override fun onNext(item: Any) {
println ("Next $item")
}
override fun onError(e : Throwable) {
println ("Error Occured ${e.message}")
}
override fun onSubscribe(d : Disposable) {
println ("New Subscription")
}
} // Observer 생성
var observable: Observable<String> = Observable.create<String> { // 1
it.onNext("Emit 1")
it.onNext("Emit 2")
it.onNext("Emit 3")
it.onNext("Emit 4")
it.onComplete()
}
observable.subscribe(observer)
var observable2: Observable<String> = Observable.create<String> { // 2
it.onNext("Emit 1")
it.onNext("Emit 2")
it.onNext("Emit 3")
it.onNext("Emit 4")
it.onError(Exception("My Custom Exception"))
}
observable2.subscribe(observer)
}
Observable.from 메서드의 이해
from 메서드의 도움을 받아 거의 모든 코틀린 구조체로 부터 Observable 인스턴스를 생성할 수 있다.
RxKotlin 1 에서는 Observable.from 을 사용한다. 그러나 RxKotlin 2.0에서 연산자 오버로드로서 fromArray, fromIterable, fromFuture 등과 같은 접미사가 추가됬다.
fun main(args: Array<String>) {
var observer: Observable<String> = object: Observer<String> {
...
}// Observer 생성
val list = listOf("String 1", "String 2", "String 3", "String 4")
val observableFromIterable: Observerble<String> = Observable.fromIterable(list) // 1
observableFromIterable.subscribe(observer)
val callable = object: Callable<String> {
override fun call(): String {
return "Fron Callable"
}
}
val observableFromCallable: Observable<String> = Observable.fromCallable(callable) // 2
observableFromCallable.subscribe(observer)
val list:List<String> = object: Future<String> {
override fun get(): String = "Hello From Future"
override fun get(timeout: Long, unit: TimeUnit?): String = "Hello From Future"
override fun isDone(): Boolean = true
override fun isCancelled(): Boolean = false
override fun cancel(mayInterruptIfRunning: Boolean): Boolean = false
}
val observerbleFromFuture: Observable<String> = Observable.fromFuture(future) // 3
observerbleFromFuture.subscribe(observer)
}
toObservable의 확장 함수 이해
fun main(args: Array<String>) {
var observer: Observable<String> = object: Observer<String> {
...
}// Observer 생성
val list = listOf("String 1", "String 2", "String 3", "String 4")
val observable: Observerble<String> = list.toObservable()
observable.subscribe(observer)
}
toObservable의 메서드 내부 observable.kt
fun <T: Any> Iterator<T>.toObservavle(): Observable<T> = toIterable().toObservable()
fun <T: Any> Iterable<T>.toObservavle(): Observable<T> = Observable.fromIterable(this)
fun <T: Any> Sequence<T>.toObservavle(): Observable<T> = asIterable().toObservable()
fun <T: Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable())
fun <T: Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable())
Observable.just 메서드의 이해
넘겨진 인자만을 배출하는 옵저버블을 생성한다. Iterable 인스턴스를 Observable.just에 단일 인자로 넘기면 전체 목록을 하나의 아이템으로 배출하는데 이는 Iterable 내부의 각각의 아이템을 Observable로 생성하는 Observable.from 과는 다르다는 점을 유의하자. Observable.just를 호출하면 다음과 같은 일이 일어난다.
- 인자와 함께 Observable.just를 호출
- Observable.jeus는 옵저버블을 생성
- onNext 알림을 통해 각각의 아이템을 내보냄
- 모든 인자의 제출이 완료되면 onComplete 알림을 실행
fun main(args: Array<String>) {
var observer: Observable<String> = object: Observer<String> {
...
}// Observer 생성
Observable.just("A String").subscribe(observer)
Observable.just(54).subscribe(observer)
Observable.just(
listOf("String 1", "String 2", "String 3", "String 4")
).subscribe(observer)
Observable.just(
mapOf(
Pair("Key 1", "Value 1"),
Pair("Key 2", "Value 2"),
Pair("Key 3", "Value 3")
)
).subscribe(observer)
Observable.just(arrayListOf(1, 2, 3, 4, 5, 6)).subscribe(observer)
// 각각 별개의 아이템으로 받아들여 내보내 진다.
Observable.just("String 1", "String 2", "String 3").subscribe(observer) // 1
}
Observable의 다른 팩토리 메서드
- Observavle.range() : 옵저버블을 생성하고 제공된 start부터 시작해 count만큼의 정수를 내보낸다.
- Observavle.empty() : 이 옵저버블은 onNext()로 항목을 내보내지 않고 즉시 onComplete()를 발생시킨다.
- Observable.interval() : 지정된 간격만큼의 숫자를 0부터 순차적으로 내보낸다. 이 행동은 구독을 취소하거나 프로그램이 종료될때까지 이어진다.
- Observable.timer() : 지정된 시간이 경과한 후에 한 번만 실행된다.
fun main(args: Array<String>) {
var observer: Observable<String> = object: Observer<String> {
...
}// Observer 생성
Observable.range(1,10).subscribe(observer) // 1
Observable.empty().subscribe(observer) // 2
runBlocking {
Observable.interval(300, TimeUnit.MILLISECONDS).subscribe(observer) // 3
delay(900)
Observable.timer(400, TimeUnit.MILLISECONDS).subscribe(observer) // 4
delay(450)
}
}
구독자: Observer 인터페이스
기본적으로 RxKotlin 1.x 의 구독자(Subscriber)는 RxKotlin 2.x에서 옵저버(Observer)로 변경됬다. RxKotlin 1.x에는 Observer 인터페이스가 있다. 하지만 subscribe() 메서드에 전달하는 것은 Subscriber이고, 이는 Observer를 구현한다.
네 가지 메서드가 있다.
- onNext : 아이템을 하나씩 넘겨주기 위해서 옵저버블은 옵저버의 이 메서드를 호출한다.
- onComplete : 옵저버블이 onNext를 통한 아이템 전달이 종료됐음을 알리고 싶을 때 옵저버의 onComplete를 호출한다.
- onError : 옵저버블에서 에러가 발생했을 때 옵저버에 정의된 로직이 있다면 onError를 호출하고 그렇지 않다면 예외를 발생시킨다.
- onSubscriber: Observable이 새로운 Observer를 구독할 때마다 호출된다.
구독과 해지
Observable(관찰되야 하는 대상), Observer(관찰해야 하는 주체), subscribe 연산자는 이 둘을 연결하는 매개체 용도로 사용된다.
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.range(1,5) // 1
observable.subscribe({ // 2
// onNext 메서드
println("Next $it")
},{
// onError 메서드
println("Error ${it.message}")
},{
// onComplete 메서드
println("Done")
})
var observer: Observable<Int> = object: Observer<Int> { // 3
override fun onComplete() {
println("All Complete")
}
override fun onNext(item: Int) {
println("Next $item")
}
override fun onError(e: Throwable) {
println("Error Occurred ${e.message}")
}
override fun onSubscribe(d: Disposable) {
println("New Subscription")
}
}
observable.subscribe(observer)
}
onSubscribe 메서드의 매개변수인 Disposable 인터페이스로 구독을 해지 할 수 있다.
fun main(args: Array<String>) {
runBlocking {
val observable: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS) // 1
val observer: Observer<Long> = object: Observer<Long> {
lateinit var disposable: Disposable // 2
override fun onSubscribe(d: Disposable) {
disposable = d // 3
}
override fun onNext(item: Long) {
println("Received $item")
/*
제출된 값이 10보다 크거나 같은지 확인하고,
배출이 이미 중단됬거나 해지되지 않은 경우 해지한다.
*/
if(item >= 10 && !disposable.isDisposed) {
disposable.dispose() // 5
println("Disposed")
}
}
override fun onError(e: Throwable) {
println("Error ${e.message}")
}
override fun onComplete() {
println("Complete")
}
}
observable.subscribe(observer)
delay(1500) // 6
}
}
Disposable 인터페이스의 정의
interface Disposable {
/**
* 리소스를 처리한다. 연산은 멱등성을 가져야 한다.
*/
fun dispose()
/**
* 리소스가 처리됬다면 참을 반환한다.
*/
val isDisposed: Boolean
}
핫, 콜드 옵저버블
콜드 옵저버블
콜드 옵저버블은 구독 시에 실행을 시작하고 subscribe가 호출되면 아이템을 푸시하기 시작하는데 각 구독에서 아이템의 동일한 순서를 푸시한다.
fun main(args: Array<String>) {
val observable: Observable<String>
= listOf("String 1", "String 2", "String 3", "String 4").toObservable() // 1
observable.subscribe({ // 2
print("Received $it")
},{
print("Error ${it.message}")
},{
print("Done")
})
observable.subscribe({ // 3
print("Received $it")
},{
print("Error ${it.message}")
},{
print("Done")
})
}
핫 옵저버블
콜드 옵저버블을 CD/DVD레코딩으로 본다면 핫 옵저버블은 TV채널과 비슷하다. 핫 옵저버블은 시청자가 시청하는지 여부에 관계없이 콘텐츠를 계속 브로드캐스팅(배출)한다.
핫 옵저버블은 데이터 보다는 이벤트와 유사하다. 이벤트에는 데이터가 포함될 수 있지만 시간에 민감한 특징을 가지는데 최근 가입한 Observer가 이전에 내보낸 데이터를 놓칠 수 있기 때문이다. 이런 특징은 안드로이드/자바FX/스윙 등에서 UI 이벤트를 다룰 때 유용하다. 또한 서버 요청을 흉내 내는 데 유용하다.
ConnectableObservable 객체의 소개
ConnectableObservable의 주요 목적은 한 옵저버블에 여러 개의 구독을 연결해 하나의 푸시에 대응할 수 있도록 하는 것이다. 이는 푸시를 반복하고 각 구독마다 따로 푸시를 보내는 콜드 옵저버블과는 상이하다.
- toObservable로 옵저버블을 생성하고 publish 연산자로 콜드 옵저버블을 ConnectableObserverble로 변환했다.
- ConnectableObserverble을 구독했다.
- map 연산자로 String 을 뒤집었다. (String 1 -> 1 gnirtS)
- ConnectableObserverble을 구독했다.
- connect 메소드호출로 두 옵저버에게 모두 배출이 시작됬다.
- 옵저버블에게 단 한번의 배출로 모든 구독(Subscription)/관찰자(Observers)에게 배출을 전달하는 메커니즘을 멀티캐스팅이라고 한다
- connect 메소드 호출 이후에 일어난 모든 새로운 구독은 이전에 생성된 배출을 놓치게 된다.
fun main(args: Array<String>) {
val connectableObservable = listOf(
"String 1", "String 2", "String 3", "String 4", "String 5"
).toObservable().publish() // 1
connectableObservable.subscribe({ println("Subscription 1: $it") }) // 2
connectableObservable.map(String::reversed) // 3
.subscribe({ println("Subscription 2: $it") }) // 4
connectableObservable.connect() // 5
// 배출을 받지 못함
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 6
}
- Observable.interval 옵저버블을 사용한 이유는 각 배출마다 간격이 생겨 connect 이후의 구독에 약간의 공간을 줄 수 있기 때문이다.
- publish 로 ConnectableObserverble 로 변환하고 두번 구독 후 connect 를 호출했다.
- connect 후 지연, 다시 구독 세번째 구독이 일부 데이터를 인쇄할 수 있도록 다시 지연을 호출한다.
- 출력을 보면 3번째 옵저버가 5번째 배출을 받고 이전의 구독을 모두 놓친다.
fun main(args: Array<String>) {
val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish() // 1
connectableObservable.subscribe({ println("Subscription 1: $it") }) // 2
connectableObservable.subscribe({ println("Subscription 2: $it") }) // 3
connectableObservable.connect() // 4
runBlocking { delay(500) } // 5
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 6
runBlocking { delay(500) } // 7
}
Subjects
핫 옵저버블을 구현하는데 또 다른 좋은 방법은 Subject 이다. Subjects는 옵저버블과 옵저버의 조합인데 두 가지 모두의 공통된 동작을 갖고 있기 때문이다. 핫 옵저버블과 마찬가지로 내부 Observer 목록을 유지하고 배출 시에 가입한 모든 옵저버에게 단일 푸시를 전달한다.
Subjects가 제공하는 것
- 옵저버블이 가져야 하는 모든 연산자를 갖고 있다.
- 옵저버와 마찬가지로 배출된 모든 값에 접근할 수 있다.
- Subject가 완료/오류/구독해지 된 후에는 재사용 할 수 없다.
- 가장 흥미로운 점은 그 자체로 값을 전달한다는 것이다. 자세히 설명하자면 onNext를 사용해 값을 Subject(Observer)측에 전달하면 Observerble에서 접근 가능하게 된다.
- interval 메소드를 이용 100밀초 간겨으로 옵저버블 인스턴스를 다시 생성한다.
- PublishSubject.create() 로 Subject를 생성한다.
- 옵저버 처럼 Subject 인스턴스를 사용해 옵저버블 인스턴스의 배출을 구독한다.
- Subject의 인스턴스를 사용해 Subject 인스턴스에 의한 배출에 접근하기 위해 람다를 사용해 구독한다.
- 프로그램을 1100밀리초 동안 대기시켜서 인터벌 프로그램에 의한 출력을 불 수 있다.
-
더보기delay 메서드는 자바의 sleep 메서드와 비슷하다고 생각할 수 있다. 유일한 차이점은 코루틴 컨텍스트 내에서 delay 메서드를 사용해야 한다는 것이다. 따라서 delay 메서드를 사용하려면 코루틴 컨텍스트를 지정하고 시작해야 하는데 항상 가능한 것은 아니다. runBlocking 메서드를 이런 경우에 활용할 수 있다. runBlocking이 모든 코드 샐행을 완료할 때까지 스레드를 차단하면서 호출 스레드 내부에서 코루틴 컨텍스트를 모킹(흉내)한다.
-
Subject 인스턴스는 옵저버블 인스턴스에 의한 배출에 귀를 귀울이고 있다가 그 배출들을 자신들의 Observer에게 브로드캐스팅하는데 이는 CD/DVD 레코딩을 브로드캐스팅하는 TV 채널과 유사하다.
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS) // 1
val subject = PublishSubject.create<Long>() // 2
observable.subscribe(subject) // 3
subject.subscribe({ println("Received $it") }) // 4
runBlocking { delay(1100) } // 5
}
/*
결과
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
.....
Subscription 1 Received 9
Subscription 1 Received 10
*/
Subject는 콜드 옵저버블과 같이 행동을 반복하지 않는다. Subject는 모든 옵저버에게 전달된 배출을 중계하고, 콜드 옵저버블을 핫 옵저버블로 변경시킨다.
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS) // 1
val subject = PublishSubject.create<Long>() // 2
observable.subscribe(subject) // 3
subject.subscribe({ println("Subscription 1 Received $it") }) // 4
runBlocking { delay(1100) } // 5
subject.subscribe({ println("Subscription 2 Received $it") }) // 6
runBlocking { delay(1100) } // 7
}
/*
결과
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
.....
Subscription 1 Received 9
Subscription 1 Received 10
Subscription 1 Received 11
Subscription 2 Received 11
Subscription 1 Received 12
Subscription 2 Received 12
....
Subscription 1 Received 20
Subscription 2 Received 20
Subscription 1 Received 21
Subscription 2 Received 21
*/
다양한 구독자
AsyncSubject 이해
AsyncSubject는 수신 대기중인 소스 옵저버블의 마지막 값과 배출만 전달한다. 더 명확하게 말하면 AsyncSubject는 마지막 값을 한번만 배출한다.
fun main(args: Array<String>) {
val observable = Observable.just(1, 2, 3, 4) // 1
val subject = AsyncSubject.create<Int>() // 2
observable.subscribe(subject) // 3
subject.subscribe({
//onNext
println("Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("Complete")
})
subject.onComplete() // 5
}
/* 결과
Received 4
Complete
*/
여기서는 onNext를 통해 모든값을 전달했고, 두가지 구독 모두 마지막 값 (5)만 출력했다. 결과에서 알수 있듯이 AsyncSubject는 하나의 값을 사용해 여러 옵저버에 내보내는 작업을 반복한다.
fun main(args: Array<String>) {
val subject = AsyncSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5)
subject.subscribe({
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()
}
/* 결과
S1 Received 5
S1 Complete
S2 Received 5
S2 Complete
*/
PublishSubject 이해
PublishSubject는 onNext 메서드 또는 다른 구독을 통해 값을 받았는지 여부에 관계없이 구독 시점에 이어지는 모든 값을 배출한다. 가장 많이 사용되는 Subject변형이다.
BehaviorSubject 이해
AsyncSubject와 PublishSubject를 결합해서 두 가지의 장점을 모두 취하면 어떨까?
BehaviorSubject는 멀티캐스팅으로 동작하는데 구독 전의 마지막 아이템과 구독 후 모든 아이템을 배출한다. 즉 내부 옵저버 목록을 유지하는데 중복 전달 없이 모든 옵저버에게 동일한 배출을 전달한다.
fun main(args: Array<String>) {
val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5)
subject.subscribe({
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()
}
/* 결과
첫 번째 구독은 4와 5를 받는다.
4는 구독하기에 앞서, 5번은 그 후에 배출됐다.
두 번째 구독 시에는 구독 전에 배출된 5만들 받게 된다.
S1 Received 4
S1 Received 5
S2 Received 5
S1 Complete
S2 Complete
*/
ReplaySubject 이해
ReplaySubject는 갖고 있는 모든 아이템을 옵저버의 구독 시점과 상관없이 다시 전달하는데 콜드 옵저버블과 유사하다.
fun main(args: Array<String>) {
val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5)
subject.subscribe({
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()
}
/* 결과
두가지 구독 모두 모든 배출을 받고 있다.
S1 Received 1
S1 Received 2
...
S1 Received 5
S2 Received 1
S2 Received 2
...
S2 Received 5
S1 Complete
S2 Complete
*/
'책 > 코틀린 리액티브 프로그래밍' 카테고리의 다른 글
6. 연산자 및 오류처리 (0) | 2020.05.28 |
---|---|
5. 비동기 데이터 연산자와 변환 (0) | 2020.05.27 |
4. 백프레셔와 플로어블 소개 (0) | 2020.05.26 |
2. 코틀린과 RxKotlin을 사용한 함수형 프로그래밍 (0) | 2020.05.25 |
1. 리액티브 프로그래밍의 소개 (0) | 2020.05.25 |