Replace RxJava in HttpPageLoader downloader (#8955)
* Convert downloader Observable to flow Uses `runInterruptible` to turn the blocking call to `queue.take()` into a cancellable call. Flow collection is ended by cancelling the scope in `recycle`. This means the `HttpPageLoader` can't be reused after calling `recycle`, but this was true with the `Observable` as well.) * Convert load Observables to suspending function Inlining the Observables allows for some simplification of the error handling. Behavior should be otherwise identical. * Convert cleanup Completable to coroutine Uses global `launchIO`, not ideal but similar to previous behavior. Can't be scheduled on the local `scope` as this runs after `scope` is cancelled.
This commit is contained in:
parent
a179327d9d
commit
e4bc8990fb
1 changed files with 53 additions and 82 deletions
|
@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
|
||||||
import eu.kanade.tachiyomi.source.online.HttpSource
|
import eu.kanade.tachiyomi.source.online.HttpSource
|
||||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
||||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
|
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
|
||||||
import eu.kanade.tachiyomi.util.lang.plusAssign
|
import eu.kanade.tachiyomi.util.lang.awaitSingle
|
||||||
import eu.kanade.tachiyomi.util.system.logcat
|
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
import logcat.LogPriority
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import rx.Completable
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.SupervisorJob
|
||||||
|
import kotlinx.coroutines.cancel
|
||||||
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.flow
|
||||||
|
import kotlinx.coroutines.runInterruptible
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.schedulers.Schedulers
|
import rx.schedulers.Schedulers
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import rx.subjects.SerializedSubject
|
import rx.subjects.SerializedSubject
|
||||||
import rx.subscriptions.CompositeSubscription
|
|
||||||
import uy.kohesive.injekt.Injekt
|
import uy.kohesive.injekt.Injekt
|
||||||
import uy.kohesive.injekt.api.get
|
import uy.kohesive.injekt.api.get
|
||||||
import java.util.concurrent.PriorityBlockingQueue
|
import java.util.concurrent.PriorityBlockingQueue
|
||||||
|
@ -31,33 +35,27 @@ class HttpPageLoader(
|
||||||
private val chapterCache: ChapterCache = Injekt.get(),
|
private val chapterCache: ChapterCache = Injekt.get(),
|
||||||
) : PageLoader() {
|
) : PageLoader() {
|
||||||
|
|
||||||
|
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A queue used to manage requests one by one while allowing priorities.
|
* A queue used to manage requests one by one while allowing priorities.
|
||||||
*/
|
*/
|
||||||
private val queue = PriorityBlockingQueue<PriorityPage>()
|
private val queue = PriorityBlockingQueue<PriorityPage>()
|
||||||
|
|
||||||
/**
|
|
||||||
* Current active subscriptions.
|
|
||||||
*/
|
|
||||||
private val subscriptions = CompositeSubscription()
|
|
||||||
|
|
||||||
private val preloadSize = 4
|
private val preloadSize = 4
|
||||||
|
|
||||||
init {
|
init {
|
||||||
subscriptions += Observable.defer { Observable.just(queue.take().page) }
|
scope.launchIO {
|
||||||
.filter { it.status == Page.State.QUEUE }
|
flow {
|
||||||
.concatMap { source.fetchImageFromCacheThenNet(it) }
|
while (true) {
|
||||||
.repeat()
|
emit(runInterruptible { queue.take() }.page)
|
||||||
.subscribeOn(Schedulers.io())
|
}
|
||||||
.subscribe(
|
}
|
||||||
{
|
.filter { it.status == Page.State.QUEUE }
|
||||||
},
|
.collect {
|
||||||
{ error ->
|
loadPage(it)
|
||||||
if (error !is InterruptedException) {
|
}
|
||||||
logcat(LogPriority.ERROR, error)
|
}
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,21 +63,23 @@ class HttpPageLoader(
|
||||||
*/
|
*/
|
||||||
override fun recycle() {
|
override fun recycle() {
|
||||||
super.recycle()
|
super.recycle()
|
||||||
subscriptions.unsubscribe()
|
scope.cancel()
|
||||||
queue.clear()
|
queue.clear()
|
||||||
|
|
||||||
// Cache current page list progress for online chapters to allow a faster reopen
|
// Cache current page list progress for online chapters to allow a faster reopen
|
||||||
val pages = chapter.pages
|
val pages = chapter.pages
|
||||||
if (pages != null) {
|
if (pages != null) {
|
||||||
Completable
|
launchIO {
|
||||||
.fromAction {
|
try {
|
||||||
// Convert to pages without reader information
|
// Convert to pages without reader information
|
||||||
val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
|
val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
|
||||||
chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
|
chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
|
||||||
|
} catch (e: Throwable) {
|
||||||
|
if (e is CancellationException) {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
.onErrorComplete()
|
}
|
||||||
.subscribeOn(Schedulers.io())
|
|
||||||
.subscribe()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,61 +192,32 @@ class HttpPageLoader(
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an observable of the page with the downloaded image.
|
* Loads the page, retrieving the image URL and downloading the image if necessary.
|
||||||
|
* Downloaded images are stored in the chapter cache.
|
||||||
*
|
*
|
||||||
* @param page the page whose source image has to be downloaded.
|
* @param page the page whose source image has to be downloaded.
|
||||||
*/
|
*/
|
||||||
private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
|
private suspend fun loadPage(page: ReaderPage) {
|
||||||
return if (page.imageUrl.isNullOrEmpty()) {
|
try {
|
||||||
getImageUrl(page).flatMap { getCachedImage(it) }
|
if (page.imageUrl.isNullOrEmpty()) {
|
||||||
} else {
|
page.status = Page.State.LOAD_PAGE
|
||||||
getCachedImage(page)
|
page.imageUrl = source.fetchImageUrl(page).awaitSingle()
|
||||||
|
}
|
||||||
|
val imageUrl = page.imageUrl!!
|
||||||
|
|
||||||
|
if (!chapterCache.isImageInCache(imageUrl)) {
|
||||||
|
page.status = Page.State.DOWNLOAD_IMAGE
|
||||||
|
val imageResponse = source.fetchImage(page).awaitSingle()
|
||||||
|
chapterCache.putImageToCache(imageUrl, imageResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
|
||||||
|
page.status = Page.State.READY
|
||||||
|
} catch (e: Throwable) {
|
||||||
|
page.status = Page.State.ERROR
|
||||||
|
if (e is CancellationException) {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
|
|
||||||
page.status = Page.State.LOAD_PAGE
|
|
||||||
return fetchImageUrl(page)
|
|
||||||
.doOnError { page.status = Page.State.ERROR }
|
|
||||||
.onErrorReturn { null }
|
|
||||||
.doOnNext { page.imageUrl = it }
|
|
||||||
.map { page }
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an observable of the page that gets the image from the chapter or fallbacks to
|
|
||||||
* network and copies it to the cache calling [cacheImage].
|
|
||||||
*
|
|
||||||
* @param page the page.
|
|
||||||
*/
|
|
||||||
private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
|
|
||||||
val imageUrl = page.imageUrl ?: return Observable.just(page)
|
|
||||||
|
|
||||||
return Observable.just(page)
|
|
||||||
.flatMap {
|
|
||||||
if (!chapterCache.isImageInCache(imageUrl)) {
|
|
||||||
cacheImage(page)
|
|
||||||
} else {
|
|
||||||
Observable.just(page)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.doOnNext {
|
|
||||||
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
|
|
||||||
page.status = Page.State.READY
|
|
||||||
}
|
|
||||||
.doOnError { page.status = Page.State.ERROR }
|
|
||||||
.onErrorReturn { page }
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an observable of the page that downloads the image to [ChapterCache].
|
|
||||||
*
|
|
||||||
* @param page the page.
|
|
||||||
*/
|
|
||||||
private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
|
|
||||||
page.status = Page.State.DOWNLOAD_IMAGE
|
|
||||||
return fetchImage(page)
|
|
||||||
.doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
|
|
||||||
.map { page }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue