Replace some usages of RxJava

This commit is contained in:
arkon 2022-07-10 19:48:00 -04:00
parent cbcab5a545
commit 788583e66f
6 changed files with 144 additions and 145 deletions

View file

@ -1,10 +1,12 @@
package eu.kanade.tachiyomi.data.download.model
import com.jakewharton.rxrelay.PublishRelay
import eu.kanade.core.util.asFlow
import eu.kanade.domain.manga.model.Manga
import eu.kanade.tachiyomi.data.database.models.Chapter
import eu.kanade.tachiyomi.data.download.DownloadStore
import eu.kanade.tachiyomi.source.model.Page
import kotlinx.coroutines.flow.Flow
import rx.Observable
import rx.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList
@ -72,8 +74,11 @@ class DownloadQueue(
fun getActiveDownloads(): Observable<Download> =
Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING }
@Deprecated("Use getStatusAsFlow instead")
fun getStatusObservable(): Observable<Download> = statusSubject.onBackpressureBuffer()
fun getStatusAsFlow(): Flow<Download> = getStatusObservable().asFlow()
fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
.startWith(Unit)
.map { this }
@ -84,6 +89,7 @@ class DownloadQueue(
}
}
@Deprecated("Use getProgressAsFlow instead")
fun getProgressObservable(): Observable<Download> {
return statusSubject.onBackpressureBuffer()
.startWith(getActiveDownloads())
@ -103,6 +109,10 @@ class DownloadQueue(
.filter { it.status == Download.State.DOWNLOADING }
}
fun getProgressAsFlow(): Flow<Download> {
return getProgressObservable().asFlow()
}
private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Int>?) {
pages?.forEach { it.setStatusSubject(subject) }
}

View file

@ -57,13 +57,4 @@ open class BasePresenter<V> : RxPresenter<V>() {
* @param onError function to execute when the observable throws an error.
*/
fun <T> Observable<T>.subscribeLatestCache(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverLatestCache<T>()).subscribe(split(onNext, onError)).apply { add(this) }
/**
* Subscribes an observable with [deliverReplay] and adds it to the presenter's lifecycle
* subscription list.
*
* @param onNext function to execute when the observable emits an item.
* @param onError function to execute when the observable throws an error.
*/
fun <T> Observable<T>.subscribeReplay(onNext: (V, T) -> Unit, onError: ((V, Throwable) -> Unit) = { _, _ -> }) = compose(deliverReplay<T>()).subscribe(split(onNext, onError)).apply { add(this) }
}

View file

@ -51,15 +51,13 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import logcat.LogPriority
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.util.Date
@ -112,7 +110,7 @@ open class BrowseSourcePresenter(
/**
* Subscription for the pager.
*/
private var pagerSubscription: Subscription? = null
private var pagerJob: Job? = null
/**
* Subscription for one request from the pager.
@ -129,7 +127,6 @@ open class BrowseSourcePresenter(
super.onCreate(savedState)
source = sourceManager.get(sourceId) as? CatalogueSource ?: return
sourceFilters = source.getFilterList()
if (savedState != null) {
@ -158,25 +155,37 @@ open class BrowseSourcePresenter(
pager = createPager(query, filters)
val sourceId = source.id
val sourceDisplayMode = prefs.sourceDisplayMode()
// Prepare the pager.
pagerSubscription?.let { remove(it) }
pagerSubscription = pager.results()
.observeOn(Schedulers.io())
.map { (first, second) -> first to second.map { networkToLocalManga(it, sourceId).toDomainManga()!! } }
.doOnNext { initializeMangas(it.second) }
.map { (first, second) -> first to second.map { SourceItem(it, sourceDisplayMode) } }
.observeOn(AndroidSchedulers.mainThread())
.subscribeReplay(
{ view, (page, mangas) ->
view.onAddPage(page, mangas)
},
{ _, error ->
pagerJob?.cancel()
pagerJob = presenterScope.launchIO {
pager.asFlow()
.map { (first, second) ->
first to second.map {
networkToLocalManga(
it,
sourceId,
).toDomainManga()!!
}
}
.onEach { initializeMangas(it.second) }
.map { (first, second) ->
first to second.map {
SourceItem(
it,
sourceDisplayMode,
)
}
}
.catch { error ->
logcat(LogPriority.ERROR, error)
},
)
}
.collectLatest { (page, mangas) ->
withUIContext {
view?.onAddPage(page, mangas)
}
}
}
// Request first page.
requestNext()

View file

@ -1,9 +1,10 @@
package eu.kanade.tachiyomi.ui.browse.source.browse
import com.jakewharton.rxrelay.PublishRelay
import eu.kanade.core.util.asFlow
import eu.kanade.tachiyomi.source.model.MangasPage
import eu.kanade.tachiyomi.source.model.SManga
import rx.Observable
import kotlinx.coroutines.flow.Flow
/**
* A general pager for source requests (latest updates, popular, search)
@ -15,8 +16,8 @@ abstract class Pager(var currentPage: Int = 1) {
protected val results: PublishRelay<Pair<Int, List<SManga>>> = PublishRelay.create()
fun results(): Observable<Pair<Int, List<SManga>>> {
return results.asObservable()
fun asFlow(): Flow<Pair<Int, List<SManga>>> {
return results.asObservable().asFlow()
}
abstract suspend fun requestNextPage()

View file

@ -59,6 +59,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
@ -66,9 +67,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withContext
import logcat.LogPriority
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.text.DateFormat
@ -119,8 +117,8 @@ class MangaPresenter(
/**
* Subscription to observe download status changes.
*/
private var observeDownloadsStatusSubscription: Subscription? = null
private var observeDownloadsPageSubscription: Subscription? = null
private var observeDownloadsStatusJob: Job? = null
private var observeDownloadsPageJob: Job? = null
private var _trackList: List<TrackItem> = emptyList()
val trackList get() = _trackList
@ -401,29 +399,29 @@ class MangaPresenter(
// Chapters list - start
private fun observeDownloads() {
observeDownloadsStatusSubscription?.let { remove(it) }
observeDownloadsStatusSubscription = downloadManager.queue.getStatusObservable()
.observeOn(Schedulers.io())
.onBackpressureBuffer()
.filter { download -> download.manga.id == successState?.manga?.id }
.observeOn(AndroidSchedulers.mainThread())
.subscribeLatestCache(
{ _, it -> updateDownloadState(it) },
{ _, error ->
logcat(LogPriority.ERROR, error)
},
)
observeDownloadsStatusJob?.cancel()
observeDownloadsStatusJob = presenterScope.launchIO {
downloadManager.queue.getStatusAsFlow()
.filter { it.manga.id == successState?.manga?.id }
.catch { error -> logcat(LogPriority.ERROR, error) }
.collectLatest {
withUIContext {
updateDownloadState(it)
}
}
}
observeDownloadsPageSubscription?.let { remove(it) }
observeDownloadsPageSubscription = downloadManager.queue.getProgressObservable()
.observeOn(Schedulers.io())
.onBackpressureBuffer()
.filter { download -> download.manga.id == successState?.manga?.id }
.observeOn(AndroidSchedulers.mainThread())
.subscribeLatestCache(
{ _, download -> updateDownloadState(download) },
{ _, error -> logcat(LogPriority.ERROR, error) },
)
observeDownloadsPageJob?.cancel()
observeDownloadsPageJob = presenterScope.launchIO {
downloadManager.queue.getProgressAsFlow()
.filter { it.manga.id == successState?.manga?.id }
.catch { error -> logcat(LogPriority.ERROR, error) }
.collectLatest {
withUIContext {
updateDownloadState(it)
}
}
}
}
private fun updateDownloadState(download: Download) {

View file

@ -17,31 +17,30 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
import eu.kanade.tachiyomi.ui.recent.DateSectionItem
import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.lang.toDateKey
import eu.kanade.tachiyomi.util.lang.withUIContext
import eu.kanade.tachiyomi.util.system.logcat
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.map
import logcat.LogPriority
import rx.Observable
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import uy.kohesive.injekt.injectLazy
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.text.DateFormat
import java.util.Calendar
import java.util.Date
import java.util.TreeMap
class UpdatesPresenter : BasePresenter<UpdatesController>() {
val preferences: PreferencesHelper by injectLazy()
private val downloadManager: DownloadManager by injectLazy()
private val sourceManager: SourceManager by injectLazy()
private val handler: DatabaseHandler by injectLazy()
private val updateChapter: UpdateChapter by injectLazy()
private val setReadStatus: SetReadStatus by injectLazy()
class UpdatesPresenter(
private val preferences: PreferencesHelper = Injekt.get(),
private val downloadManager: DownloadManager = Injekt.get(),
private val sourceManager: SourceManager = Injekt.get(),
private val handler: DatabaseHandler = Injekt.get(),
private val updateChapter: UpdateChapter = Injekt.get(),
private val setReadStatus: SetReadStatus = Injekt.get(),
) : BasePresenter<UpdatesController>() {
private val relativeTime: Int = preferences.relativeTime().get()
private val dateFormat: DateFormat = preferences.dateFormat()
@ -52,77 +51,70 @@ class UpdatesPresenter : BasePresenter<UpdatesController>() {
override fun onCreate(savedState: Bundle?) {
super.onCreate(savedState)
getUpdatesObservable()
presenterScope.launchIO {
subscribeToUpdates()
downloadManager.queue.getStatusObservable()
.observeOn(Schedulers.io())
.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeLatestCache(
{ view, it ->
onDownloadStatusChange(it)
view.onChapterDownloadUpdate(it)
},
{ _, error ->
logcat(LogPriority.ERROR, error)
},
)
downloadManager.queue.getStatusAsFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collectLatest {
withUIContext {
onDownloadStatusChange(it)
view?.onChapterDownloadUpdate(it)
}
}
downloadManager.queue.getProgressObservable()
.observeOn(Schedulers.io())
.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeLatestCache(UpdatesController::onChapterDownloadUpdate) { _, error ->
logcat(LogPriority.ERROR, error)
}
downloadManager.queue.getProgressAsFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collectLatest {
withUIContext {
view?.onChapterDownloadUpdate(it)
}
}
}
}
/**
* Get observable containing recent chapters and date
*
* @return observable containing recent chapters and date
*/
private fun getUpdatesObservable() {
private suspend fun subscribeToUpdates() {
// Set date limit for recent chapters
presenterScope.launchIO {
val cal = Calendar.getInstance().apply {
time = Date()
add(Calendar.MONTH, -3)
}
handler
.subscribeToList {
mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper)
}
.map { mangaChapter ->
val map = TreeMap<Date, MutableList<Pair<Manga, Chapter>>> { d1, d2 -> d2.compareTo(d1) }
val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() }
byDate.flatMap { entry ->
val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat)
entry.value
.sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed()
.map { UpdatesItem(it.second, it.first, dateItem) }
}
}
.collectLatest { list ->
list.forEach { item ->
// Find an active download for this chapter.
val download = downloadManager.queue.find { it.chapter.id == item.chapter.id }
// If there's an active download, assign it, otherwise ask the manager if
// the chapter is downloaded and assign it to the status.
if (download != null) {
item.download = download
}
}
setDownloadedChapters(list)
_updates.value = list
// Set unread chapter count for bottom bar badge
preferences.unreadUpdatesCount().set(list.count { !it.chapter.read })
}
val cal = Calendar.getInstance().apply {
time = Date()
add(Calendar.MONTH, -3)
}
handler
.subscribeToList {
mangasQueries.getRecentlyUpdated(after = cal.timeInMillis, mangaChapterMapper)
}
.map { mangaChapter ->
val map = TreeMap<Date, MutableList<Pair<Manga, Chapter>>> { d1, d2 -> d2.compareTo(d1) }
val byDate = mangaChapter.groupByTo(map) { it.second.dateFetch.toDateKey() }
byDate.flatMap { entry ->
val dateItem = DateSectionItem(entry.key, relativeTime, dateFormat)
entry.value
.sortedWith(compareBy({ it.second.dateFetch }, { it.second.chapterNumber })).asReversed()
.map { UpdatesItem(it.second, it.first, dateItem) }
}
}
.collectLatest { list ->
list.forEach { item ->
// Find an active download for this chapter.
val download = downloadManager.queue.find { it.chapter.id == item.chapter.id }
// If there's an active download, assign it, otherwise ask the manager if
// the chapter is downloaded and assign it to the status.
if (download != null) {
item.download = download
}
}
setDownloadedChapters(list)
_updates.value = list
// Set unread chapter count for bottom bar badge
preferences.unreadUpdatesCount().set(list.count { !it.chapter.read })
}
}
/**
@ -184,16 +176,14 @@ class UpdatesPresenter : BasePresenter<UpdatesController>() {
* @param chapters list of chapters
*/
fun deleteChapters(chapters: List<UpdatesItem>) {
Observable.just(chapters)
.doOnNext { deleteChaptersInternal(it) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeFirst(
{ view, _ ->
view.onChaptersDeleted()
},
UpdatesController::onChaptersDeletedError,
)
launchIO {
try {
deleteChaptersInternal(chapters)
withUIContext { view?.onChaptersDeleted() }
} catch (e: Throwable) {
withUIContext { view?.onChaptersDeletedError(e) }
}
}
}
/**