From 37118088d44d6597e7a3c01687d8570fe4bb3c4f Mon Sep 17 00:00:00 2001 From: arkon Date: Sat, 26 Nov 2022 10:07:51 -0500 Subject: [PATCH] Remove usage of PublishRelay in DownloadQueue --- .../tachiyomi/data/download/DownloadCache.kt | 4 +-- .../data/download/model/DownloadQueue.kt | 31 ++++++++++++------- .../ui/download/DownloadPresenter.kt | 2 +- .../eu/kanade/tachiyomi/ui/more/MoreScreen.kt | 2 +- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt index 9d2df3e28..b000625d0 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt @@ -44,11 +44,11 @@ class DownloadCache( private val downloadPreferences: DownloadPreferences = Injekt.get(), ) { + private val scope = CoroutineScope(Dispatchers.IO) + private val _changes: Channel = Channel(Channel.UNLIMITED) val changes = _changes.receiveAsFlow().onStart { emit(Unit) } - private val scope = CoroutineScope(Dispatchers.IO) - private val notifier by lazy { DownloadNotifier(context) } /** diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 9e83baace..2701c2c5d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -1,12 +1,18 @@ package eu.kanade.tachiyomi.data.download.model -import com.jakewharton.rxrelay.PublishRelay import eu.kanade.core.util.asFlow import eu.kanade.domain.chapter.model.Chapter import eu.kanade.domain.manga.model.Manga import eu.kanade.tachiyomi.data.download.DownloadStore import eu.kanade.tachiyomi.source.model.Page +import eu.kanade.tachiyomi.util.lang.launchNonCancellable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.receiveAsFlow import rx.Observable import rx.subjects.PublishSubject import java.util.concurrent.CopyOnWriteArrayList @@ -16,9 +22,12 @@ class DownloadQueue( private val queue: MutableList = CopyOnWriteArrayList(), ) : List by queue { + private val scope = CoroutineScope(Dispatchers.IO) + private val statusSubject = PublishSubject.create() - private val updatedRelay = PublishRelay.create() + private val _updates: Channel = Channel(Channel.UNLIMITED) + val updates = _updates.receiveAsFlow().onStart { emit(Unit) }.map { queue } fun addAll(downloads: List) { downloads.forEach { download -> @@ -28,7 +37,9 @@ class DownloadQueue( } queue.addAll(downloads) store.addAll(downloads) - updatedRelay.call(Unit) + scope.launchNonCancellable { + _updates.send(Unit) + } } fun remove(download: Download) { @@ -40,7 +51,9 @@ class DownloadQueue( download.status = Download.State.NOT_DOWNLOADED } if (removed) { - updatedRelay.call(Unit) + scope.launchNonCancellable { + _updates.send(Unit) + } } } @@ -68,7 +81,9 @@ class DownloadQueue( } queue.clear() store.clear() - updatedRelay.call(Unit) + scope.launchNonCancellable { + _updates.send(Unit) + } } private fun getActiveDownloads(): Observable = @@ -80,12 +95,6 @@ class DownloadQueue( fun statusFlow(): Flow = getStatusObservable().asFlow() - private fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() - .startWith(Unit) - .map { this } - - fun updatedFlow(): Flow> = getUpdatedObservable().asFlow() - private fun setPagesFor(download: Download) { if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) { setPagesSubject(download.pages, null) diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt index 19f80b786..2f42538bd 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt @@ -34,7 +34,7 @@ class DownloadPresenter : BasePresenter() { super.onCreate(savedState) presenterScope.launch { - downloadQueue.updatedFlow() + downloadQueue.updates .catch { logcat(LogPriority.ERROR, it) } .map { downloads -> downloads diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreScreen.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreScreen.kt index 32a32e25e..b0b14353f 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreScreen.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreScreen.kt @@ -69,7 +69,7 @@ private class MoreScreenModel( coroutineScope.launchIO { combine( DownloadService.isRunning, - downloadManager.queue.updatedFlow(), + downloadManager.queue.updates, ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } .collectLatest { (isDownloading, downloadQueueSize) -> val pendingDownloadExists = downloadQueueSize != 0