RxJava Concurrency and Flowable - Threading and Backpressure
RxJava의 스레딩과 Flowable을 사용한 Backpressure 처리를 알아봅니다.
Schedulers
기본 Schedulers
// Unbounded worker thread pool, 재사용
Schedulers.io()
// CPU 코어 수만큼 제한된 스레드
Schedulers.computation()
// 항상 새 스레드 생성
Schedulers.newThread()
// 단일 스레드
Schedulers.single()
// 호출 스레드 (기본값)
Schedulers.trampoline()
// Custom Executor
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)
subscribeOn vs observeOn
Observable.range(1, 10)
.subscribeOn(Schedulers.computation()) // emit 스레드
.observeOn(Schedulers.io()) // 이후 연산 스레드
.subscribe { println(it) }
- subscribeOn: 전체 subscription(emit 포함)에 사용할 스레드
- observeOn: observeOn 이후의 연산에 사용할 스레드
Android Main Thread
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { updateUI(it) }
Custom Scheduler 예시
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)
Observable.range(1, 10)
.subscribeOn(scheduler)
.subscribe {
runBlocking { delay(200) }
println("Observable1 Item Received $it - ${Thread.currentThread().name}")
}
Observable.range(21, 10)
.subscribeOn(scheduler)
.subscribe {
runBlocking { delay(100) }
println("Observable2 Item Received $it - ${Thread.currentThread().name}")
}
Flowable
Observable의 대안으로, OutOfMemory를 방지합니다.
특징
- 128개의 요소를 emit하고 consumer가 처리할 때까지 대기
- Observable보다 느리지만 메모리 안전
- 사용 시점: 10,000개 이상의 아이템 & 비동기 처리
Backpressure Strategy
버퍼가 가득 찼을 때의 처리 전략:
// BUFFER (기본): 버퍼에 쌓고 기다림
BackpressureStrategy.BUFFER
// ERROR: MissingBackpressureException 발생
BackpressureStrategy.ERROR
// DROP: 버퍼 초과분 버림
BackpressureStrategy.DROP
// LATEST: DROP과 같지만 마지막 값은 보존
BackpressureStrategy.LATEST
// MISSING: 커스터마이징
BackpressureStrategy.MISSING
Flowable 생성
range
Flowable.range(1, 1000)
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe({
println("Received $it")
runBlocking { delay(50) }
}, { it.printStackTrace() })
create
val flowable = Flowable.create<Int>({
for (i in 1..10) {
it.onNext(i)
}
it.onComplete()
}, BackpressureStrategy.BUFFER)
flowable.observeOn(Schedulers.io())
.subscribe(subscriber)
Observable to Flowable
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.BUFFER)
.subscribe { println(it) }
Backpressure 커스터마이징
onBackpressureBuffer
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureBuffer() // BUFFER와 동일
.subscribe { println(it) }
// 용량 제한
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureBuffer(20) // 20개 초과시 ERROR
.subscribe { println(it) }
onBackpressureDrop
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureDrop { println("Dropped $it") }
.subscribe { println("Received $it") }
onBackpressureLatest
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureLatest()
.subscribe { println("Received $it") }
generate
람다를 반복 실행하며 버퍼 backpressure 적용:
object GenerateFlowableItem {
var item: Int = 0
get() {
field += 1
return field
}
}
val flowable = Flowable.generate<Int> {
it.onNext(GenerateFlowableItem.item)
}
flowable.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
runBlocking { delay(100) }
println("Next $it")
}
ConnectableFlowable (Hot Flowable)
val connectableFlowable = listOf("String 1", "String 2", "String 3")
.toFlowable()
.publish()
connectableFlowable.subscribe({
println("Subscription 1: $it")
runBlocking { delay(1000) }
println("Subscription 1 delay")
})
connectableFlowable.subscribe { println("Subscription 2: $it") }
connectableFlowable.connect()
Error Handling
기본 에러 처리
Observable.just(1, 2, 3, 5, 6, 7, "Error", 8, 9, 10)
.map { it.toIntOrError() }
.subscribeBy(
onNext = { println("Next $it") },
onError = { println("Error $it") }
)
onErrorReturn
에러 발생 시 대체 값 반환 후 complete:
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.onErrorReturn { -1 } // 에러 시 -1 반환
.subscribe { println("Received $it") }
onErrorResumeNext
에러 발생 시 다른 Observable로 전환:
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.onErrorResumeNext(Observable.range(10, 5))
.subscribe { println("Received $it") }
retry
에러 발생 시 처음부터 재시도:
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.retry(3) // 3번 재시도
.subscribeBy(
onNext = { println("Received $it") },
onError = { println("Error") }
)
// Predicate 사용
var retryCount = 0
Observable.just(1, 2, 3, 4, 5)
.map { it / (3 - it) }
.retry { _, _ -> (++retryCount) < 3 }
.subscribeBy(
onNext = { println("Received $it") },
onError = { println("Error") }
)
Resource Management
리소스 생성, 사용, 해제를 관리합니다:
Observable.using({
// 리소스 생성 (예: Cursor)
Resource()
}, { resource: Resource ->
// 데이터 가져오기
Observable.just(resource)
}, { resource: Resource ->
// 리소스 해제
resource.close()
}).subscribe {
println("Resource Data ${it.data}")
}
Warning
Main Activity에서 create할 경우, Observable이 반환되기 전까지 Main Activity가 반환되지 않아 메모리 릭이 발생할 수 있습니다. subscribe() 호출 시 다른 스레드에서 emit 작업 중에는 반환되지 않는 문제가 있습니다.
Comments