diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 33cc2d52d..042c8ae52 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -1,10 +1,12 @@ package eu.kanade.tachiyomi.data.download.model import com.jakewharton.rxrelay.PublishRelay +import eu.kanade.core.util.asFlow import eu.kanade.domain.manga.model.Manga import eu.kanade.tachiyomi.data.database.models.Chapter import eu.kanade.tachiyomi.data.download.DownloadStore import eu.kanade.tachiyomi.source.model.Page +import kotlinx.coroutines.flow.Flow import rx.Observable import rx.subjects.PublishSubject import java.util.concurrent.CopyOnWriteArrayList @@ -72,8 +74,11 @@ class DownloadQueue( fun getActiveDownloads(): Observable = Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING } + @Deprecated("Use getStatusAsFlow instead") fun getStatusObservable(): Observable = statusSubject.onBackpressureBuffer() + fun getStatusAsFlow(): Flow = getStatusObservable().asFlow() + fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() .startWith(Unit) .map { this } @@ -84,6 +89,7 @@ class DownloadQueue( } } + @Deprecated("Use getProgressAsFlow instead") fun getProgressObservable(): Observable { return statusSubject.onBackpressureBuffer() .startWith(getActiveDownloads()) @@ -103,6 +109,10 @@ class DownloadQueue( .filter { it.status == Download.State.DOWNLOADING } } + fun getProgressAsFlow(): Flow { + return getProgressObservable().asFlow() + } + private fun setPagesSubject(pages: List?, subject: PublishSubject?) { pages?.forEach { it.setStatusSubject(subject) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt index d55d169cc..f1da3b693 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt @@ -57,13 +57,4 @@ open class BasePresenter : RxPresenter() { * @param onError function to execute when the observable throws an error. */ fun Observable.subscribeLatestCache(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverLatestCache()).subscribe(split(onNext, onError)).apply { add(this) } - - /** - * Subscribes an observable with [deliverReplay] and adds it to the presenter's lifecycle - * subscription list. - * - * @param onNext function to execute when the observable emits an item. - * @param onError function to execute when the observable throws an error. - */ - fun Observable.subscribeReplay(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverReplay()).subscribe(split(onNext, onError)).apply { add(this) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt index 9d77dede5..d96cca0ae 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/BrowseSourcePresenter.kt @@ -51,15 +51,13 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.runBlocking import logcat.LogPriority -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.util.Date @@ -112,7 +110,7 @@ open class BrowseSourcePresenter( /** * Subscription for the pager. */ - private var pagerSubscription: Subscription? = null + private var pagerJob: Job? = null /** * Subscription for one request from the pager. @@ -129,7 +127,6 @@ open class BrowseSourcePresenter( super.onCreate(savedState) source = sourceManager.get(sourceId) as? CatalogueSource ?: return - sourceFilters = source.getFilterList() if (savedState != null) { @@ -158,25 +155,37 @@ open class BrowseSourcePresenter( pager = createPager(query, filters) val sourceId = source.id - val sourceDisplayMode = prefs.sourceDisplayMode() - // Prepare the pager. - pagerSubscription?.let { remove(it) } - pagerSubscription = pager.results() - .observeOn(Schedulers.io()) - .map { (first, second) -> first to second.map { networkToLocalManga(it, sourceId).toDomainManga()!! } } - .doOnNext { initializeMangas(it.second) } - .map { (first, second) -> first to second.map { SourceItem(it, sourceDisplayMode) } } - .observeOn(AndroidSchedulers.mainThread()) - .subscribeReplay( - { view, (page, mangas) -> - view.onAddPage(page, mangas) - }, - { _, error -> + pagerJob?.cancel() + pagerJob = presenterScope.launchIO { + pager.asFlow() + .map { (first, second) -> + first to second.map { + networkToLocalManga( + it, + sourceId, + ).toDomainManga()!! + } + } + .onEach { initializeMangas(it.second) } + .map { (first, second) -> + first to second.map { + SourceItem( + it, + sourceDisplayMode, + ) + } + } + .catch { error -> logcat(LogPriority.ERROR, error) - }, - ) + } + .collectLatest { (page, mangas) -> + withUIContext { + view?.onAddPage(page, mangas) + } + } + } // Request first page. requestNext() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt index 0c3d9e858..0c8a895a9 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/browse/Pager.kt @@ -1,9 +1,10 @@ package eu.kanade.tachiyomi.ui.browse.source.browse import com.jakewharton.rxrelay.PublishRelay +import eu.kanade.core.util.asFlow import eu.kanade.tachiyomi.source.model.MangasPage import eu.kanade.tachiyomi.source.model.SManga -import rx.Observable +import kotlinx.coroutines.flow.Flow /** * A general pager for source requests (latest updates, popular, search) @@ -15,8 +16,8 @@ abstract class Pager(var currentPage: Int = 1) { protected val results: PublishRelay>> = PublishRelay.create() - fun results(): Observable>> { - return results.asObservable() + fun asFlow(): Flow>> { + return results.asObservable().asFlow() } abstract suspend fun requestNextPage() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt index 96b235ed0..d7bf55bf3 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt @@ -59,6 +59,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update @@ -66,9 +67,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.withContext import logcat.LogPriority -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.text.DateFormat @@ -119,8 +117,8 @@ class MangaPresenter( /** * Subscription to observe download status changes. */ - private var observeDownloadsStatusSubscription: Subscription? = null - private var observeDownloadsPageSubscription: Subscription? = null + private var observeDownloadsStatusJob: Job? = null + private var observeDownloadsPageJob: Job? = null private var _trackList: List = emptyList() val trackList get() = _trackList @@ -401,29 +399,29 @@ class MangaPresenter( // Chapters list - start private fun observeDownloads() { - observeDownloadsStatusSubscription?.let { remove(it) } - observeDownloadsStatusSubscription = downloadManager.queue.getStatusObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .filter { download -> download.manga.id == successState?.manga?.id } - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { _, it -> updateDownloadState(it) }, - { _, error -> - logcat(LogPriority.ERROR, error) - }, - ) + observeDownloadsStatusJob?.cancel() + observeDownloadsStatusJob = presenterScope.launchIO { + downloadManager.queue.getStatusAsFlow() + .filter { it.manga.id == successState?.manga?.id } + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + updateDownloadState(it) + } + } + } - observeDownloadsPageSubscription?.let { remove(it) } - observeDownloadsPageSubscription = downloadManager.queue.getProgressObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .filter { download -> download.manga.id == successState?.manga?.id } - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { _, download -> updateDownloadState(download) }, - { _, error -> logcat(LogPriority.ERROR, error) }, - ) + observeDownloadsPageJob?.cancel() + observeDownloadsPageJob = presenterScope.launchIO { + downloadManager.queue.getProgressAsFlow() + .filter { it.manga.id == successState?.manga?.id } + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + updateDownloadState(it) + } + } + } } private fun updateDownloadState(download: Download) { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt index 530aae9d9..ddfd65a5f 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt @@ -17,31 +17,30 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter import eu.kanade.tachiyomi.ui.recent.DateSectionItem import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.toDateKey +import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.logcat import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.map import logcat.LogPriority -import rx.Observable -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers -import uy.kohesive.injekt.injectLazy +import uy.kohesive.injekt.Injekt +import uy.kohesive.injekt.api.get import java.text.DateFormat import java.util.Calendar import java.util.Date import java.util.TreeMap -class UpdatesPresenter : BasePresenter() { - - val preferences: PreferencesHelper by injectLazy() - private val downloadManager: DownloadManager by injectLazy() - private val sourceManager: SourceManager by injectLazy() - - private val handler: DatabaseHandler by injectLazy() - private val updateChapter: UpdateChapter by injectLazy() - private val setReadStatus: SetReadStatus by injectLazy() +class UpdatesPresenter( + private val preferences: PreferencesHelper = Injekt.get(), + private val downloadManager: DownloadManager = Injekt.get(), + private val sourceManager: SourceManager = Injekt.get(), + private val handler: DatabaseHandler = Injekt.get(), + private val updateChapter: UpdateChapter = Injekt.get(), + private val setReadStatus: SetReadStatus = Injekt.get(), +) : BasePresenter() { private val relativeTime: Int = preferences.relativeTime().get() private val dateFormat: DateFormat = preferences.dateFormat() @@ -52,77 +51,70 @@ class UpdatesPresenter : BasePresenter() { override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) - getUpdatesObservable() + presenterScope.launchIO { + subscribeToUpdates() - downloadManager.queue.getStatusObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - { view, it -> - onDownloadStatusChange(it) - view.onChapterDownloadUpdate(it) - }, - { _, error -> - logcat(LogPriority.ERROR, error) - }, - ) + downloadManager.queue.getStatusAsFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + onDownloadStatusChange(it) + view?.onChapterDownloadUpdate(it) + } + } - downloadManager.queue.getProgressObservable() - .observeOn(Schedulers.io()) - .onBackpressureBuffer() - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache(UpdatesController::onChapterDownloadUpdate) { _, error -> - logcat(LogPriority.ERROR, error) - } + downloadManager.queue.getProgressAsFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collectLatest { + withUIContext { + view?.onChapterDownloadUpdate(it) + } + } + } } /** * Get observable containing recent chapters and date - * - * @return observable containing recent chapters and date */ - private fun getUpdatesObservable() { + private suspend fun subscribeToUpdates() { // Set date limit for recent chapters - presenterScope.launchIO { - val cal = Calendar.getInstance().apply { - time = Date() - add(Calendar.MONTH, -3) - } - - handler - .subscribeToList { - mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper) - } - .map { mangaChapter -> - val map = TreeMap>> { d1, d2 -> d2.compareTo(d1) } - val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() } - byDate.flatMap { entry -> - val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat) - entry.value - .sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed() - .map { UpdatesItem(it.second, it.first, dateItem) } - } - } - .collectLatest { list -> - list.forEach { item -> - // Find an active download for this chapter. - val download = downloadManager.queue.find { it.chapter.id == item.chapter.id } - - // If there's an active download, assign it, otherwise ask the manager if - // the chapter is downloaded and assign it to the status. - if (download != null) { - item.download = download - } - } - setDownloadedChapters(list) - - _updates.value = list - - // Set unread chapter count for bottom bar badge - preferences.unreadUpdatesCount().set(list.count { !it.chapter.read }) - } + val cal = Calendar.getInstance().apply { + time = Date() + add(Calendar.MONTH, -3) } + + handler + .subscribeToList { + mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper) + } + .map { mangaChapter -> + val map = TreeMap>> { d1, d2 -> d2.compareTo(d1) } + val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() } + byDate.flatMap { entry -> + val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat) + entry.value + .sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed() + .map { UpdatesItem(it.second, it.first, dateItem) } + } + } + .collectLatest { list -> + list.forEach { item -> + // Find an active download for this chapter. + val download = downloadManager.queue.find { it.chapter.id == item.chapter.id } + + // If there's an active download, assign it, otherwise ask the manager if + // the chapter is downloaded and assign it to the status. + if (download != null) { + item.download = download + } + } + setDownloadedChapters(list) + + _updates.value = list + + // Set unread chapter count for bottom bar badge + preferences.unreadUpdatesCount().set(list.count { !it.chapter.read }) + } } /** @@ -184,16 +176,14 @@ class UpdatesPresenter : BasePresenter() { * @param chapters list of chapters */ fun deleteChapters(chapters: List) { - Observable.just(chapters) - .doOnNext { deleteChaptersInternal(it) } - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribeFirst( - { view, _ -> - view.onChaptersDeleted() - }, - UpdatesController::onChaptersDeletedError, - ) + launchIO { + try { + deleteChaptersInternal(chapters) + withUIContext { view?.onChaptersDeleted() } + } catch (e: Throwable) { + withUIContext { view?.onChaptersDeletedError(e) } + } + } } /**