Replace RxJava in HttpPageLoader downloader (#8955)
* Convert downloader Observable to flow Uses `runInterruptible` to turn the blocking call to `queue.take()` into a cancellable call. Flow collection is ended by cancelling the scope in `recycle`. This means the `HttpPageLoader` can't be reused after calling `recycle`, but this was true with the `Observable` as well.) * Convert load Observables to suspending function Inlining the Observables allows for some simplification of the error handling. Behavior should be otherwise identical. * Convert cleanup Completable to coroutine Uses global `launchIO`, not ideal but similar to previous behavior. Can't be scheduled on the local `scope` as this runs after `scope` is cancelled.
This commit is contained in:
parent
a179327d9d
commit
e4bc8990fb
1 changed files with 53 additions and 82 deletions
|
@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
|
|||
import eu.kanade.tachiyomi.source.online.HttpSource
|
||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
|
||||
import eu.kanade.tachiyomi.util.lang.plusAssign
|
||||
import eu.kanade.tachiyomi.util.system.logcat
|
||||
import eu.kanade.tachiyomi.util.lang.awaitSingle
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import logcat.LogPriority
|
||||
import rx.Completable
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.runInterruptible
|
||||
import rx.Observable
|
||||
import rx.schedulers.Schedulers
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.SerializedSubject
|
||||
import rx.subscriptions.CompositeSubscription
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.util.concurrent.PriorityBlockingQueue
|
||||
|
@ -31,33 +35,27 @@ class HttpPageLoader(
|
|||
private val chapterCache: ChapterCache = Injekt.get(),
|
||||
) : PageLoader() {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
|
||||
/**
|
||||
* A queue used to manage requests one by one while allowing priorities.
|
||||
*/
|
||||
private val queue = PriorityBlockingQueue<PriorityPage>()
|
||||
|
||||
/**
|
||||
* Current active subscriptions.
|
||||
*/
|
||||
private val subscriptions = CompositeSubscription()
|
||||
|
||||
private val preloadSize = 4
|
||||
|
||||
init {
|
||||
subscriptions += Observable.defer { Observable.just(queue.take().page) }
|
||||
.filter { it.status == Page.State.QUEUE }
|
||||
.concatMap { source.fetchImageFromCacheThenNet(it) }
|
||||
.repeat()
|
||||
.subscribeOn(Schedulers.io())
|
||||
.subscribe(
|
||||
{
|
||||
},
|
||||
{ error ->
|
||||
if (error !is InterruptedException) {
|
||||
logcat(LogPriority.ERROR, error)
|
||||
}
|
||||
},
|
||||
)
|
||||
scope.launchIO {
|
||||
flow {
|
||||
while (true) {
|
||||
emit(runInterruptible { queue.take() }.page)
|
||||
}
|
||||
}
|
||||
.filter { it.status == Page.State.QUEUE }
|
||||
.collect {
|
||||
loadPage(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -65,21 +63,23 @@ class HttpPageLoader(
|
|||
*/
|
||||
override fun recycle() {
|
||||
super.recycle()
|
||||
subscriptions.unsubscribe()
|
||||
scope.cancel()
|
||||
queue.clear()
|
||||
|
||||
// Cache current page list progress for online chapters to allow a faster reopen
|
||||
val pages = chapter.pages
|
||||
if (pages != null) {
|
||||
Completable
|
||||
.fromAction {
|
||||
launchIO {
|
||||
try {
|
||||
// Convert to pages without reader information
|
||||
val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
|
||||
chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
.onErrorComplete()
|
||||
.subscribeOn(Schedulers.io())
|
||||
.subscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,61 +192,32 @@ class HttpPageLoader(
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns an observable of the page with the downloaded image.
|
||||
* Loads the page, retrieving the image URL and downloading the image if necessary.
|
||||
* Downloaded images are stored in the chapter cache.
|
||||
*
|
||||
* @param page the page whose source image has to be downloaded.
|
||||
*/
|
||||
private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
|
||||
return if (page.imageUrl.isNullOrEmpty()) {
|
||||
getImageUrl(page).flatMap { getCachedImage(it) }
|
||||
} else {
|
||||
getCachedImage(page)
|
||||
private suspend fun loadPage(page: ReaderPage) {
|
||||
try {
|
||||
if (page.imageUrl.isNullOrEmpty()) {
|
||||
page.status = Page.State.LOAD_PAGE
|
||||
page.imageUrl = source.fetchImageUrl(page).awaitSingle()
|
||||
}
|
||||
val imageUrl = page.imageUrl!!
|
||||
|
||||
if (!chapterCache.isImageInCache(imageUrl)) {
|
||||
page.status = Page.State.DOWNLOAD_IMAGE
|
||||
val imageResponse = source.fetchImage(page).awaitSingle()
|
||||
chapterCache.putImageToCache(imageUrl, imageResponse)
|
||||
}
|
||||
|
||||
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
|
||||
page.status = Page.State.READY
|
||||
} catch (e: Throwable) {
|
||||
page.status = Page.State.ERROR
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
|
||||
page.status = Page.State.LOAD_PAGE
|
||||
return fetchImageUrl(page)
|
||||
.doOnError { page.status = Page.State.ERROR }
|
||||
.onErrorReturn { null }
|
||||
.doOnNext { page.imageUrl = it }
|
||||
.map { page }
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an observable of the page that gets the image from the chapter or fallbacks to
|
||||
* network and copies it to the cache calling [cacheImage].
|
||||
*
|
||||
* @param page the page.
|
||||
*/
|
||||
private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
|
||||
val imageUrl = page.imageUrl ?: return Observable.just(page)
|
||||
|
||||
return Observable.just(page)
|
||||
.flatMap {
|
||||
if (!chapterCache.isImageInCache(imageUrl)) {
|
||||
cacheImage(page)
|
||||
} else {
|
||||
Observable.just(page)
|
||||
}
|
||||
}
|
||||
.doOnNext {
|
||||
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
|
||||
page.status = Page.State.READY
|
||||
}
|
||||
.doOnError { page.status = Page.State.ERROR }
|
||||
.onErrorReturn { page }
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an observable of the page that downloads the image to [ChapterCache].
|
||||
*
|
||||
* @param page the page.
|
||||
*/
|
||||
private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
|
||||
page.status = Page.State.DOWNLOAD_IMAGE
|
||||
return fetchImage(page)
|
||||
.doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
|
||||
.map { page }
|
||||
}
|
||||
}
|
||||
|
|
Reference in a new issue