자원관리
프로그램을 개발할때 자원은 사용하고 반드시 종료시켜줘야 한다.
RxKotlin에서는 일회성 자원은 손십게 사용하기 위해 using 연산자를 사용한다.
using 연산자의 정의는 다음과 같다.
- 리소스를 생성하고 반환하는 Callable 인스턴스를 입력받는다.
- 리소스 인스턴스를 만들기 앞서 첫번째 람다를 호출한다. 그런다음 구독할 수 있는 Observable을 반환하기 위해 리소스 인스턴스를 두번째 람다로 전달한다.
- 마지막으로 Observable이 onComplete 이벤트를 호출하면 세번째 람다를 호출해 리소스를 종료 시킨다.
fun <T, D> using(
resourceSupplier: Callable<out D>
, sourceSupplier: Function<in D, out ObservableSource<out T>>
, disposer: Consumer<in D>): Observable<T> {
return usin(resourceSupplier, sourceSupplier, disposer, true)
}
자원을 열고 닫는 것을 확인하기 위해 목업 리소스 클래스를 정의한다.
class Resource():Closeable {
init {
println("Resource Created")
}
val data:String = "Hello World"
override fun close() {
println("Resource Closed")
}
}
using 연산자를 사용하는 예제이다.
- 주석 1에서 첫 번째 람다는 리소스 인스턴스를 생성하고 반환했다.
- 주석 2에서 두 번째 람다는 리소스를 매개변수로 취하고 Observable 을 생성해 반환했다.
- 주석 3에서 세 번째 람다는 리소스를 매개변수로 취하고 그것을 종료했다.
fun main(args: Array<String>) {
Observable.using({ // 1
Resource()
},{ // 2
resource:Resource->
Observable.just(resource)
},{ // 3
resource:Resource->
resource.close()
}).subscribe {
println("Resource Data ${it.data}")
}
}
/* 결과
Resource Created
Resource Data Hello World
Resource Closed
*/
자신만의 연산자 작성하기
lift 연산자는 자시만의 연산자를 만들기 위한 연산자이다. 자신만의 연산자를 구현하려면 lift 연산자는 ObservableOperator 인터페이스를 매개변수로 받는데 해당 인터페이스를 구현해서 적용하면 된다.
- Downstream : 연산자의 다운스트림에 전달될 유형을 지정
- Upstream : 연산자가 업스트림으로부터 수신할 유형을 지정
interface ObservableOperator<Downstream, Upstream> {
/**
* 자식 옵저버에게 함수를 적용해 새로운 옵저버를 반환한다.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
@Throws(Exception:class)
fun apply(@NonNull observer: Observer<in Downstream>): Observer<in Upstream>;
}
ObservableOperator 인터페이스를 구현한 예제이다.
- 연산자는 첫 번째 요소로 일련번호가 추가된 페어를 배출해야한다. 페어의 두번째 요소는 실제 배출값이다.
- 연산자는 포괄적이어야 하고 모든 유형의 옵저버블과 작동해야 한다.
- 다른 연산자와 마찬가지로 다른 연산자와 동시에 동작해야 한다.
class AddSerialNumber<T> : ObservableOperator<Pair<Int,T>,T> {
val counter:AtomicInteger = AtomicInteger()
override fun apply(observer: Observer<in Pair<Int, T>>): Observer<in T> {
return object : Observer<T> {
override fun onComplete() {
observer.onComplete()
}
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}
override fun onError(e: Throwable) {
observer.onError(e)
}
override fun onNext(t: T) {
observer.onNext(Pair(counter.incrementAndGet(),t))
}
}
}
}
구현된 AddSerialNumber 를 lift 연산자로 적용한 예제이다.
fun main(args: Array<String>) {
Observable.range(10,20)
.lift(AddSerialNumber<Int>())
.subscribeBy (
onNext = {
println("Next $it")
},
onError = {
it.printStackTrace()
},
onComplete = {
println("Completed")
}
)
}
/* 결과
Next (1, 10)
Next (2, 11)
...
Next (19, 28)
Next (20, 29)
Complete
*/
ObservableOperator 인터페이스는 하나의 메서드만 있으므로 람다로도 바꿀 수 있다.
fun main(args: Array<String>) {
listOf("Reactive","Programming","in","Kotlin","by Rivu Chakraborty","Packt")
.toObservable()
.lift<Pair<Int,String>> {
observer ->
val counter = AtomicInteger()
object :Observer<String> {
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}
override fun onNext(t: String) {
observer.onNext(Pair(counter.incrementAndGet(), t))
}
override fun onComplete() {
observer.onComplete()
}
override fun onError(e: Throwable) {
observer.onError(e)
}
}
}
.subscribeBy (
onNext = {
println("Next $it")
},
onError = {
it.printStackTrace()
},
onComplete = {
println("Completed")
}
)
}
/* 결과
Next (1, Reactive)
Next (2, Programming)
Next (3, in)
Next (4, Kotlin)
Next (5, by Rivu Charraborty)
Next (6, Packt)
Complete
변환자로 연산자 합성
subscribeOn 연산자와 observeOn 연산자를 체인으로 연결한 예제이다.
fun main(args: Array<String>) {
Observable.range(1,10)
.map {
println("map - ${Thread.currentThread().name} $it")
it
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe {
println("onNext - ${Thread.currentThread().name} $it")
}
runBlocking { delay(100) }
}
/* 결과
map - RxComputationThreadPool-1 1
map - RxComputationThreadPool-1 2
...
map - RxComputationThreadPool-1 9
map - RxComputationThreadPool-1 10
onNext - RxCachedThreadScheduler-1 1
onNext - RxCachedThreadScheduler-1 2
...
onNext - RxCachedThreadScheduler-1 9
onNext - RxCachedThreadScheduler-1 10
*/
위의 연산자 조합을 따로 분리하여 구현(재사용하기 위해)할 수 있는데, 그 구현을 사용할 수 있게 해주는 연산자가 compose 연산자이다.
compose 연산자는 ObservableTransformer 인터페이스를 매개변수로 받는데 해당 인터페이스를 구현하면 전용 연산자조합이 된다.
interface ObservableTrasformer<Upstream, Downstream> {
/**
* 업스트림 옵저버블에 함수를 적용하고 선택적인 엘리먼트 타입의 ObservableSource를 반환한다.
* @param upstream the upstream Observable instance
* @return the transformed ObservableSource instance
*/
@NonNull
fun apply(@NonNull upstream: Observable<Upstream>) : ObservableSource<Downstream>
}
여기서는 ObservableTransformer 구현체로서 SchedulerManager를 구현하고 compose 연산자에 적용하였다.
fun main(args: Array<String>) {
Observable.range(1,10)
.map {
println("map - ${Thread.currentThread().name} $it")
it
}
.compose(SchedulerManager(Schedulers.computation(),
Schedulers.io()))
.subscribe {
println("onNext - ${Thread.currentThread().name} $it")
}
runBlocking { delay(100) }
}
class SchedulerManager<T>(val subscribeScheduler:Scheduler
, val observeScheduler:Scheduler):ObservableTransformer<T,T> {
override fun apply(upstream: Observable<T>): ObservableSource<T> {
return upstream.subscribeOn(subscribeScheduler)
.observeOn(observeScheduler)
}
}
ObservableTransformer 인터페이스를 람다로도 구현할 수 있다.
fun main(args: Array<String>) {
Observable.range(1,10)
.compose<List<Int>> {
upstream: Observable<Int> ->
upstream.toList().toObservable()
}
.first(listOf())
.subscribeBy {
println(it)
}
}
/* 결과
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
*/
'책 > 코틀린 리액티브 프로그래밍' 카테고리의 다른 글
10. 스프링 JPA와 하이버네이트를 사용한 REST API (0) | 2020.05.31 |
---|---|
10. 코틀린 개발자를 위한 스프링 웹 프로그래밍 소개 (0) | 2020.05.31 |
8. RxKotlin 애플리케이션 테스트 (0) | 2020.05.30 |
7. RxKotlin의 스케줄러를 사용한 동시성과 병렬 처리 (0) | 2020.05.29 |
6. 연산자 및 오류처리 (0) | 2020.05.28 |