From b49280e347d857c7f1dd17805f601ed2766e650e Mon Sep 17 00:00:00 2001 From: arkon Date: Sat, 18 Feb 2023 10:16:05 -0500 Subject: [PATCH] Remove unused Rx/Coroutines converters --- .../eu/kanade/core/util/RxJavaExtensions.kt | 61 ------------------- .../tachiyomi/data/download/Downloader.kt | 4 +- .../tachiyomi/util/lang/RxExtensions.kt | 6 -- .../core/util/lang/RxCoroutineBridge.kt | 35 +---------- 4 files changed, 4 insertions(+), 102 deletions(-) delete mode 100644 app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt delete mode 100644 app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt diff --git a/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt b/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt deleted file mode 100644 index b54fa63ab..000000000 --- a/app/src/main/java/eu/kanade/core/util/RxJavaExtensions.kt +++ /dev/null @@ -1,61 +0,0 @@ -package eu.kanade.core.util - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.launch -import rx.Emitter -import rx.Observable -import rx.Observer -import kotlin.coroutines.CoroutineContext - -fun Observable.asFlow(): Flow = callbackFlow { - val observer = object : Observer { - override fun onNext(t: T) { - trySend(t) - } - - override fun onError(e: Throwable) { - close(e) - } - - override fun onCompleted() { - close() - } - } - val subscription = subscribe(observer) - awaitClose { subscription.unsubscribe() } -} - -fun Flow.asObservable( - context: CoroutineContext = Dispatchers.Unconfined, - backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE, -): Observable { - return Observable.create( - { emitter -> - /* - * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if - * asObservable is already invoked from unconfined - */ - val job = GlobalScope.launch(context = context, start = CoroutineStart.ATOMIC) { - try { - collect { emitter.onNext(it) } - emitter.onCompleted() - } catch (e: Throwable) { - // Ignore `CancellationException` as error, since it indicates "normal cancellation" - if (e !is CancellationException) { - emitter.onError(e) - } else { - emitter.onCompleted() - } - } - } - emitter.setCancellation { job.cancel() } - }, - backpressureMode, - ) -} diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index 887e62ef8..283c23a71 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -20,7 +20,6 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList import eu.kanade.tachiyomi.util.lang.RetryWithDelay -import eu.kanade.tachiyomi.util.lang.plusAssign import eu.kanade.tachiyomi.util.storage.DiskUtil import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE import eu.kanade.tachiyomi.util.storage.saveTo @@ -30,6 +29,7 @@ import logcat.LogPriority import nl.adaptivity.xmlutil.serialization.XML import okhttp3.Response import rx.Observable +import rx.Subscription import rx.android.schedulers.AndroidSchedulers import rx.schedulers.Schedulers import rx.subscriptions.CompositeSubscription @@ -637,6 +637,8 @@ class Downloader( return queue.none { it.status.value <= Download.State.DOWNLOADING.value } } + private operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription) + companion object { const val TMP_DIR_SUFFIX = "_tmp" const val WARNING_NOTIF_TIMEOUT_MS = 30_000L diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt deleted file mode 100644 index dc162bf2c..000000000 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt +++ /dev/null @@ -1,6 +0,0 @@ -package eu.kanade.tachiyomi.util.lang - -import rx.Subscription -import rx.subscriptions.CompositeSubscription - -operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription) diff --git a/core/src/main/java/tachiyomi/core/util/lang/RxCoroutineBridge.kt b/core/src/main/java/tachiyomi/core/util/lang/RxCoroutineBridge.kt index 8cba4caac..56f58e1d4 100644 --- a/core/src/main/java/tachiyomi/core/util/lang/RxCoroutineBridge.kt +++ b/core/src/main/java/tachiyomi/core/util/lang/RxCoroutineBridge.kt @@ -1,15 +1,8 @@ package tachiyomi.core.util.lang import kotlinx.coroutines.CancellableContinuation -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine -import rx.Emitter import rx.Observable import rx.Subscriber import rx.Subscription @@ -61,31 +54,5 @@ private suspend fun Observable.awaitOne(): T = suspendCancellableCoroutin ) } -internal fun CancellableContinuation.unsubscribeOnCancellation(sub: Subscription) = +private fun CancellableContinuation.unsubscribeOnCancellation(sub: Subscription) = invokeOnCancellation { sub.unsubscribe() } - -@OptIn(ExperimentalCoroutinesApi::class) -fun runAsObservable( - backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE, - block: suspend () -> T, -): Observable { - return Observable.create( - { emitter -> - val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { - try { - emitter.onNext(block()) - emitter.onCompleted() - } catch (e: Throwable) { - // Ignore `CancellationException` as error, since it indicates "normal cancellation" - if (e !is CancellationException) { - emitter.onError(e) - } else { - emitter.onCompleted() - } - } - } - emitter.setCancellation { job.cancel() } - }, - backpressureMode, - ) -}