Remove usage of RxJava from LibraryUpdateService

This commit is contained in:
arkon 2021-01-23 11:20:16 -05:00
parent 628bd5d6b4
commit 86b9d7e843

View file

@ -27,15 +27,16 @@ import eu.kanade.tachiyomi.source.model.toSChapter
import eu.kanade.tachiyomi.source.model.toSManga
import eu.kanade.tachiyomi.util.chapter.NoChaptersException
import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource
import eu.kanade.tachiyomi.util.lang.runAsObservable
import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.prepUpdateCover
import eu.kanade.tachiyomi.util.shouldDownloadNewChapters
import eu.kanade.tachiyomi.util.storage.getUriCompat
import eu.kanade.tachiyomi.util.system.acquireWakeLock
import eu.kanade.tachiyomi.util.system.isServiceRunning
import rx.Observable
import rx.Subscription
import rx.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.cancel
import timber.log.Timber
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
@ -59,17 +60,11 @@ class LibraryUpdateService(
val coverCache: CoverCache = Injekt.get()
) : Service() {
/**
* Wake lock that will be held until the service is destroyed.
*/
private lateinit var wakeLock: PowerManager.WakeLock
private lateinit var notifier: LibraryUpdateNotifier
private lateinit var scope: CoroutineScope
/**
* Subscription where the update is done.
*/
private var subscription: Subscription? = null
private var updateJob: Job? = null
/**
* Defines what should be updated within a service execution.
@ -144,6 +139,7 @@ class LibraryUpdateService(
override fun onCreate() {
super.onCreate()
scope = MainScope()
notifier = LibraryUpdateNotifier(this)
wakeLock = acquireWakeLock(javaClass.name)
@ -155,7 +151,8 @@ class LibraryUpdateService(
* lock.
*/
override fun onDestroy() {
subscription?.unsubscribe()
scope?.cancel()
updateJob?.cancel()
if (wakeLock.isHeld) {
wakeLock.release()
}
@ -183,34 +180,27 @@ class LibraryUpdateService(
?: return START_NOT_STICKY
// Unsubscribe from any previous subscription if needed.
subscription?.unsubscribe()
updateJob?.cancel()
// Update favorite manga. Destroy service when completed or in case of an error.
subscription = Observable
.defer {
val selectedScheme = preferences.libraryUpdatePrioritization().get()
val mangaList = getMangaToUpdate(intent, target)
.sortedWith(rankingScheme[selectedScheme])
// Update either chapter list or manga details.
updateJob = scope.launchIO {
try {
when (target) {
Target.CHAPTERS -> updateChapterList(mangaList)
Target.COVERS -> updateCovers(mangaList)
Target.TRACKING -> updateTrackings(mangaList)
}
}
.subscribeOn(Schedulers.io())
.subscribe(
{
},
{
Timber.e(it)
} catch (e: Throwable) {
Timber.e(e)
stopSelf(startId)
},
{
} finally {
stopSelf(startId)
}
)
}
return START_REDELIVER_INTENT
}
@ -253,7 +243,7 @@ class LibraryUpdateService(
* @param mangaToUpdate the list to update
* @return an observable delivering the progress of each update.
*/
fun updateChapterList(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
suspend fun updateChapterList(mangaToUpdate: List<LibraryManga>) {
// Initialize the variables holding the progress of the updates.
val count = AtomicInteger(0)
// List containing new updates
@ -263,49 +253,44 @@ class LibraryUpdateService(
// Boolean to determine if DownloadManager has downloads
var hasDownloads = false
// Emit each manga and update it sequentially.
return Observable.from(mangaToUpdate)
mangaToUpdate
.map { manga ->
// Notify manga that will update.
.doOnNext { notifier.showProgressNotification(it, count.andIncrement, mangaToUpdate.size) }
notifier.showProgressNotification(manga, count.andIncrement, mangaToUpdate.size)
// Update the chapters of the manga
.concatMap { manga ->
updateManga(manga)
try {
val newChapters = updateManga(manga).first
Pair(manga, newChapters)
} catch (e: Throwable) {
// If there's any error, return empty update and continue.
.onErrorReturn {
val errorMessage = if (it is NoChaptersException) {
val errorMessage = if (e is NoChaptersException) {
getString(R.string.no_chapters_error)
} else {
it.message
e.message
}
failedUpdates.add(Pair(manga, errorMessage))
Pair(emptyList(), emptyList())
Pair(manga, emptyList())
}
}
// Filter out mangas without new chapters (or failed).
.filter { (first) -> first.isNotEmpty() }
.doOnNext {
.filter { (_, newChapters) -> newChapters.isNotEmpty() }
.forEach { (manga, newChapters) ->
if (manga.shouldDownloadNewChapters(db, preferences)) {
downloadChapters(manga, it.first)
downloadChapters(manga, newChapters)
hasDownloads = true
}
}
// Convert to the manga that contains new chapters.
.map {
newUpdates.add(
Pair(
manga,
(
it.first.sortedByDescending { ch -> ch.source_order }
.toTypedArray()
newChapters.sortedByDescending { ch -> ch.source_order }.toTypedArray()
)
)
}
}
// Add manga with new chapters to the list.
.doOnNext { manga ->
// Add to the list
newUpdates.add(manga)
}
// Notify result of the overall update.
.doOnCompleted {
notifier.cancelProgressNotification()
if (newUpdates.isNotEmpty()) {
@ -323,8 +308,6 @@ class LibraryUpdateService(
)
}
}
.map { (first) -> first }
}
private fun downloadChapters(manga: Manga, chapters: List<Chapter>) {
// We don't want to start downloading while the library is updating, because websites
@ -338,12 +321,11 @@ class LibraryUpdateService(
* @param manga the manga to update.
* @return a pair of the inserted and removed chapters.
*/
fun updateManga(manga: Manga): Observable<Pair<List<Chapter>, List<Chapter>>> {
suspend fun updateManga(manga: Manga): Pair<List<Chapter>, List<Chapter>> {
val source = sourceManager.getOrStub(manga.source)
// Update manga details metadata in the background
if (preferences.autoUpdateMetadata()) {
runAsObservable({
val updatedManga = source.getMangaDetails(manga.toMangaInfo())
val sManga = updatedManga.toSManga()
// Avoid "losing" existing cover
@ -355,32 +337,22 @@ class LibraryUpdateService(
manga.copyFrom(sManga)
db.insertManga(manga).executeAsBlocking()
manga
})
.onErrorResumeNext { Observable.just(manga) }
.subscribeOn(Schedulers.io())
.subscribe()
}
return runAsObservable({
source.getChapterList(manga.toMangaInfo())
val chapters = source.getChapterList(manga.toMangaInfo())
.map { it.toSChapter() }
})
.map { syncChaptersWithSource(db, it, manga, source) }
return syncChaptersWithSource(db, chapters, manga, source)
}
private fun updateCovers(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
private suspend fun updateCovers(mangaToUpdate: List<LibraryManga>) {
var count = 0
return Observable.from(mangaToUpdate)
.doOnNext {
notifier.showProgressNotification(it, count++, mangaToUpdate.size)
}
.flatMap { manga ->
val source = sourceManager.get(manga.source)
?: return@flatMap Observable.empty<LibraryManga>()
mangaToUpdate.forEach { manga ->
notifier.showProgressNotification(manga, count++, mangaToUpdate.size)
runAsObservable({
sourceManager.get(manga.source)?.let { source ->
try {
val networkManga = source.getMangaDetails(manga.toMangaInfo())
val sManga = networkManga.toSManga()
manga.prepUpdateCover(coverCache, sManga, true)
@ -388,50 +360,47 @@ class LibraryUpdateService(
manga.thumbnail_url = it
db.insertManga(manga).executeAsBlocking()
}
manga
})
.onErrorReturn { manga }
} catch (e: Throwable) {
// Ignore errors and continue
Timber.e(e)
}
.doOnCompleted {
}
}
notifier.cancelProgressNotification()
}
}
/**
* Method that updates the metadata of the connected tracking services. It's called in a
* background thread, so it's safe to do heavy operations or network calls here.
*/
private fun updateTrackings(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> {
private suspend fun updateTrackings(mangaToUpdate: List<LibraryManga>) {
// Initialize the variables holding the progress of the updates.
var count = 0
val loggedServices = trackManager.services.filter { it.isLogged }
// Emit each manga and update it sequentially.
return Observable.from(mangaToUpdate)
mangaToUpdate.forEach { manga ->
// Notify manga that will update.
.doOnNext { notifier.showProgressNotification(it, count++, mangaToUpdate.size) }
// Update the tracking details.
.concatMap { manga ->
val tracks = db.getTracks(manga).executeAsBlocking()
notifier.showProgressNotification(manga, count++, mangaToUpdate.size)
Observable.from(tracks)
.concatMap { track ->
// Update the tracking details.
db.getTracks(manga).executeAsBlocking().forEach { track ->
val service = trackManager.getService(track.sync_id)
if (service != null && service in loggedServices) {
runAsObservable({ service.refresh(track) })
.doOnNext { db.insertTrack(it).executeAsBlocking() }
.onErrorReturn { track }
} else {
Observable.empty()
try {
val updatedTrack = service.refresh(track)
db.insertTrack(updatedTrack).executeAsBlocking()
} catch (e: Throwable) {
// Ignore errors and continue
Timber.e(e)
}
}
.map { manga }
}
.doOnCompleted {
}
notifier.cancelProgressNotification()
}
}
/**
* Writes basic file of update errors to cache dir.