diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DirectoryPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DirectoryPageLoader.kt index 8d4e19e7c..cff8e38a6 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DirectoryPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DirectoryPageLoader.kt @@ -4,7 +4,6 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.system.ImageUtil -import rx.Observable import java.io.File import java.io.FileInputStream @@ -30,9 +29,7 @@ class DirectoryPageLoader(val file: File) : PageLoader() { } /** - * Returns an observable that emits a ready state. + * No additional action required to load the page */ - override fun getPage(page: ReaderPage): Observable { - return Observable.just(Page.State.READY) - } + override suspend fun loadPage(page: ReaderPage) {} } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DownloadPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DownloadPageLoader.kt index 684c5f6f7..bd47b0915 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DownloadPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/DownloadPageLoader.kt @@ -10,7 +10,6 @@ import eu.kanade.tachiyomi.source.Source import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter import eu.kanade.tachiyomi.ui.reader.model.ReaderPage -import rx.Observable import tachiyomi.domain.manga.model.Manga import uy.kohesive.injekt.injectLazy import java.io.File @@ -65,7 +64,7 @@ class DownloadPageLoader( } } - override fun getPage(page: ReaderPage): Observable { - return zipPageLoader?.getPage(page) ?: Observable.just(Page.State.READY) + override suspend fun loadPage(page: ReaderPage) { + zipPageLoader?.loadPage(page) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/EpubPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/EpubPageLoader.kt index 93f931c50..6d581f2ba 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/EpubPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/EpubPageLoader.kt @@ -3,7 +3,6 @@ package eu.kanade.tachiyomi.ui.reader.loader import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.util.storage.EpubFile -import rx.Observable import java.io.File /** @@ -39,15 +38,9 @@ class EpubPageLoader(file: File) : PageLoader() { } /** - * Returns an observable that emits a ready state unless the loader was recycled. + * No additional action required to load the page */ - override fun getPage(page: ReaderPage): Observable { - return Observable.just( - if (isRecycled) { - Page.State.ERROR - } else { - Page.State.READY - }, - ) + override suspend fun loadPage(page: ReaderPage) { + check(!isRecycled) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt index c6db6f12b..4d5480fa6 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt @@ -8,6 +8,7 @@ import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.util.lang.awaitSingle import eu.kanade.tachiyomi.util.lang.launchIO +import eu.kanade.tachiyomi.util.lang.withIOContext import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -16,10 +17,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runInterruptible -import rx.Observable -import rx.schedulers.Schedulers -import rx.subjects.PublishSubject -import rx.subjects.SerializedSubject +import kotlinx.coroutines.suspendCancellableCoroutine import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.util.concurrent.PriorityBlockingQueue @@ -53,7 +51,7 @@ class HttpPageLoader( } .filter { it.status == Page.State.QUEUE } .collect { - loadPage(it) + _loadPage(it) } } } @@ -103,11 +101,10 @@ class HttpPageLoader( } /** - * Returns an observable that loads a page through the queue and listens to its result to - * emit new states. It handles re-enqueueing pages if they were evicted from the cache. + * Loads a page through the queue. Handles re-enqueueing pages if they were evicted from the cache. */ - override fun getPage(page: ReaderPage): Observable { - return Observable.defer { + override suspend fun loadPage(page: ReaderPage) { + withIOContext { val imageUrl = page.imageUrl // Check if the image has been deleted @@ -120,26 +117,22 @@ class HttpPageLoader( page.status = Page.State.QUEUE } - val statusSubject = SerializedSubject(PublishSubject.create()) - page.statusSubject = statusSubject - val queuedPages = mutableListOf() if (page.status == Page.State.QUEUE) { queuedPages += PriorityPage(page, 1).also { queue.offer(it) } } queuedPages += preloadNextPages(page, preloadSize) - statusSubject.startWith(page.status) - .doOnUnsubscribe { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { queuedPages.forEach { if (it.page.status == Page.State.QUEUE) { queue.remove(it) } } } + } } - .subscribeOn(Schedulers.io()) - .unsubscribeOn(Schedulers.io()) } /** @@ -197,7 +190,7 @@ class HttpPageLoader( * * @param page the page whose source image has to be downloaded. */ - private suspend fun loadPage(page: ReaderPage) { + private suspend fun _loadPage(page: ReaderPage) { try { if (page.imageUrl.isNullOrEmpty()) { page.status = Page.State.LOAD_PAGE diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/PageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/PageLoader.kt index c9417ae70..720e81a43 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/PageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/PageLoader.kt @@ -1,9 +1,7 @@ package eu.kanade.tachiyomi.ui.reader.loader import androidx.annotation.CallSuper -import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderPage -import rx.Observable /** * A loader used to load pages into the reader. Any open resources must be cleaned up when the @@ -32,9 +30,11 @@ abstract class PageLoader { abstract suspend fun getPages(): List /** - * Returns an observable that should inform of the progress of the page + * Loads the page. May also preload other pages. + * Progress of the page loading should be followed via [page.statusFlow]. + * [loadPage] is not currently guaranteed to complete, so it should be launched asynchronously. */ - abstract fun getPage(page: ReaderPage): Observable + abstract suspend fun loadPage(page: ReaderPage) /** * Retries the given [page] in case it failed to load. This method only makes sense when an diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/RarPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/RarPageLoader.kt index bf3fd0178..58debe3b1 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/RarPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/RarPageLoader.kt @@ -6,7 +6,6 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.system.ImageUtil -import rx.Observable import java.io.File import java.io.InputStream import java.io.PipedInputStream @@ -55,16 +54,10 @@ class RarPageLoader(file: File) : PageLoader() { } /** - * Returns an observable that emits a ready state unless the loader was recycled. + * No additional action required to load the page */ - override fun getPage(page: ReaderPage): Observable { - return Observable.just( - if (isRecycled) { - Page.State.ERROR - } else { - Page.State.READY - }, - ) + override suspend fun loadPage(page: ReaderPage) { + check(!isRecycled) } /** diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/ZipPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/ZipPageLoader.kt index 62b11bfc8..543241b30 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/ZipPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/ZipPageLoader.kt @@ -5,7 +5,6 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.system.ImageUtil -import rx.Observable import java.io.File import java.nio.charset.StandardCharsets import java.util.zip.ZipFile @@ -49,15 +48,9 @@ class ZipPageLoader(file: File) : PageLoader() { } /** - * Returns an observable that emits a ready state unless the loader was recycled. + * No additional action required to load the page */ - override fun getPage(page: ReaderPage): Observable { - return Observable.just( - if (isRecycled) { - Page.State.ERROR - } else { - Page.State.READY - }, - ) + override suspend fun loadPage(page: ReaderPage) { + check(!isRecycled) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/pager/PagerPageHolder.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/pager/PagerPageHolder.kt index e3d44bf83..aca49cd24 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/pager/PagerPageHolder.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/pager/PagerPageHolder.kt @@ -60,9 +60,14 @@ class PagerPageHolder( private val scope = MainScope() /** - * Subscription for status changes of the page. + * Job for loading the page. */ - private var statusSubscription: Subscription? = null + private var loadJob: Job? = null + + /** + * Job for status changes of the page. + */ + private var statusJob: Job? = null /** * Job for progress changes of the page. @@ -77,7 +82,7 @@ class PagerPageHolder( init { addView(progressIndicator) - observeStatus() + launchLoadJob() } /** @@ -87,22 +92,26 @@ class PagerPageHolder( override fun onDetachedFromWindow() { super.onDetachedFromWindow() cancelProgressJob() - unsubscribeStatus() + cancelLoadJob() unsubscribeReadImageHeader() } /** - * Observes the status of the page and notify the changes. + * Starts loading the page and processing changes to the page's status. * * @see processStatus */ - private fun observeStatus() { - statusSubscription?.unsubscribe() + private fun launchLoadJob() { + loadJob?.cancel() + statusJob?.cancel() val loader = page.chapter.pageLoader ?: return - statusSubscription = loader.getPage(page) - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { processStatus(it) } + loadJob = scope.launch { + loader.loadPage(page) + } + statusJob = scope.launch { + page.statusFlow.collectLatest { processStatus(it) } + } } private fun launchProgressJob() { @@ -137,11 +146,13 @@ class PagerPageHolder( } /** - * Unsubscribes from the status subscription. + * Cancels loading the page and processing changes to the page's status. */ - private fun unsubscribeStatus() { - statusSubscription?.unsubscribe() - statusSubscription = null + private fun cancelLoadJob() { + loadJob?.cancel() + loadJob = null + statusJob?.cancel() + statusJob = null } private fun cancelProgressJob() { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/webtoon/WebtoonPageHolder.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/webtoon/WebtoonPageHolder.kt index 908fac4bc..06cb6f74c 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/webtoon/WebtoonPageHolder.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/viewer/webtoon/WebtoonPageHolder.kt @@ -73,9 +73,14 @@ class WebtoonPageHolder( private val scope = MainScope() /** - * Subscription for status changes of the page. + * Job for loading the page. */ - private var statusSubscription: Subscription? = null + private var loadJob: Job? = null + + /** + * Job for status changes of the page. + */ + private var statusJob: Job? = null /** * Job for progress changes of the page. @@ -101,7 +106,7 @@ class WebtoonPageHolder( */ fun bind(page: ReaderPage) { this.page = page - observeStatus() + launchLoadJob() refreshLayoutParams() } @@ -121,7 +126,7 @@ class WebtoonPageHolder( * Called when the view is recycled and added to the view pool. */ override fun recycle() { - unsubscribeStatus() + cancelLoadJob() cancelProgressJob() unsubscribeReadImageHeader() @@ -131,20 +136,21 @@ class WebtoonPageHolder( } /** - * Observes the status of the page and notify the changes. + * Starts loading the page and processing changes to the page's status. * * @see processStatus */ - private fun observeStatus() { - unsubscribeStatus() + private fun launchLoadJob() { + cancelLoadJob() val page = page ?: return val loader = page.chapter.pageLoader ?: return - statusSubscription = loader.getPage(page) - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { processStatus(it) } - - addSubscription(statusSubscription) + loadJob = scope.launch { + loader.loadPage(page) + } + statusJob = scope.launch { + page.statusFlow.collectLatest { processStatus(it) } + } } /** @@ -185,11 +191,13 @@ class WebtoonPageHolder( } /** - * Unsubscribes from the status subscription. + * Cancels loading the page and processing changes to the page's status. */ - private fun unsubscribeStatus() { - removeSubscription(statusSubscription) - statusSubscription = null + private fun cancelLoadJob() { + loadJob?.cancel() + loadJob = null + statusJob?.cancel() + statusJob = null } /** diff --git a/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt b/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt index a43918f55..efd169e88 100644 --- a/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt +++ b/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt @@ -20,10 +20,14 @@ open class Page( get() = index + 1 @Transient - @Volatile - var status: State = State.QUEUE + private val _statusFlow = MutableStateFlow(State.QUEUE) + + @Transient + val statusFlow = _statusFlow.asStateFlow() + var status: State + get() = _statusFlow.value set(value) { - field = value + _statusFlow.value = value statusSubject?.onNext(value) }