Inline DownloadQueue into Downloader (#9159)
* Move statusFlow and progressFlow to DownloadManager * Inline DownloadQueue into Downloader * Move reorderQueue implementation to Downloader
This commit is contained in:
parent
f03a834136
commit
b41565f879
7 changed files with 134 additions and 151 deletions
|
@ -4,10 +4,17 @@ import android.content.Context
|
|||
import eu.kanade.domain.download.service.DownloadPreferences
|
||||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue
|
||||
import eu.kanade.tachiyomi.source.Source
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import eu.kanade.tachiyomi.source.model.Page
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import logcat.LogPriority
|
||||
import tachiyomi.core.util.lang.launchIO
|
||||
|
@ -42,11 +49,8 @@ class DownloadManager(
|
|||
*/
|
||||
private val pendingDeleter = DownloadPendingDeleter(context)
|
||||
|
||||
/**
|
||||
* Downloads queue, where the pending chapters are stored.
|
||||
*/
|
||||
val queue: DownloadQueue
|
||||
get() = downloader.queue
|
||||
val queueState
|
||||
get() = downloader.queueState
|
||||
|
||||
// For use by DownloadService only
|
||||
fun downloaderStart() = downloader.start()
|
||||
|
@ -85,7 +89,7 @@ class DownloadManager(
|
|||
* @param chapterId the chapter to check.
|
||||
*/
|
||||
fun getQueuedDownloadOrNull(chapterId: Long): Download? {
|
||||
return queue.find { it.chapter.id == chapterId }
|
||||
return queueState.value.find { it: Download -> it.chapter.id == chapterId }
|
||||
}
|
||||
|
||||
fun startDownloadNow(chapterId: Long?) {
|
||||
|
@ -93,7 +97,7 @@ class DownloadManager(
|
|||
val download = getQueuedDownloadOrNull(chapterId)
|
||||
// If not in queue try to start a new download
|
||||
val toAdd = download ?: runBlocking { Download.fromChapterId(chapterId) } ?: return
|
||||
val queue = queue.toMutableList()
|
||||
val queue = queueState.value.toMutableList()
|
||||
download?.let { queue.remove(it) }
|
||||
queue.add(0, toAdd)
|
||||
reorderQueue(queue)
|
||||
|
@ -112,21 +116,7 @@ class DownloadManager(
|
|||
* @param downloads value to set the download queue to
|
||||
*/
|
||||
fun reorderQueue(downloads: List<Download>) {
|
||||
val wasRunning = downloader.isRunning
|
||||
|
||||
if (downloads.isEmpty()) {
|
||||
downloader.clearQueue()
|
||||
downloader.stop()
|
||||
return
|
||||
}
|
||||
|
||||
downloader.pause()
|
||||
queue.clear()
|
||||
queue.addAll(downloads)
|
||||
|
||||
if (wasRunning) {
|
||||
downloader.start()
|
||||
}
|
||||
downloader.updateQueue(downloads)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -147,7 +137,7 @@ class DownloadManager(
|
|||
*/
|
||||
fun addDownloadsToStartOfQueue(downloads: List<Download>) {
|
||||
if (downloads.isEmpty()) return
|
||||
queue.toMutableList().apply {
|
||||
queueState.value.toMutableList().apply {
|
||||
addAll(0, downloads)
|
||||
reorderQueue(this)
|
||||
}
|
||||
|
@ -251,7 +241,7 @@ class DownloadManager(
|
|||
fun deleteManga(manga: Manga, source: Source, removeQueued: Boolean = true) {
|
||||
launchIO {
|
||||
if (removeQueued) {
|
||||
queue.remove(manga)
|
||||
downloader.removeFromQueue(manga)
|
||||
}
|
||||
provider.findMangaDir(manga.title, source)?.delete()
|
||||
cache.removeManga(manga)
|
||||
|
@ -271,12 +261,12 @@ class DownloadManager(
|
|||
downloader.pause()
|
||||
}
|
||||
|
||||
queue.remove(chapters)
|
||||
downloader.removeFromQueue(chapters)
|
||||
|
||||
if (wasRunning) {
|
||||
if (queue.isEmpty()) {
|
||||
if (queueState.value.isEmpty()) {
|
||||
downloader.stop()
|
||||
} else if (queue.isNotEmpty()) {
|
||||
} else if (queueState.value.isNotEmpty()) {
|
||||
downloader.start()
|
||||
}
|
||||
}
|
||||
|
@ -374,4 +364,33 @@ class DownloadManager(
|
|||
chapters
|
||||
}
|
||||
}
|
||||
|
||||
fun statusFlow(): Flow<Download> = queueState
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.statusFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart {
|
||||
emitAll(
|
||||
queueState.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow(),
|
||||
)
|
||||
}
|
||||
|
||||
fun progressFlow(): Flow<Download> = queueState
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.progressFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart {
|
||||
emitAll(
|
||||
queueState.value.filter { download -> download.status == Download.State.DOWNLOADING }
|
||||
.asFlow(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import eu.kanade.domain.manga.model.getComicInfo
|
|||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.cache.ChapterCache
|
||||
import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue
|
||||
import eu.kanade.tachiyomi.data.library.LibraryUpdateNotifier
|
||||
import eu.kanade.tachiyomi.data.notification.NotificationHandler
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
|
@ -25,12 +24,15 @@ import kotlinx.coroutines.CancellationException
|
|||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.flatMapMerge
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.retryWhen
|
||||
import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import logcat.LogPriority
|
||||
import nl.adaptivity.xmlutil.serialization.XML
|
||||
|
@ -59,7 +61,7 @@ import java.util.zip.ZipOutputStream
|
|||
/**
|
||||
* This class is the one in charge of downloading chapters.
|
||||
*
|
||||
* Its [queue] contains the list of chapters to download. In order to download them, the downloader
|
||||
* Its queue contains the list of chapters to download. In order to download them, the downloader
|
||||
* subscription must be running and the list of chapters must be sent to them by [downloadsRelay].
|
||||
*
|
||||
* The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected
|
||||
|
@ -88,7 +90,8 @@ class Downloader(
|
|||
/**
|
||||
* Queue where active downloads are kept.
|
||||
*/
|
||||
val queue = DownloadQueue(store)
|
||||
val _queueState = MutableStateFlow<List<Download>>(emptyList())
|
||||
val queueState = _queueState.asStateFlow()
|
||||
|
||||
/**
|
||||
* Notifier for the downloader state and progress.
|
||||
|
@ -120,7 +123,7 @@ class Downloader(
|
|||
init {
|
||||
launchNow {
|
||||
val chapters = async { store.restore() }
|
||||
queue.addAll(chapters.await())
|
||||
addAllToQueue(chapters.await())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,13 +134,13 @@ class Downloader(
|
|||
* @return true if the downloader is started, false otherwise.
|
||||
*/
|
||||
fun start(): Boolean {
|
||||
if (subscription != null || queue.isEmpty()) {
|
||||
if (subscription != null || queueState.value.isEmpty()) {
|
||||
return false
|
||||
}
|
||||
|
||||
initializeSubscription()
|
||||
|
||||
val pending = queue.filter { it.status != Download.State.DOWNLOADED }
|
||||
val pending = queueState.value.filter { it: Download -> it.status != Download.State.DOWNLOADED }
|
||||
pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE }
|
||||
|
||||
isPaused = false
|
||||
|
@ -151,7 +154,7 @@ class Downloader(
|
|||
*/
|
||||
fun stop(reason: String? = null) {
|
||||
destroySubscription()
|
||||
queue
|
||||
queueState.value
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
.forEach { it.status = Download.State.ERROR }
|
||||
|
||||
|
@ -160,7 +163,7 @@ class Downloader(
|
|||
return
|
||||
}
|
||||
|
||||
if (isPaused && queue.isNotEmpty()) {
|
||||
if (isPaused && queueState.value.isNotEmpty()) {
|
||||
notifier.onPaused()
|
||||
} else {
|
||||
notifier.onComplete()
|
||||
|
@ -179,7 +182,7 @@ class Downloader(
|
|||
*/
|
||||
fun pause() {
|
||||
destroySubscription()
|
||||
queue
|
||||
queueState.value
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
.forEach { it.status = Download.State.QUEUE }
|
||||
isPaused = true
|
||||
|
@ -191,7 +194,7 @@ class Downloader(
|
|||
fun clearQueue() {
|
||||
destroySubscription()
|
||||
|
||||
queue.clear()
|
||||
_clearQueue()
|
||||
notifier.dismissProgress()
|
||||
}
|
||||
|
||||
|
@ -250,7 +253,7 @@ class Downloader(
|
|||
}
|
||||
|
||||
val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchIO
|
||||
val wasEmpty = queue.isEmpty()
|
||||
val wasEmpty = queueState.value.isEmpty()
|
||||
// Called in background thread, the operation can be slow with SAF.
|
||||
val chaptersWithoutDir = async {
|
||||
chapters
|
||||
|
@ -263,12 +266,12 @@ class Downloader(
|
|||
// Runs in main thread (synchronization needed).
|
||||
val chaptersToQueue = chaptersWithoutDir.await()
|
||||
// Filter out those already enqueued.
|
||||
.filter { chapter -> queue.none { it.chapter.id == chapter.id } }
|
||||
.filter { chapter -> queueState.value.none { it: Download -> it.chapter.id == chapter.id } }
|
||||
// Create a download for each one.
|
||||
.map { Download(source, manga, it) }
|
||||
|
||||
if (chaptersToQueue.isNotEmpty()) {
|
||||
queue.addAll(chaptersToQueue)
|
||||
addAllToQueue(chaptersToQueue)
|
||||
|
||||
if (isRunning) {
|
||||
// Send the list of downloads to the downloader.
|
||||
|
@ -277,8 +280,8 @@ class Downloader(
|
|||
|
||||
// Start downloader if needed
|
||||
if (autoStart && wasEmpty) {
|
||||
val queuedDownloads = queue.count { it.source !is UnmeteredSource }
|
||||
val maxDownloadsFromSource = queue
|
||||
val queuedDownloads = queueState.value.count { it: Download -> it.source !is UnmeteredSource }
|
||||
val maxDownloadsFromSource = queueState.value
|
||||
.groupBy { it.source }
|
||||
.filterKeys { it !is UnmeteredSource }
|
||||
.maxOfOrNull { it.value.size }
|
||||
|
@ -636,7 +639,7 @@ class Downloader(
|
|||
// Delete successful downloads from queue
|
||||
if (download.status == Download.State.DOWNLOADED) {
|
||||
// Remove downloaded chapter from queue
|
||||
queue.remove(download)
|
||||
removeFromQueue(download)
|
||||
}
|
||||
if (areAllDownloadsFinished()) {
|
||||
stop()
|
||||
|
@ -647,7 +650,67 @@ class Downloader(
|
|||
* Returns true if all the queued downloads are in DOWNLOADED or ERROR state.
|
||||
*/
|
||||
private fun areAllDownloadsFinished(): Boolean {
|
||||
return queue.none { it.status.value <= Download.State.DOWNLOADING.value }
|
||||
return queueState.value.none { it: Download -> it.status.value <= Download.State.DOWNLOADING.value }
|
||||
}
|
||||
|
||||
fun addAllToQueue(downloads: List<Download>) {
|
||||
_queueState.update {
|
||||
downloads.forEach { download ->
|
||||
download.status = Download.State.QUEUE
|
||||
}
|
||||
store.addAll(downloads)
|
||||
it + downloads
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(download: Download) {
|
||||
_queueState.update {
|
||||
store.remove(download)
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
it - download
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(chapters: List<Chapter>) {
|
||||
chapters.forEach { chapter ->
|
||||
queueState.value.find { it.chapter.id == chapter.id }?.let { removeFromQueue(it) }
|
||||
}
|
||||
}
|
||||
|
||||
fun removeFromQueue(manga: Manga) {
|
||||
queueState.value.filter { it.manga.id == manga.id }.forEach { removeFromQueue(it) }
|
||||
}
|
||||
|
||||
fun _clearQueue() {
|
||||
_queueState.update {
|
||||
it.forEach { download ->
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
}
|
||||
store.clear()
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
fun updateQueue(downloads: List<Download>) {
|
||||
val wasRunning = isRunning
|
||||
|
||||
if (downloads.isEmpty()) {
|
||||
clearQueue()
|
||||
stop()
|
||||
return
|
||||
}
|
||||
|
||||
pause()
|
||||
_clearQueue()
|
||||
addAllToQueue(downloads)
|
||||
|
||||
if (wasRunning) {
|
||||
start()
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
|
|
@ -1,99 +0,0 @@
|
|||
package eu.kanade.tachiyomi.data.download.model
|
||||
|
||||
import eu.kanade.tachiyomi.data.download.DownloadStore
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flatMapLatest
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.flow.update
|
||||
import tachiyomi.domain.chapter.model.Chapter
|
||||
import tachiyomi.domain.manga.model.Manga
|
||||
|
||||
class DownloadQueue(
|
||||
private val store: DownloadStore,
|
||||
) {
|
||||
private val _state = MutableStateFlow<List<Download>>(emptyList())
|
||||
val state = _state.asStateFlow()
|
||||
|
||||
fun addAll(downloads: List<Download>) {
|
||||
_state.update {
|
||||
downloads.forEach { download ->
|
||||
download.status = Download.State.QUEUE
|
||||
}
|
||||
store.addAll(downloads)
|
||||
it + downloads
|
||||
}
|
||||
}
|
||||
|
||||
fun remove(download: Download) {
|
||||
_state.update {
|
||||
store.remove(download)
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
it - download
|
||||
}
|
||||
}
|
||||
|
||||
fun remove(chapter: Chapter) {
|
||||
_state.value.find { it.chapter.id == chapter.id }?.let { remove(it) }
|
||||
}
|
||||
|
||||
fun remove(chapters: List<Chapter>) {
|
||||
chapters.forEach(::remove)
|
||||
}
|
||||
|
||||
fun remove(manga: Manga) {
|
||||
_state.value.filter { it.manga.id == manga.id }.forEach { remove(it) }
|
||||
}
|
||||
|
||||
fun clear() {
|
||||
_state.update {
|
||||
it.forEach { download ->
|
||||
if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
|
||||
download.status = Download.State.NOT_DOWNLOADED
|
||||
}
|
||||
}
|
||||
store.clear()
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
fun statusFlow(): Flow<Download> = state
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.statusFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart { emitAll(getActiveDownloads()) }
|
||||
|
||||
fun progressFlow(): Flow<Download> = state
|
||||
.flatMapLatest { downloads ->
|
||||
downloads
|
||||
.map { download ->
|
||||
download.progressFlow.drop(1).map { download }
|
||||
}
|
||||
.merge()
|
||||
}
|
||||
.onStart { emitAll(getActiveDownloads()) }
|
||||
|
||||
private fun getActiveDownloads(): Flow<Download> =
|
||||
_state.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow()
|
||||
|
||||
fun count(predicate: (Download) -> Boolean) = _state.value.count(predicate)
|
||||
fun filter(predicate: (Download) -> Boolean) = _state.value.filter(predicate)
|
||||
fun find(predicate: (Download) -> Boolean) = _state.value.find(predicate)
|
||||
fun <K> groupBy(keySelector: (Download) -> K) = _state.value.groupBy(keySelector)
|
||||
fun isEmpty() = _state.value.isEmpty()
|
||||
fun isNotEmpty() = _state.value.isNotEmpty()
|
||||
fun none(predicate: (Download) -> Boolean) = _state.value.none(predicate)
|
||||
fun toMutableList() = _state.value.toMutableList()
|
||||
}
|
|
@ -111,7 +111,7 @@ class DownloadQueueScreenModel(
|
|||
|
||||
init {
|
||||
coroutineScope.launch {
|
||||
downloadManager.queue.state
|
||||
downloadManager.queueState
|
||||
.map { downloads ->
|
||||
downloads
|
||||
.groupBy { it.source }
|
||||
|
@ -136,8 +136,8 @@ class DownloadQueueScreenModel(
|
|||
val isDownloaderRunning
|
||||
get() = downloadManager.isDownloaderRunning
|
||||
|
||||
fun getDownloadStatusFlow() = downloadManager.queue.statusFlow()
|
||||
fun getDownloadProgressFlow() = downloadManager.queue.progressFlow()
|
||||
fun getDownloadStatusFlow() = downloadManager.statusFlow()
|
||||
fun getDownloadProgressFlow() = downloadManager.progressFlow()
|
||||
|
||||
fun startDownloads() {
|
||||
downloadManager.startDownloads()
|
||||
|
|
|
@ -427,7 +427,7 @@ class MangaInfoScreenModel(
|
|||
|
||||
private fun observeDownloads() {
|
||||
coroutineScope.launchIO {
|
||||
downloadManager.queue.statusFlow()
|
||||
downloadManager.statusFlow()
|
||||
.filter { it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collect {
|
||||
|
@ -438,7 +438,7 @@ class MangaInfoScreenModel(
|
|||
}
|
||||
|
||||
coroutineScope.launchIO {
|
||||
downloadManager.queue.progressFlow()
|
||||
downloadManager.progressFlow()
|
||||
.filter { it.manga.id == successState?.manga?.id }
|
||||
.catch { error -> logcat(LogPriority.ERROR, error) }
|
||||
.collect {
|
||||
|
|
|
@ -94,7 +94,7 @@ private class MoreScreenModel(
|
|||
coroutineScope.launchIO {
|
||||
combine(
|
||||
downloadManager.isDownloaderRunning,
|
||||
downloadManager.queue.state,
|
||||
downloadManager.queueState,
|
||||
) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
|
||||
.collectLatest { (isDownloading, downloadQueueSize) ->
|
||||
val pendingDownloadExists = downloadQueueSize != 0
|
||||
|
|
|
@ -99,7 +99,7 @@ class UpdatesScreenModel(
|
|||
}
|
||||
|
||||
coroutineScope.launchIO {
|
||||
merge(downloadManager.queue.statusFlow(), downloadManager.queue.progressFlow())
|
||||
merge(downloadManager.statusFlow(), downloadManager.progressFlow())
|
||||
.catch { logcat(LogPriority.ERROR, it) }
|
||||
.collect(this@UpdatesScreenModel::updateDownloadState)
|
||||
}
|
||||
|
|
Reference in a new issue