Replace onProcessRestart with the new startables.

This commit is contained in:
inorichi 2016-02-03 21:09:40 +01:00
parent 3deac86bbe
commit aada373a0c
6 changed files with 152 additions and 89 deletions

View file

@ -213,6 +213,137 @@ public class RxPresenter<View> extends Presenter<View> {
restartableReplay(restartableId, observableFactory, onNext, null); restartableReplay(restartableId, observableFactory, onNext, null);
} }
/**
* A startable behaves the same as a restartable but it does not resubscribe on process restart
*
* @param startableId an id of the restartable.
* @param observableFactory a factory that should return an Observable when the startable should run.
*/
public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory) {
restartables.put(startableId, () -> observableFactory.call().subscribe());
}
/**
* A startable behaves the same as a restartable but it does not resubscribe on process restart
*
* @param startableId an id of the restartable.
* @param observableFactory a factory that should return an Observable when the startable should run.
* @param onNext a callback that will be called when received data should be delivered to view.
* @param onError a callback that will be called if the source observable emits onError.
*/
public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory,
final Action1<T> onNext, final Action1<Throwable> onError) {
restartables.put(startableId, () -> observableFactory.call().subscribe(onNext, onError));
}
/**
* A startable behaves the same as a restartable but it does not resubscribe on process restart
*
* @param startableId an id of the restartable.
* @param observableFactory a factory that should return an Observable when the startable should run.
* @param onNext a callback that will be called when received data should be delivered to view.
*/
public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory, final Action1<T> onNext) {
restartables.put(startableId, () -> observableFactory.call().subscribe(onNext));
}
/**
* This is a shortcut that can be used instead of combining together
* {@link #startable(int, Func0)},
* {@link #deliverFirst()},
* {@link #split(Action2, Action2)}.
*
* @param startableId an id of the startable.
* @param observableFactory a factory that should return an Observable when the startable should run.
* @param onNext a callback that will be called when received data should be delivered to view.
* @param onError a callback that will be called if the source observable emits onError.
* @param <T> the type of the observable.
*/
public <T> void startableFirst(int startableId, final Func0<Observable<T>> observableFactory,
final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
restartables.put(startableId, new Func0<Subscription>() {
@Override
public Subscription call() {
return observableFactory.call()
.compose(RxPresenter.this.<T>deliverFirst())
.subscribe(split(onNext, onError));
}
});
}
/**
* This is a shortcut for calling {@link #startableFirst(int, Func0, Action2, Action2)} with the last parameter = null.
*/
public <T> void startableFirst(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
startableFirst(startableId, observableFactory, onNext, null);
}
/**
* This is a shortcut that can be used instead of combining together
* {@link #startable(int, Func0)},
* {@link #deliverLatestCache()},
* {@link #split(Action2, Action2)}.
*
* @param startableId an id of the startable.
* @param observableFactory a factory that should return an Observable when the startable should run.
* @param onNext a callback that will be called when received data should be delivered to view.
* @param onError a callback that will be called if the source observable emits onError.
* @param <T> the type of the observable.
*/
public <T> void startableLatestCache(int startableId, final Func0<Observable<T>> observableFactory,
final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
restartables.put(startableId, new Func0<Subscription>() {
@Override
public Subscription call() {
return observableFactory.call()
.compose(RxPresenter.this.<T>deliverLatestCache())
.subscribe(split(onNext, onError));
}
});
}
/**
* This is a shortcut for calling {@link #startableLatestCache(int, Func0, Action2, Action2)} with the last parameter = null.
*/
public <T> void startableLatestCache(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
startableLatestCache(startableId, observableFactory, onNext, null);
}
/**
* This is a shortcut that can be used instead of combining together
* {@link #startable(int, Func0)},
* {@link #deliverReplay()},
* {@link #split(Action2, Action2)}.
*
* @param startableId an id of the startable.
* @param observableFactory a factory that should return an Observable when the startable should run.
* @param onNext a callback that will be called when received data should be delivered to view.
* @param onError a callback that will be called if the source observable emits onError.
* @param <T> the type of the observable.
*/
public <T> void startableReplay(int startableId, final Func0<Observable<T>> observableFactory,
final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
restartables.put(startableId, new Func0<Subscription>() {
@Override
public Subscription call() {
return observableFactory.call()
.compose(RxPresenter.this.<T>deliverReplay())
.subscribe(split(onNext, onError));
}
});
}
/**
* This is a shortcut for calling {@link #startableReplay(int, Func0, Action2, Action2)} with the last parameter = null.
*/
public <T> void startableReplay(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
startableReplay(startableId, observableFactory, onNext, null);
}
/** /**
* Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
* the source {@link rx.Observable}. * the source {@link rx.Observable}.

View file

@ -53,23 +53,23 @@ public class CataloguePresenter extends BasePresenter<CatalogueFragment> {
super.onCreate(savedState); super.onCreate(savedState);
if (savedState != null) { if (savedState != null) {
onProcessRestart(); source = sourceManager.get(sourceId);
} }
mangaDetailSubject = PublishSubject.create(); mangaDetailSubject = PublishSubject.create();
pager = new RxPager<>(); pager = new RxPager<>();
restartableReplay(GET_MANGA_LIST, startableReplay(GET_MANGA_LIST,
pager::results, pager::results,
(view, pair) -> view.onAddPage(pair.first, pair.second)); (view, pair) -> view.onAddPage(pair.first, pair.second));
restartableFirst(GET_MANGA_PAGE, startableFirst(GET_MANGA_PAGE,
() -> pager.request(page -> getMangasPageObservable(page + 1)), () -> pager.request(page -> getMangasPageObservable(page + 1)),
(view, next) -> {}, (view, next) -> {},
(view, error) -> view.onAddPageError()); (view, error) -> view.onAddPageError());
restartableLatestCache(GET_MANGA_DETAIL, startableLatestCache(GET_MANGA_DETAIL,
() -> mangaDetailSubject () -> mangaDetailSubject
.observeOn(Schedulers.io()) .observeOn(Schedulers.io())
.flatMap(Observable::from) .flatMap(Observable::from)
@ -84,13 +84,6 @@ public class CataloguePresenter extends BasePresenter<CatalogueFragment> {
.subscribe(this::setDisplayMode)); .subscribe(this::setDisplayMode));
} }
private void onProcessRestart() {
source = sourceManager.get(sourceId);
stop(GET_MANGA_LIST);
stop(GET_MANGA_DETAIL);
stop(GET_MANGA_PAGE);
}
private void setDisplayMode(boolean asList) { private void setDisplayMode(boolean asList) {
this.isListMode = asList; this.isListMode = asList;
if (asList) { if (asList) {

View file

@ -52,26 +52,22 @@ public class ChaptersPresenter extends BasePresenter<ChaptersFragment> {
protected void onCreate(Bundle savedState) { protected void onCreate(Bundle savedState) {
super.onCreate(savedState); super.onCreate(savedState);
if (savedState != null) {
onProcessRestart();
}
chaptersSubject = PublishSubject.create(); chaptersSubject = PublishSubject.create();
restartableLatestCache(GET_MANGA, startableLatestCache(GET_MANGA,
() -> Observable.just(manga), () -> Observable.just(manga),
ChaptersFragment::onNextManga); ChaptersFragment::onNextManga);
restartableLatestCache(DB_CHAPTERS, startableLatestCache(DB_CHAPTERS,
this::getDbChaptersObs, this::getDbChaptersObs,
ChaptersFragment::onNextChapters); ChaptersFragment::onNextChapters);
restartableFirst(FETCH_CHAPTERS, startableFirst(FETCH_CHAPTERS,
this::getOnlineChaptersObs, this::getOnlineChaptersObs,
(view, result) -> view.onFetchChaptersDone(), (view, result) -> view.onFetchChaptersDone(),
(view, error) -> view.onFetchChaptersError()); (view, error) -> view.onFetchChaptersError());
restartableLatestCache(CHAPTER_STATUS_CHANGES, startableLatestCache(CHAPTER_STATUS_CHANGES,
this::getChapterStatusObs, this::getChapterStatusObs,
(view, download) -> view.onChapterStatusChange(download), (view, download) -> view.onChapterStatusChange(download),
(view, error) -> Timber.e(error.getCause(), error.getMessage())); (view, error) -> Timber.e(error.getCause(), error.getMessage()));
@ -79,13 +75,6 @@ public class ChaptersPresenter extends BasePresenter<ChaptersFragment> {
registerForStickyEvents(); registerForStickyEvents();
} }
private void onProcessRestart() {
stop(GET_MANGA);
stop(DB_CHAPTERS);
stop(FETCH_CHAPTERS);
stop(CHAPTER_STATUS_CHANGES);
}
@Override @Override
protected void onDestroy() { protected void onDestroy() {
unregisterForEvents(); unregisterForEvents();

View file

@ -64,22 +64,18 @@ public class MangaInfoPresenter extends BasePresenter<MangaInfoFragment> {
protected void onCreate(Bundle savedState) { protected void onCreate(Bundle savedState) {
super.onCreate(savedState); super.onCreate(savedState);
if (savedState != null) { // Notify the view a manga is available or has changed
onProcessRestart(); startableLatestCache(GET_MANGA,
}
// Update manga cache
restartableLatestCache(GET_MANGA,
() -> Observable.just(manga), () -> Observable.just(manga),
(view, manga) -> view.onNextManga(manga, source)); (view, manga) -> view.onNextManga(manga, source));
// Update chapter count // Update chapter count
restartableLatestCache(GET_CHAPTER_COUNT, startableLatestCache(GET_CHAPTER_COUNT,
() -> Observable.just(count), () -> Observable.just(count),
MangaInfoFragment::setChapterCount); MangaInfoFragment::setChapterCount);
// Fetch manga info from source // Fetch manga info from source
restartableFirst(FETCH_MANGA_INFO, startableFirst(FETCH_MANGA_INFO,
this::fetchMangaObs, this::fetchMangaObs,
(view, manga) -> view.onFetchMangaDone(), (view, manga) -> view.onFetchMangaDone(),
(view, error) -> view.onFetchMangaError()); (view, error) -> view.onFetchMangaError());
@ -88,15 +84,6 @@ public class MangaInfoPresenter extends BasePresenter<MangaInfoFragment> {
registerForStickyEvents(); registerForStickyEvents();
} }
/**
* Called when savedState not null
*/
private void onProcessRestart() {
stop(GET_MANGA);
stop(GET_CHAPTER_COUNT);
stop(FETCH_MANGA_INFO);
}
@Override @Override
protected void onDestroy() { protected void onDestroy() {
unregisterForEvents(); unregisterForEvents();

View file

@ -44,20 +44,16 @@ public class MyAnimeListPresenter extends BasePresenter<MyAnimeListFragment> {
protected void onCreate(Bundle savedState) { protected void onCreate(Bundle savedState) {
super.onCreate(savedState); super.onCreate(savedState);
if (savedState != null) {
onProcessRestart();
}
myAnimeList = syncManager.getMyAnimeList(); myAnimeList = syncManager.getMyAnimeList();
restartableLatestCache(GET_MANGA_SYNC, startableLatestCache(GET_MANGA_SYNC,
() -> db.getMangaSync(manga, myAnimeList).asRxObservable() () -> db.getMangaSync(manga, myAnimeList).asRxObservable()
.doOnNext(mangaSync -> this.mangaSync = mangaSync) .doOnNext(mangaSync -> this.mangaSync = mangaSync)
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()), .observeOn(AndroidSchedulers.mainThread()),
MyAnimeListFragment::setMangaSync); MyAnimeListFragment::setMangaSync);
restartableLatestCache(GET_SEARCH_RESULTS, startableLatestCache(GET_SEARCH_RESULTS,
this::getSearchResultsObservable, this::getSearchResultsObservable,
(view, results) -> { (view, results) -> {
view.setSearchResults(results); view.setSearchResults(results);
@ -66,7 +62,7 @@ public class MyAnimeListPresenter extends BasePresenter<MyAnimeListFragment> {
view.setSearchResultsError(); view.setSearchResultsError();
}); });
restartableFirst(REFRESH, startableFirst(REFRESH,
() -> myAnimeList.getList() () -> myAnimeList.getList()
.flatMap(myList -> { .flatMap(myList -> {
for (MangaSync myManga : myList) { for (MangaSync myManga : myList) {
@ -86,12 +82,6 @@ public class MyAnimeListPresenter extends BasePresenter<MyAnimeListFragment> {
} }
private void onProcessRestart() {
stop(GET_MANGA_SYNC);
stop(GET_SEARCH_RESULTS);
stop(REFRESH);
}
@Override @Override
protected void onTakeView(MyAnimeListFragment view) { protected void onTakeView(MyAnimeListFragment view) {
super.onTakeView(view); super.onTakeView(view);

View file

@ -30,7 +30,6 @@ import rx.Observable;
import rx.android.schedulers.AndroidSchedulers; import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
import timber.log.Timber;
public class ReaderPresenter extends BasePresenter<ReaderActivity> { public class ReaderPresenter extends BasePresenter<ReaderActivity> {
@ -66,30 +65,17 @@ public class ReaderPresenter extends BasePresenter<ReaderActivity> {
super.onCreate(savedState); super.onCreate(savedState);
if (savedState != null) { if (savedState != null) {
onProcessRestart(); source = sourceManager.get(sourceId);
} }
retryPageSubject = PublishSubject.create(); retryPageSubject = PublishSubject.create();
restartableLatestCache(PRELOAD_NEXT_CHAPTER, startable(PRELOAD_NEXT_CHAPTER, this::getPreloadNextChapterObservable);
this::getPreloadNextChapterObservable, startable(GET_PAGE_IMAGES, this::getPageImagesObservable);
(view, pages) -> {}, startable(GET_ADJACENT_CHAPTERS, this::getAdjacentChaptersObservable);
(view, error) -> Timber.e("An error occurred while preloading a chapter")); startable(RETRY_IMAGES, this::getRetryPageObservable);
restartableLatestCache(GET_PAGE_IMAGES, restartable(GET_MANGA_SYNC, () -> getMangaSyncObservable().subscribe());
this::getPageImagesObservable,
(view, page) -> {},
(view, error) -> Timber.e("An error occurred while downloading an image"));
restartableLatestCache(GET_ADJACENT_CHAPTERS,
this::getAdjacentChaptersObservable,
(view, pair) -> view.onAdjacentChapters(pair.first, pair.second),
(view, error) -> Timber.e("An error occurred while getting adjacent chapters"));
restartableLatestCache(RETRY_IMAGES,
this::getRetryPageObservable,
(view, page) -> {},
(view, error) -> Timber.e("An error occurred while downloading an image"));
restartableLatestCache(GET_PAGE_LIST, restartableLatestCache(GET_PAGE_LIST,
() -> getPageListObservable() () -> getPageListObservable()
@ -102,9 +88,6 @@ public class ReaderPresenter extends BasePresenter<ReaderActivity> {
(view, pages) -> view.onChapterReady(pages, manga, chapter, currentPage), (view, pages) -> view.onChapterReady(pages, manga, chapter, currentPage),
(view, error) -> view.onChapterError()); (view, error) -> view.onChapterError());
restartableFirst(GET_MANGA_SYNC, this::getMangaSyncObservable,
(view, mangaSync) -> {},
(view, error) -> {});
registerForStickyEvents(); registerForStickyEvents();
} }
@ -121,16 +104,6 @@ public class ReaderPresenter extends BasePresenter<ReaderActivity> {
super.onSave(state); super.onSave(state);
} }
private void onProcessRestart() {
source = sourceManager.get(sourceId);
// These are started by GET_PAGE_LIST, so we don't let them restart itselves
stop(GET_PAGE_IMAGES);
stop(GET_ADJACENT_CHAPTERS);
stop(RETRY_IMAGES);
stop(PRELOAD_NEXT_CHAPTER);
}
@EventBusHook @EventBusHook
public void onEventMainThread(ReaderEvent event) { public void onEventMainThread(ReaderEvent event) {
EventBus.getDefault().removeStickyEvent(event); EventBus.getDefault().removeStickyEvent(event);