RxJava Operators Complete Guide
RxJava의 다양한 연산자들을 상세히 알아봅니다.
Transforming Operators
map
데이터를 변환합니다.
Observable.range(1, 5)
.map { it * 2 }
.subscribe { println(it) } // 2, 4, 6, 8, 10
cast
타입을 캐스팅합니다.
list.toObservable()
.cast(MyItem::class.java)
.subscribe { println(it) }
flatMap
Observable을 반환하며, 순서를 보장하지 않습니다 (merge 사용).
val observable = listOf(10, 9, 8, 7, 6).toObservable()
observable.flatMap { number ->
Observable.just("Transforming Int to String $number")
}.subscribe { item ->
println("Received $item")
}
concatMap
flatMap과 같지만 순서를 보장합니다.
Observable.range(1, 10)
.concatMap {
val randDelay = Random().nextInt(10)
Observable.just(it)
.delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
}
.blockingSubscribe { println("Received $it") }
switchMap
새로운 emit이 발생하면 이전 작업을 취소합니다.
Observable.range(1, 10)
.switchMap {
val randDelay = Random().nextInt(10)
if (it % 3 == 0)
Observable.just(it)
else
Observable.just(it).delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
}
.blockingSubscribe { println("Received $it") }
defaultIfEmpty
비어있으면 기본값을 반환합니다.
Observable.range(0, 10)
.filter { it > 15 }
.defaultIfEmpty(15)
.subscribe { println("Received $it") } // 15
switchIfEmpty
비어있으면 다른 Observable로 전환합니다.
Observable.range(0, 10)
.filter { it > 15 }
.switchIfEmpty(Observable.range(11, 10))
.subscribe { println("Received $it") }
sorted
정렬합니다 (x - y는 오름차순).
listOf(2, 6, 7, 1, 3, 4, 5)
.toObservable()
.sorted()
.subscribe { println("Received $it") }
// Custom 정렬
listOf(2, 6, 7, 1, 3, 4, 5)
.toObservable()
.sorted { item1, item2 -> if (item1 > item2) -1 else 1 } // 내림차순
.subscribe { println("Received $it") }
scan
이전 값과 현재 값을 사용해 누적 계산합니다.
Observable.range(1, 10)
.scan { previousAccumulation, newEmission ->
previousAccumulation + newEmission
}
.subscribe { println("Received $it") }
// 1, 3, 6, 10, 15, 21, 28, 36, 45, 55
Filter Operators
debounce
딜레이 동안 새로운 emit이 없을 때만 처리합니다.
createObservable()
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
distinct
중복을 필터링합니다.
listOf(1, 2, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9, 3, 10)
.toObservable()
.distinct()
.subscribe { println("Received $it") }
distinctUntilChanged
연속된 중복만 필터링합니다.
listOf(1, 2, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9, 3, 10)
.toObservable()
.distinctUntilChanged()
.subscribe { println("Received $it") }
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 3, 10 (마지막 3은 출력됨)
elementAt
n번째 요소를 가져옵니다.
val observable = listOf(10, 1, 2, 5, 8, 6, 9).toObservable()
observable.elementAt(5).subscribe { println("Received $it") } // 6
filter
조건에 맞는 것만 통과시킵니다.
Observable.range(1, 20)
.filter { it % 2 == 0 }
.subscribe { println("Received $it") }
first / last
첫 번째/마지막 요소를 가져옵니다.
val observable = Observable.range(1, 10)
observable.first(2).subscribeBy { println("Received $it") } // 1
observable.last(2).subscribeBy { println("Received $it") } // 10
// 비어있을 때 기본값 사용
Observable.empty<Int>().first(2).subscribeBy { println("Received $it") } // 2
ignoreElements
모든 요소를 무시하고 Completable을 반환합니다.
val observable = Observable.range(1, 10)
observable.ignoreElements()
.subscribe { println("Completed") }
take / takeLast
처음/마지막 n개를 가져옵니다.
Observable.range(1, 10)
.take(3)
.subscribe { println(it) } // 1, 2, 3
skip / skipLast
처음/마지막 n개를 건너뜁니다.
Observable.range(1, 20)
.skip(5)
.subscribe { println(it) } // 6~20
// 시간 기반
Observable.interval(100, TimeUnit.MILLISECONDS)
.skip(400, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
skipWhile
조건이 true인 동안 건너뜁니다 (첫 false 이후는 모두 통과).
Observable.range(1, 20)
.skipWhile { it < 10 }
.subscribe { println(it) } // 10~20
skipUntil
다른 Observable이 emit할 때까지 건너뜁니다.
val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
val observable2 = Observable.timer(500, TimeUnit.MILLISECONDS)
observable1.skipUntil(observable2)
.subscribe { println(it) } // 500ms 이후부터 출력
Combining Operators
startWith
맨 앞에 요소를 추가합니다.
Observable.range(0, 10)
.startWith(-1)
.subscribe { println("Received $it") }
// 리스트나 Observable도 가능
Observable.range(5, 10)
.startWith(listOf(1, 2, 3, 4))
.subscribe { println("Received $it") }
merge
순서 없이 합칩니다 (emit되는 대로).
val observable1 = listOf("Kotlin", "Scala", "Groovy").toObservable()
val observable2 = listOf("Python", "Java", "C++").toObservable()
Observable.merge(observable1, observable2)
.subscribe { println("Received $it") }
// mergeArray, mergeWith도 사용 가능
concat
순서를 유지하며 합칩니다.
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
.take(2)
.map { "Observable 1 $it" }
val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { "Observable 2 $it" }
Observable.concat(observable1, observable2)
.subscribe { println("Received $it") }
amb (Ambiguous)
먼저 emit하는 Observable만 사용합니다.
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
.map { "Observable 1 $it" }
val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { "Observable 2 $it" }
Observable.amb(listOf(observable1, observable2))
.subscribe { println("Received $it") } // Observable 2만 출력
zip
같은 인덱스의 요소들을 결합합니다 (최대 9개).
val observable1 = Observable.range(1, 10)
val observable2 = Observable.range(11, 10)
Observable.zip(observable1, observable2,
BiFunction<Int, Int, Int> { e1, e2 -> e1 + e2 }
).subscribe { println("Received $it") }
zipWith
val observable1 = Observable.range(1, 10)
val observable2 = listOf("String 1", "String 2", ...).toObservable()
observable1.zipWith(observable2) { e1: Int, e2: String -> "$e2 $e1" }
.subscribe { println("Received $it") }
combineLatest
한쪽이 emit하면 다른 쪽의 최신 값과 결합합니다.
val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(250, TimeUnit.MILLISECONDS)
Observable.combineLatest(observable1, observable2,
BiFunction { t1: Long, t2: Long -> "t1: $t1, t2: $t2" }
).subscribe { println("Received $it") }
Reducing Operators
count
개수를 반환합니다.
listOf(1, 5, 9, 7, 6, 4, 3, 2).toObservable()
.count()
.subscribeBy { println("count $it") }
reduce
누적 계산 후 최종 값만 반환합니다.
Observable.range(1, 10)
.reduce { prev, curr -> prev + curr }
.subscribeBy { println("accumulation $it") } // 55
Buffer and Window Operators
buffer
요소들을 그룹으로 묶습니다.
val flowable = Flowable.range(1, 111)
flowable.buffer(10)
.subscribe { println(it) }
// [1..10], [11..20], ...
시간 기반:
Flowable.interval(100, TimeUnit.MILLISECONDS)
.buffer(1, TimeUnit.SECONDS)
.subscribe { println(it) }
window
buffer와 같지만 Observable을 반환합니다.
Flowable.range(1, 111)
.window(10)
.subscribe { flo ->
flo.subscribe { print("$it, ") }
println()
}
Grouping Operators
groupBy
키를 기준으로 그룹화합니다.
Observable.range(1, 30)
.groupBy { it % 5 }
.blockingSubscribe {
println("Key ${it.key}")
it.subscribe { println("Received $it") }
}
Throttle Operators
throttleFirst / throttleLast
일정 시간 동안 첫/마지막 emit만 통과시킵니다.
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleFirst(200, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
Comments