본문 바로가기

책/코틀린 리액티브 프로그래밍

4. 백프레셔와 플로어블 소개

옵져버블이 옵저버가 소비할 수 있는 처리량 보다 더 빨리 아이템이 배출된다면 어떨까? 이 문제가 언제 어떻게 발생할 수 있는지 알아보고 해결 방법도 알아본다. 

  • 백프레셔(Backpressure) 이해하기
  • 플로어블(Flowables) 및 가입자
  • Flowable.create()로 플로어블 생성하기
  • 옵저버블과 플로어블 동시에 사용하기
  • 백프레셔 연산자
  • Flowable.generate() 연산자

백프레셔 이해

이 프로그램은 사실 두 옵저버에게 한번만 배출하는 subject인 핫 옵저버블로서의 행동을 멈춘 것은 아니다. 그러나 첫 번째 옵저버에서 각 계산이 오래 걸렸기 때문에 각 배출들은 대기열로 들어가게 된 것이다. 이것은 OutOfMemoryError 예외를 포함해 많은 문제를 일으킬 수 있으므로 분명히 좋은 행동이 아니다.

fun main(args: Array<String>) {
    val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) // 1
    val subject = BehaviorSubject.create<Int>()
    subject.observeOn(Schedulers.computation())  // 2
           .subscribe({                          // 3
               println("Subs 1 Received $it")
               runBlocking { delay(200) }        // 4
           })
           
    subject.observeOn(Schedulers.computation())  // 5
           .subscribe({                          // 6
               println("Subs 2 Received $it")
           })
           
    observable.subscribe(subject)                // 7
    runBlocking {  delay(2000) }                 // 8
}

/*
결과 

Subs 1 Received 1
Subs 2 Received 1
Subs 2 Received 2
Subs 2 Received 3
Subs 2 Received 4
Subs 2 Received 5
Subs 2 Received 6
Subs 2 Received 7
Subs 2 Received 8
Subs 2 Received 9
Subs 1 Received 2
Subs 1 Received 3
Subs 1 Received 4
Subs 1 Received 5
Subs 1 Received 6
Subs 1 Received 7
Subs 1 Received 8
Subs 1 Received 9
*/

 

배출의 츨력에서도 볼 수 있듯이 MyItem의 생성은 매우 빠르며 컨슈머로 알려진 옵저버가 인쇄를 시작하기 전에 완료됬다. 여기서 볼 수 있듯이 문제는 배출이 대기열에 쌓이고 있는데 컨슈머는 이전의 배출량을 처리하고 있다는 것이다.

fun main(args: Array<String>) {
    val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) // 1
           
    observable
           .map { MyItem(it) }                   // 2
           .observeOn(Schedulers.computation())  // 3
           .subscribe({                          // 4
               println("Received $it")
               runBlocking { delay(200) }        // 5
           })
    runBlocking {  delay(2000) }                 // 6
}

data class MyItem(val id: Int) {
    init {
        println("MyItem Created $id")             // 7
    }
}

/*
결과 

MyItem Created 1
MyItem Created 2
MyItem Created 3
MyItem Created 4
MyItem Created 4
MyItem Created 5
MyItem Created 6
MyItem Created 7
MyItem Created 8
MyItem Created 9
Received MyItem(id=1)
Received MyItem(id=2)
Received MyItem(id=3)
Received MyItem(id=4)
Received MyItem(id=5)
Received MyItem(id=6)
Received MyItem(id=7)
Received MyItem(id=8)
Received MyItem(id=9)
*/

컨슈머는 생산자에게 이전 배출의 처리가 완료될 때까지 기다려야한다고 전달할 수 있다. 생산자는 생성 속도를 줄이도록 결정할 수 있다. 이러한 피드백 채널을 백프레셔라고 한다. 

 

옵저버블과 옵저버는 백프레셔를 지원하지 않으므로 그 대신 플로어블 구독자를 사용할 수 있다. 

플로어블

플로어블(Flowables)은 옵저버블의 백프레셔 버전이다.

 

옵저버블로 구현

data class MyItem3(val id: Int) {
    init {
        println("MyItem Created $id;\t")
    }
}

/* 
결과를 보면 옵저버블(생산자)이 옵저버(컨슈머)와 보조를 맞추지 못한다.
이런 현상을 OutOfMemory 오류를 비롯해 많은 문제를 발생시킬 수 있다.
*/
fun main(args: Array<String>) {
    Observable.range(1, 1000) // 1
           .map { MyItem(it) }                   // 2
           .observeOn(Schedulers.computation())  
           .subscribe({                          // 3
               println("Received $it")
               runBlocking { delay(50) }         // 4
           },{ it.printStackTrace() })
    runBlocking {  delay(6000) }                 // 5
}

플로어블로 구현

/* 
플로어블은 모든 아이템을 한번에 배출하지 않고 
컨슈머가 처리를 시작할 수 있을때 까지 기다렸다가 다시 배출을 전달하며,
완료될 때까지 이 동작을 반복한다.
*/
fun main(args: Array<String>) {
    Flowable.range(1, 1000) // 1
           .map { MyItem(it) }                   // 2
           .observeOn(Schedulers.computation())  
           .subscribe({                          // 3
               println("Received $it")
               runBlocking { delay(50) }         // 4
           },{ it.printStackTrace() })
    runBlocking {  delay(6000) }                 // 5
}

플로어블과 옵저버블 사용 구분

플로어블은 옵저버블 보다 느리기 때문에 다음과 같은 상황에 사용한다.

플로어블을 언제 사용할까?

  • 더 많은 양의 데이터를 처리할때 (원천에서 10,000개 이상 아이템 배출 시)
  • 파일이나 데이터베이스 읽기, 파싱시
  • 결과를 반환하는 동안 IO 소스의 양을 조절할 수 있는 블로킹을 지원하는 네트워크 IO 작업/스트리빙 API에서 배출할 때 사용

옵저버블을 언제 사용할까?

  • 소량의 데이터(1,000개 이하)를 다룰 때
  • 동기방식 또는 제한된 동시성 작업시
  • UI 이벤트 발생시킬때 (안드로이드, 자바FX, 스윙으로 작업하는 동안)

플로어블과 구독자(Subscriber)

플로어블은 옵저버 대신 백프레셔 호환이 가능한 구독자를 사용한다. 옵저버 대신 구독자를 사용하는 이유는 일부 추가 기능과 백프레셔를 동시에 지원하기 때문이다.

  • 얼마나 많은 아이템을 받는지 메시지로 전달가능
  • 구독자를 사용하는 동안은 업스트림에서 수신(요청) 하고자 하는 항목의 수를 지정가능
  • 항목의 수를 지정하지 않으면 어떤 배출도 수신하지 못함

이전 프로그램을 구독자(Subscriber) 로 변경한 소스이다.

  1. 주석 3까지 이전 프로그램과 거의 동일하다.
  2. 구독자(Subscriber) 인스턴스는 초기에 원하는 배출량을 함께 호출해야 한다.
  3. 주석 4의 request 메소드는 Subscriber가 호출되고 나서 업스트림에서 대기해야 하는 배출량을 요청한다. Subscriber가 더 요청할 때까지 요청 이후의 더 이상의 모든 배출을 무시한다.
fun main(args: Array<String>) {
    Flowable.range(1, 1000)                              // 1
            .map { MyItem5(it) }                         // 2
            .observeOn(Schedulers.io())
            .subscribe(object : Subscriber<MyItem5> {    // 3
                override fun onSubscribe(subscription: Subscription) {
                    subscription.request(Long.MAX_VALUE) // 4
                }

                override fun onNext(s: MyItem5?) {
                    runBlocking { delay(50) }
                    println("Subscriber received " + s!!)
                }

                override fun onError(e: Throwable) {
                    e.printStackTrace()
                }

                override fun onComplete() {
                    println("Done!")
                }
            })
    runBlocking { delay(60000) }
}

data class MyItem5 (val id:Int) {
    init {
        println("MyItem Created $id")
    }
}

 

request 메서드를 더 알아보기 위해서 프로그램을 더 수정해보자

  1. lateinit 변수를 선언하고 onSubscibe 호출 될때 subscription 을 할당했다.
  2. request 메소드로 5개의 항목을 요청했다. 
  3. 매번 onNext가 호출될 때마다 초기 request로 지정한 5번째 항목인지를 확인 후 request 함수로 2를 더 지정했다.
  4. 결과를 보면 Flowable이 range 의 모든 항목을 배출했음에도 7 이후에는 Subscriber 에게 전달되지 않았다.
fun main(args: Array<String>) {
    Flowable.range(1, 15)
            .map { MyItem6(it) }
            .observeOn(Schedulers.io())
            .subscribe(object : Subscriber<MyItem6> {
                lateinit var subscription: Subscription                // 1
                override fun onSubscribe(subscription: Subscription) {
                    this.subscription = subscription
                    subscription.request(5)                            // 2
                }

                override fun onNext(s: MyItem6?) {
                    runBlocking { delay(50) }
                    println("Subscriber received " + s!!)
                    if(s.id == 5) {                                    // 3
                        println("Requesting two more")
                        subscription.request(2)                        // 4
                    }
                }

                override fun onError(e: Throwable) {
                    e.printStackTrace()
                }

                override fun onComplete() {
                    println("Done!")
                }
            })
    runBlocking { delay(10000) }
}

data class MyItem6 (val id:Int) {
    init {
        println("MyItem Created $id")
    }
}

/*
결과 

MyItem Created 1
MyItem Created 2
MyItem Created 3
MyItem Created 4
MyItem Created 4
MyItem Created 5
MyItem Created 6
MyItem Created 7
MyItem Created 8
MyItem Created 9
MyItem Created 10
MyItem Created 11
MyItem Created 12
MyItem Created 13
MyItem Created 14
MyItem Created 15
Received MyItem6(id=1)
Received MyItem6(id=2)
Received MyItem6(id=3)
Received MyItem6(id=4)
Received MyItem6(id=5)
Requesting two more
Received MyItem6(id=6)
Received MyItem6(id=7)
*/

처음부터 플로어블 생성하기

Observable.create 메서드에 대해서 먼저 정리해보자.

  • 옵저버블에서도 아이템을 배출하는 자체 규칙을 작성할 수 있지만 백프레셔를 지원하지 않는다.
fun main(args: Array<String>) {
    val observer: Observer<Int> = object : Observer<Int> {
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: Int) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribe(d: Disposable) {
            println("New Subscription ")
        }
    }//Create Observer

    val observable: Observable<Int> = Observable.create<Int> {  // 1
        for(i in 1..10) {
            it.onNext(i)
        }
        it.onComplete()
    }

    observable.subscribe(observer)

}

/* 결과
New Subscription
Next 1
Next 2
Next 3
Next 4
Next 5
Next 6
Next 7
Next 8
Next 9
Next 10
All Completed
*/

이번엔 Flowable.create 를 보자.

  1. 구독자(Subscriber)의 인스턴스를 생성했다.
  2. create 메서드로 플로어블을 생성했다.
    • create 메서드로 플로어블 생성시 다음 인자 BackpressureStrategy.BUFFER 를 주었다.
  3. 플로어블에 구독자(Subscriber)를 구독했다.
fun main(args: Array<String>) {
    val subscriber: Subscriber<Int> = object : Subscriber<Int> {
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: Int) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribe(subscription: Subscription) {
            println("New Subscription ")
            subscription.request(10)
        }
    }                                                      // 1

    val flowable: Flowable<Int> = Flowable.create<Int> ({
        for(i in 1..10) {
            it.onNext(i)
        }
        it.onComplete()
    },BackpressureStrategy.BUFFER)                         // 2

    flowable
            .observeOn(Schedulers.io())
            .subscribe(subscriber)                         // 3

    runBlocking { delay(10000) }

}

플로어블 create 의 정의

  • source : 배출의 원천이 되는 곳
  • mode : 백프레셔 전략(BackpressureStrategy)
    • BackpressureStrategy.MISSING : 이 전략은 백프레셔 구현을 사용하지 않으며 다운스트림이 스스로 오버플로우를 처리해야 한다. 이 옵션은 onBackpressureXXX() 연산자를 사용할때 유용하다.
    • BackpressureStrategy.ERROR : 이 전략은 어떤 백프레셔로도 구현하지 않는데 다운스트림이 소스를 따라 잡을 수 없는경우, MissingBackpressureException 예외를 발생시킨다.
    • BackpressureStrategy.BUFFER : 이 잔력안 다운스트림이 배출을 소비할 수 있게 될 때까지 제한이 없는 버퍼에 저장한다. 버퍼 크기를 넘어서는 경우 OutOfMemoryError가 발생할 수 있다.
    • BackpressureStrategy.DROP : 이 전략은 다운스트림이 바쁘고 소비 속도를 계속 유지할 수 없을 때 모든 배출량을 무시한다. 다운스트림이 이전 작업을 끝내고 나서 처음으로 배출된 것을 처리하고 그 사이의 값들은 모두 생략된다. 
      • 소스가 5개의 값 1, 2, 3, 4, 5를 가각 내보내고 있는 상황이다.
      • 다운스트림은 1을 처리하느라 바쁜데 소스는 2, 3, 4를 배출한상태이다.
      • 5가 배출되기 전에 다운스트림이 처리할 준비가 됬다면 5를 제외한 나머지 값은 무시된다.
    • BackpressureStrategy.LATEST : 이 전략은 다운스트림이 바쁘고 배출을 유지할 수 없는 경우 최신 배출량만을 유지하고 나머지는 모두 무시한다. 다운스트림이 이전작업을 마치면 작업이 끝나기 직전에 마지막으로 배출된 것을 수신한다.
      • 소스가 5개의 값 1, 2, 3, 4, 5를 각각 내보내고 있다.
      • 다운스트림이 1을 수신한 후에 처리 중인데, 소스는 2, 3, 4를 배출했고 5가 배출되기 바로 전에 처리할 준비가 됬다.
      • 다운스트림은 4와 5 두개의 값을 받게 된다. 하지만 4를 수신한 후에 바빠진다면 5를 수신할 수 없게 된다.
fun <T> create(source: FlowableOnSubscribe<T>, mode:BackPressureStrategy): Flowable<T> {
...
}

옵저버블로 플로어블 만들기

  • 주석 2  BUFFER  백프레셔 전략을 사용하면 다운 스트림이 소비될때까지 버퍼링 하기 때문에 모든 배출량을 처리 할 수 있다.
  • 주석 2-1 ERROR 백프레셔 전략을 사용하면 다운스트림이 업스트림을 따라갈 수 없기 때문에 오류가 발생한다.
  • 주석 2-2 DROP 백프레셔 전략을 사용하면 플로어블이 128 이후에 출력되지 않는다.
fun main(args: Array<String>) {
    val source = Observable.range(1, 1000)          // 1
    source.toFlowable(BackpressureStrategy.BUFFER)  // 2
    //source.toFlowable(BackpressureStrategy.ERROR)  // 2-1
    //source.toFlowable(BackpressureStrategy.DROP)  // 2-2
            .map { MyItem(it) }
            .observeOn(Schedulers.io())
            .subscribe {                            // 3
                print("Rec. $it;\t")
                runBlocking { delay(600) }
            }
    runBlocking { delay(700000) }
}

data class MyItem (val id:Int) {
    init {
        print("MyItem init $id;\t")
    }
}

BackpressureStrategy.MISSING와 onBackpressureXXX()

BackpressureStrategy.MISSING은 backpressure 전략을 구현 하지 않으므로 플로어블에 어떤 전략을 따를지 명시적으로 알려줄 필요가 있음을 의미한다. onBackpressureXXX() 연산자를 함께 사용하면 동일한 결과를 얻을 수 있으며 몇가지 추가 옵션이 제공된다.

onBackpressureBuffer() 

  • BackpressureStrategy.BUFFER의 용도로 사용된다.
  • 버퍼 크기, 크기 제한 여부와 같은 추가 구성 옵션을 얻을 수 있다.
  • 주석 2-1 에서와 같이 버퍼크기를 설정할 수 있다. (여기서는 20으로 설정, 버퍼오버플로우가 발생하는데 onError 에서 핸들링 가능하다.) 

onBackpressureDrop()

  • 이 연산자는 컨슈머 인스턴스에 전달되는 구성 옵션을 제공하는데, 주석 2-2 에는 해당 구성에 람다를 넘겨줬다.  주석 2-2에는 거부된 배출이 출력된다.
  • 버퍼의 기본 크기가 128이기 때문에 플로어블은 128 이후에 배출을 처리하지 못하고 있다.
  • onBackpressureDrop의 컨슈머 인스턴스는 Subscriber 인스턴스가 시작되기도 전에 처리를 완료했다.

onBackpressureLatest()

  • BackpressureStrategy.LATEST와 똑같은 방식으로 동작하는데, 해당 연산자는 추가 구성 옵션이 없다.
  • 보이는 대로 플로어블은 128 이후에 모든 배출을 무시했지만, 마지막 배출인 1000만 유지 됬다.
fun main(args: Array<String>) {
    val source = Observable.range(1, 1000)
    source.toFlowable(BackpressureStrategy.MISSING)             // 1
            .onBackpressureBuffer()                             // 2
            // .onBackpressureBuffer(20)                        // 2-1
            // .onBackpressureDrop { print("Dropped $it;\t") }  // 2-2
            // .onBackpressureLatest()                          // 2-3
            .map { MyItem11(it) }
            .observeOn(Schedulers.io())
            .subscribe{
                println(it)
                runBlocking { delay(100) }
            }
    runBlocking { delay(600000) }
}

data class MyItem11 (val id:Int) {
    init {
        println("MyItem Created $id")
    }
}

소스에서 백프레셔를 지원하는 플로어블 생성

백프레셔는 다운스트림에서 처리 할 뿐만 아니라 소스에서도 처리가 가능하다.

  1. Flowable.generate() 메서드로 플로어블을 생성했다.
    • Flowable.create() 와 Flowable.generate() 의 차이점
      • create : 플로어블이 항목을 내보내고 구독자(Subscriber)는  수신/대기/버퍼링/삭제
      • generate : 요청 시 아이템을 생성하고 이를 배출함. source로써 람다를 허용하고 create 와 달리 아이템을 요청할 때마다 이를 호출한다.
        • 람다 안에서 onComplete 메서드를 호출하면 플로어블은 한번만 호출
        • 람다 안에서 onNext를 여러번 호출 할 수 없다.
        • 람다 안에서 onError를 호출하면 첫 번째 호출에서 오류가 발생한다.
  2. GenerateFlowableItem 객체를 생성했다. 사용자 정의 게터를 사용해 var item 에 접근할 때마다 자동으로 값이 증가된다. 
  3. 출력에서 플로어블은 첫번 째로 128개의 항목을 배출한 다음, 다운스트림이 96개 아이템을 처리하기 위해 기다린 후 다시 Flowable이 128개의 아이템을 배출하는 주기가 계속 된다. 플로어블에서 가입 해지 하거나 프로그램 실행이 중지될 때까지 항목을 계속 배출한다.
fun main(args: Array<String>) {
    val flowable = Flowable.generate<Int> {
        it.onNext(GenerateFlowableItem.item)
    }                                               // 1

    flowable
            .map { MyItemFlowable(it) }
            .observeOn(Schedulers.io())
            .subscribe {
                runBlocking { delay(100) }
                println("Next $it")
            }                                       // 2

    runBlocking { delay(700000) }
}

data class MyItemFlowable(val id:Int) {
    init {
        println("MyItemFlowable Created $id")
    }
}

object GenerateFlowableItem {
    var item:Int = 0                                 // 3
        get() {
            field+=1
            return field                             // 4
        }
}

ConnectableFlowable

옵저버블의 모든 유형은 플로어블에 대응한다. 즉 ConnectableObservable 이 있으면 ConnectableFlowable이 있다는 말이다. 

 

앞에서 본 ConnectableObservable의 첫번 째 예제를 수정했다. 

Observable과 마찬가지로 Flowable.fromIterable() 대신 Iterable<T>.toFlowable() 확장함수를 사용할 있다.

  1. Iterable<T>.toFlowable() 확장함수를 사용해 List 에서 플로어블로 만든다.
  2. Flowable.publish() 연산자를 사용해 플로어블에서 ConnectableFlowable을 생성했다.
  3. Iterable<T>.toFlowable() 은 내부적으로 Flowable.fromIterable을 호출한다.
fun main(args: Array<String>) {
    val connectableFlowable = listOf("String 1","String 2","String 3","String 4","String 5")
            .toFlowable()                        // 1
            .publish()                           // 2
    connectableFlowable.
            subscribe({
                println("Subscription 1: $it")
                runBlocking { delay(1000) }
                println("Subscription 1 delay")
            })
    connectableFlowable
            .subscribe({ println("Subscription 2 $it")})
    connectableFlowable.connect()
}

 프로세서

프로세서(Processor)는 플로어블의 Subjects에 해당한다. Subjects 의 모든 유형은 프로세서에 대응한다.

다음은 PublishSubject 의 대응인 PublishProcessor 의 예제이다.

  • 프로세서는 구독자가 모두 완료될 때까지 다음 푸시를 대기하고 있다.
fun main(args: Array<String>) {
    val flowable = listOf("String 1","String 2","String 3","String 4","String 5").toFlowable() // 1

    val processor = PublishProcessor.create<String>()  // 2

    processor.                                         // 3
            subscribe({
                println("Subscription 1: $it")
                runBlocking { delay(1000) }
                println("Subscription 1 delay")
            })
    processor                                           // 4
            .subscribe({ println("Subscription 2 $it")})

    flowable.subscribe(processor)                       // 5

}

버퍼, 스로틀링, 윈도우 연산자

 지금까지 백프레셔를 알아봤다. 원천의 속도를 늦추거나 아이템을 생략하거나, 버퍼를 사용했다. 이는 모두 컨슈머가 소비할 때까지 아이템의 배출을 미룬다. 그러나 이것으로 충분한가? 다운스트림에서 백프레셔를 처리하는 것은 항상 좋은 해결책은 아니지만 그렇다고 항상 소스의 배출을 느리게 만들 수도 없다.

 

 Observable.interval / Flowable.interval 을 사용하면 소스의 배출 속도를 느리게 할 수 없다. 정지 간격은 아이템을 동시에 처리할 수 있게 해주는 일부 연산자가 될 수 있다.

 

도움이 되는 세가지 연산자가 있다.

  • buffer
  • throttle
  • window

buffer

컨슈머가 소비할 때까지 배출을 버퍼링하는 onBackPressureBuffer() 연산자와는 달리

buffer() 연산자는 배출을 모아서 리스트나 다른 컬렉션 유형으로 전달한다.

 

 

fun main(args: Array<String>) {
    val flowable = Flowable.range(1,111)  // 1
    flowable.buffer(10)
            .subscribe { println(it) }
}

/* 결과
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
[101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
[111]
*/

다음은 skip 의 설정 옵션이다. 

  1. 첫 번째 구독에 count 10, skip 15 버퍼를 사용했다.
  2. 두번째 구독에 count 15, skip 8 버퍼를 사용했다. 
  3. 첫번째 구독의 경우 구독당 5개 항목을 건너뛴다.(15-10)
  4. 두번째 구독의 경우 각 배출에서 8번째 항목 (15-7)부터 반복을 시작한다.
fun main(args: Array<String>) {
    val flowable = Flowable.range(1,111)
    flowable.buffer(10,15)    // 1
            .subscribe { println("Subscription 1 $it") }

    flowable.buffer(15,7)     // 2
            .subscribe { println("Subscription 2 $it") }
}
/* 결과
Subscription 1 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Subscription 1 [16, 17, 18, 19, 20, 21, 22, 23, 24, 25]
Subscription 1 [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
....
Subscription 1 [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
Subscription 1 [106, 107, 108, 109, 110, 111]
Subscription 2 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
Subscription 2 [8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
Subscription 2 [15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
....
Subscription 2 [99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111]
Subscription 2 [106, 107, 108, 109, 110, 111]
*/

 

  1. Flowable.interval 을 사용해 플로어블 인스턴스를 생성했다.
  2. 오버로드된 buffer(timestamp: Long, unit: TimeUnit)를 사용해 연산자가 모든 배출을 잠시동안 버퍼링 하고 목록으로 배출하도록 했다.
  3. 결과에서 각 배출에는 10개의 아이템이 포함되 있는데, interval이 100밀리초마다 하나씩 배출하고 버퍼가 두번째 시간 프레임 내에서 배출을 수집하기 때문이다.
fun main(args: Array<String>) {
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS) // 1
    flowable.buffer(1,TimeUnit.SECONDS)                          // 2
            .subscribe { println(it) }

    runBlocking { delay(5, TimeUnit.SECONDS) }                   // 3
}

/* 결과
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[11, 12, 13, 14, 15, 16, 17, 18, 19]
[21, 22, 23, 24, 25, 26, 27, 28, 29]
[31, 32, 33, 34, 35, 36, 37, 38, 39]
[41, 42, 43, 44, 45, 46, 47, 48, 49]
*/

이번에는 다른 생산자를 경계로 취하는 예제이다.

  • 버퍼 연산자는 인접해 있는 생산자의 사이에서 모든 배출물을 모으고 각 생산자의 리스트로 배출한다.
fun main(args: Array<String>) {
    val boundaryFlowable = Flowable.interval(350, TimeUnit.MILLISECONDS)

    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)      // 1
    flowable.buffer(boundaryFlowable)                                 // 2
            .subscribe { println(it) }

    runBlocking { delay(5, TimeUnit.SECONDS) }                        // 3

}

/* 결과
[0, 1, 2]
[3, 4, 5]
[6, 7, 8, 9]
[10, 11, 12, 13]
[14, 15, 16
[17, 18, 19]
[20, 21, 22, 23]
[24, 25, 26, 27]
[28, 29, 30]
[31, 32, 33, 34]
[35, 36, 37]
[38, 39, 40, 41]
[42, 43, 44]
[45, 46, 47, 48]
*/

window

컬렉션 형태가 아닌 프로듀서 형태로 버퍼링을 한다.

  • 새로운 플로어블 인스턴스로 10개의 배출을 버퍼링 한다. 
  • 이 인스턴스는 flowable.subscribe람다에서 다시 구독 하고 쉼표를 접미사로 덧붙여 인쇄한다.
fun main(args: Array<String>) {
    val flowable = Flowable.range(1,111)//(1)
    flowable.window(10)
            .subscribe {
                flo->flo.subscribe {
                    print("$it, ")
                }
                println()
            }
}

/* 결과
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
61, 62, 63, 64, 65, 66, 67, 68, 69, 70,
71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
81, 82, 83, 84, 85, 86, 87, 88, 89, 90,
91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
101, 102, 103, 104, 105, 106, 107, 108, 109, 110,
111,
*/

throttle

위 두 연산자는 배출을 수집하는 반면 throttle 연산자는 배출을 생략한다. 

  • throttleFirst는 200밀리초마다 발생하는 첫 번째 배출을 생략한다.
  • throttleLast, throttleWithTimeout 연산자도 있다.
fun main(args: Array<String>) {
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)//(1)
    flowable.throttleFirst(200,TimeUnit.MILLISECONDS)//(2)
            .subscribe { println(it) }

    runBlocking { delay(1,TimeUnit.SECONDS) }
}

/* 결과
0
3
5
7
*/