/*
 * Copyright 2019 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package im.vector.matrix.rx

import androidx.lifecycle.LiveData
import androidx.lifecycle.Observer
import io.reactivex.Observable
import io.reactivex.android.MainThreadDisposable

/**
 * Adapts a [LiveData] to an RxJava [Observable].
 *
 * Each subscription registers its own observer on [liveData] with `observeForever` and
 * removes it again when the subscription is disposed.
 *
 * @param liveData the source whose values are forwarded downstream.
 * @param valueIfNull substitute emitted when [liveData] delivers `null`; when this is also
 * `null`, a `null` value terminates the stream with a [NullPointerException].
 */
private class LiveDataObservable<T>(
        private val liveData: LiveData<T>,
        private val valueIfNull: T? = null
) : Observable<T>() {

    /**
     * Creates the relay for this subscriber, hands it downstream as the subscription's
     * disposable, then starts observing [liveData].
     *
     * NOTE(review): `observeForever` is expected to be called on the main thread — confirm
     * that callers subscribe there.
     */
    override fun subscribeActual(observer: io.reactivex.Observer<in T>) {
        val relay = RemoveObserverInMainThread(observer)
        // onSubscribe goes first so downstream already holds the disposable when the
        // first value is delivered.
        observer.onSubscribe(relay)
        liveData.observeForever(relay)
    }

    /**
     * Bridges LiveData callbacks to the downstream Rx [observer] and doubles as the
     * subscription's disposable: disposing it unregisters it from [liveData].
     */
    private inner class RemoveObserverInMainThread(
            private val observer: io.reactivex.Observer<in T>
    ) : MainThreadDisposable(), Observer<T> {

        /**
         * Forwards [t] downstream, falling back to [valueIfNull] when [t] is `null`.
         * Signals `onError` and tears the subscription down when neither is available.
         */
        override fun onChanged(t: T?) {
            if (isDisposed) return

            val value = t ?: valueIfNull
            if (value != null) {
                observer.onNext(value)
            } else {
                observer.onError(NullPointerException(
                        "convert liveData value t to RxJava onNext(t), t cannot be null"))
                // onError is terminal: dispose so this relay is unregistered from the
                // LiveData and can never emit onNext after the error.
                dispose()
            }
        }

        /** Unregisters this relay from [liveData]. */
        override fun onDispose() {
            liveData.removeObserver(this)
        }
    }
}

/**
 * Exposes this [LiveData] as an RxJava [Observable].
 *
 * @param defaultValue value emitted in place of a `null` LiveData value; when it is `null`
 * (the default), a `null` LiveData value terminates the stream with a [NullPointerException].
 * @return an [Observable] that observes this LiveData for as long as it is subscribed.
 */
fun <T> LiveData<T>.asObservable(defaultValue: T? = null): Observable<T> {
    return LiveDataObservable(this, defaultValue)
}