diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index 3ac318147..49cc98534 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -2,7 +2,6 @@ package eu.kanade.tachiyomi.data.download import android.content.Context import com.hippo.unifile.UniFile -import com.jakewharton.rxrelay.PublishRelay import eu.kanade.domain.chapter.model.toSChapter import eu.kanade.domain.manga.model.getComicInfo import eu.kanade.tachiyomi.R @@ -17,26 +16,31 @@ import eu.kanade.tachiyomi.util.storage.DiskUtil import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE import eu.kanade.tachiyomi.util.storage.saveTo import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.retryWhen +import kotlinx.coroutines.flow.transformLatest import kotlinx.coroutines.flow.update -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope import logcat.LogPriority import nl.adaptivity.xmlutil.serialization.XML import okhttp3.Response -import rx.Observable -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers import tachiyomi.core.metadata.comicinfo.COMIC_INFO_FILE import tachiyomi.core.metadata.comicinfo.ComicInfo import tachiyomi.core.util.lang.awaitSingle @@ -61,11 +65,7 @@ import java.util.zip.ZipOutputStream /** * This class is the one in charge of downloading chapters. * - * Its queue contains the list of chapters to download. In order to download them, the downloader - * subscription must be running and the list of chapters must be sent to them by [downloadsRelay]. - * - * The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected - * behavior, but it's safe to read it from multiple threads. + * Its queue contains the list of chapters to download. */ class Downloader( private val context: Context, @@ -93,21 +93,14 @@ class Downloader( */ private val notifier by lazy { DownloadNotifier(context) } - /** - * Downloader subscription. - */ - private var subscription: Subscription? = null - - /** - * Relay to send a list of downloads to the downloader. - */ - private val downloadsRelay = PublishRelay.create>() + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private var downloaderJob: Job? = null /** * Whether the downloader is running. */ val isRunning: Boolean - get() = subscription != null + get() = downloaderJob?.isActive ?: false /** * Whether the downloader is paused @@ -129,18 +122,17 @@ class Downloader( * @return true if the downloader is started, false otherwise. */ fun start(): Boolean { - if (subscription != null || queueState.value.isEmpty()) { + if (isRunning || queueState.value.isEmpty()) { return false } - initializeSubscription() - val pending = queueState.value.filter { it.status != Download.State.DOWNLOADED } pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE } isPaused = false - downloadsRelay.call(pending) + launchDownloaderJob() + return pending.isNotEmpty() } @@ -148,7 +140,7 @@ class Downloader( * Stops the downloader. */ fun stop(reason: String? = null) { - destroySubscription() + cancelDownloaderJob() queueState.value .filter { it.status == Download.State.DOWNLOADING } .forEach { it.status = Download.State.ERROR } @@ -176,7 +168,7 @@ class Downloader( * Pauses the downloader */ fun pause() { - destroySubscription() + cancelDownloaderJob() queueState.value .filter { it.status == Download.State.DOWNLOADING } .forEach { it.status = Download.State.QUEUE } @@ -187,7 +179,7 @@ class Downloader( * Removes everything from the queue. */ fun clearQueue() { - destroySubscription() + cancelDownloaderJob() _clearQueue() notifier.dismissProgress() @@ -196,49 +188,74 @@ class Downloader( /** * Prepares the subscriptions to start downloading. */ - private fun initializeSubscription() { - if (subscription != null) return + private fun launchDownloaderJob() { + if (isRunning) return - subscription = downloadsRelay.concatMapIterable { it } - // Concurrently download from 5 different sources - .groupBy { it.source } - .flatMap( - { bySource -> - bySource.concatMap { download -> - Observable.fromCallable { - runBlocking { downloadChapter(download) } - download - }.subscribeOn(Schedulers.io()) + downloaderJob = scope.launch { + val activeDownloadsFlow = queueState.transformLatest { queue -> + while (true) { + val activeDownloads = queue.asSequence() + .filter { it.status.value <= Download.State.DOWNLOADING.value } // Ignore completed downloads, leave them in the queue + .groupBy { it.source } + .toList().take(5) // Concurrently download from 5 different sources + .map { (_, downloads) -> downloads.first() } + emit(activeDownloads) + + if (activeDownloads.isEmpty()) break + // Suspend until a download enters the ERROR state + val activeDownloadsErroredFlow = + combine(activeDownloads.map(Download::statusFlow)) { states -> + states.contains(Download.State.ERROR) + }.filter { it } + activeDownloadsErroredFlow.first() + } + }.distinctUntilChanged() + + // Use supervisorScope to cancel child jobs when the downloader job is cancelled + supervisorScope { + val downloadJobs = mutableMapOf() + + activeDownloadsFlow.collectLatest { activeDownloads -> + val downloadJobsToStop = downloadJobs.filter { it.key !in activeDownloads } + downloadJobsToStop.forEach { (download, job) -> + job.cancel() + downloadJobs.remove(download) } - }, - 5, - ) - .onBackpressureLatest() - .observeOn(AndroidSchedulers.mainThread()) - .subscribe( - { - // Remove successful download from queue - if (it.status == Download.State.DOWNLOADED) { - removeFromQueue(it) + + val downloadsToStart = activeDownloads.filter { it !in downloadJobs } + downloadsToStart.forEach { download -> + downloadJobs[download] = launchDownloadJob(download) } - if (areAllDownloadsFinished()) { - stop() - } - }, - { error -> - logcat(LogPriority.ERROR, error) - notifier.onError(error.message) - stop() - }, - ) + } + } + } + } + + private fun CoroutineScope.launchDownloadJob(download: Download) = launchIO { + try { + downloadChapter(download) + + // Remove successful download from queue + if (download.status == Download.State.DOWNLOADED) { + removeFromQueue(download) + } + if (areAllDownloadsFinished()) { + stop() + } + } catch (e: Throwable) { + if (e is CancellationException) throw e + logcat(LogPriority.ERROR, e) + notifier.onError(e.message) + stop() + } } /** * Destroys the downloader subscriptions. */ - private fun destroySubscription() { - subscription?.unsubscribe() - subscription = null + private fun cancelDownloaderJob() { + downloaderJob?.cancel() + downloaderJob = null } /** @@ -255,17 +272,13 @@ class Downloader( val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchIO val wasEmpty = queueState.value.isEmpty() - // Called in background thread, the operation can be slow with SAF. - val chaptersWithoutDir = async { - chapters - // Filter out those already downloaded. - .filter { provider.findChapterDir(it.name, it.scanlator, manga.title, source) == null } - // Add chapters to queue from the start. - .sortedByDescending { it.sourceOrder } - } + val chaptersWithoutDir = chapters + // Filter out those already downloaded. + .filter { provider.findChapterDir(it.name, it.scanlator, manga.title, source) == null } + // Add chapters to queue from the start. + .sortedByDescending { it.sourceOrder } - // Runs in main thread (synchronization needed). - val chaptersToQueue = chaptersWithoutDir.await() + val chaptersToQueue = chaptersWithoutDir // Filter out those already enqueued. .filter { chapter -> queueState.value.none { it.chapter.id == chapter.id } } // Create a download for each one. @@ -274,11 +287,6 @@ class Downloader( if (chaptersToQueue.isNotEmpty()) { addAllToQueue(chaptersToQueue) - if (isRunning) { - // Send the list of downloads to the downloader. - downloadsRelay.call(chaptersToQueue) - } - // Start downloader if needed if (autoStart && wasEmpty) { val queuedDownloads = queueState.value.count { it.source !is UnmeteredSource } @@ -656,7 +664,7 @@ class Downloader( } } - private inline fun removeFromQueueByPredicate(predicate: (Download) -> Boolean) { + private inline fun removeFromQueueIf(predicate: (Download) -> Boolean) { _queueState.update { queue -> val downloads = queue.filter { predicate(it) } store.removeAll(downloads) @@ -671,11 +679,11 @@ class Downloader( fun removeFromQueue(chapters: List) { val chapterIds = chapters.map { it.id } - removeFromQueueByPredicate { it.chapter.id in chapterIds } + removeFromQueueIf { it.chapter.id in chapterIds } } fun removeFromQueue(manga: Manga) { - removeFromQueueByPredicate { it.manga.id == manga.id } + removeFromQueueIf { it.manga.id == manga.id } } private fun _clearQueue() { diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt index 3dafdc061..564a33633 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt @@ -22,8 +22,8 @@ data class Download( val source: HttpSource, val manga: Manga, val chapter: Chapter, - var pages: List? = null, ) { + var pages: List? = null val totalProgress: Int get() = pages?.sumOf(Page::progress) ?: 0