RxJava의 Observer, Subject, Disposable, Subscriber에 대해 알아봅니다.

Observer

Lambda로 Subscribe

observable.subscribeBy(
    onNext = { i -> println("Next: $i") },
    onComplete = { println("Complete") },
    onError = { throwable -> println("Error: $throwable") }
)

Observer 객체

val observer = object : Observer<Any> {
    override fun onComplete() {
        println("All Completed")
    }

    override fun onNext(item: Any) {
        println("Next $item")
    }

    override fun onError(e: Throwable) {
        println("Error Occurred $e")
    }

    override fun onSubscribe(d: Disposable) {
        println("Subscribed to $d")
    }
}

observable.subscribe(observer)

Maybe Observer

세 가지 중 하나만 호출됩니다:

  • onComplete: 에러 없이 값도 없음
  • onError: 에러 발생
  • onSuccess: 값이 있음
val maybeEmpty = Maybe.empty<Int>()
maybeEmpty.subscribeBy(
    onComplete = { println("Completed Empty") },
    onError = { println("Error $it") },
    onSuccess = { println("Completed with value $it") }
)

Disposable

구독을 관리하고 해제하는 객체입니다.

interface Disposable {
    fun dispose()
    val isDisposed: Boolean
}

사용 예시

val disposable = observable.subscribe { println(it) }

// 구독 해제
disposable.dispose()

CompositeDisposable

여러 구독을 한 번에 관리:

val compositeDisposable = CompositeDisposable()

compositeDisposable.add(observable1.subscribe { ... })
compositeDisposable.add(observable2.subscribe { ... })

// 모든 구독 해제
compositeDisposable.dispose()

Subscription

Observable과 Subscriber를 연결합니다.

// subscribe 호출 시 subscription/disposable 반환
val subscription = observable.subscribe()

// 중단 시
subscription.unsubscribe()  // RxJava 1.x
// 또는
disposable.dispose()  // RxJava 2.x

// 여러 구독 관리
compositeSubscription.unsubscribe()

Subscriber (Flowable용)

Flowable과 함께 사용하며 backpressure를 제어합니다.

Flowable.range(1, 15)
    .map { MyItem(it) }
    .observeOn(Schedulers.io())
    .subscribe(object : Subscriber<MyItem> {
        lateinit var subscription: Subscription

        override fun onSubscribe(subscription: Subscription) {
            this.subscription = subscription
            subscription.request(5)  // 5개만 요청
        }

        override fun onNext(s: MyItem?) {
            runBlocking { delay(50) }
            println("Subscriber received $s")
            if (s?.id == 5) {
                println("Requesting two more")
                subscription.request(2)  // 추가 2개 요청
            }
        }

        override fun onError(e: Throwable) {
            e.printStackTrace()
        }

        override fun onComplete() {
            println("Done!")
        }
    })

모든 데이터를 받으려면:

subscription.request(Long.MAX_VALUE)

Subject

Subject는 Observable과 Observer 역할을 동시에 수행합니다.

  • Cold Observable을 subscribe하고 Hot Observable로 emit
  • 여러 Observer에게 동시에 데이터 전달

AsyncSubject

마지막 값만 emit하고 complete합니다.

val subject = AsyncSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)

subject.subscribe({
    println("Received $it")  // 4만 출력
}, {
    it.printStackTrace()
}, {
    println("Complete")
})

subject.onComplete()  // 반드시 호출해야 값 전달

Use Case: AsyncTask와 유사하게 백그라운드 작업 후 완료 값 전달

BehaviorSubject

가장 최근 값을 새 구독자에게 전달합니다.

val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)

subject.subscribe { println("Observer 1: $it") }  // 2 출력

subject.onNext(3)  // 둘 다 3 출력

subject.subscribe { println("Observer 2: $it") }  // 3 출력

Use Case: 위치 정보 - 새 구독자가 즉시 현재 위치를 받아야 할 때

PublishSubject

구독 시점부터의 값만 전달합니다.

val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()

observable.subscribe(subject)

subject.subscribe { println("Subscription 1: $it") }

runBlocking { delay(1100) }

// 이 구독자는 11부터 시작 (1100ms 후)
subject.subscribe { println("Subscription 2: $it") }

runBlocking { delay(1100) }

Use Case: Listener - 이전 이벤트는 필요 없고 현재부터의 이벤트만 필요할 때

ReplaySubject

모든 값을 버퍼링하여 새 구독자에게 전체 히스토리를 전달합니다.

val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

subject.subscribe { println("Observer 1: $it") }  // 1, 2, 3 모두 출력

subject.onNext(4)

subject.subscribe { println("Observer 2: $it") }  // 1, 2, 3, 4 모두 출력

Processor

Flowable용 Subject입니다.

PublishProcessor

val flowable = listOf("String 1", "String 2", "String 3").toFlowable()
val processor = PublishProcessor.create<String>()

processor.subscribe({
    println("Subscription 1: $it")
    runBlocking { delay(1000) }
    println("Subscription 1 delay")
})

processor.subscribe { println("Subscription 2: $it") }

flowable.subscribe(processor)