Download queue threads are now updated when the setting change

This commit is contained in:
inorichi 2015-11-05 20:01:37 +01:00
parent c73779ea3b
commit 2683cad5b5
4 changed files with 218 additions and 12 deletions

View file

@ -72,6 +72,7 @@ dependencies {
compile 'com.jakewharton:disklrucache:2.0.2' compile 'com.jakewharton:disklrucache:2.0.2'
compile 'org.jsoup:jsoup:1.8.3' compile 'org.jsoup:jsoup:1.8.3'
compile 'io.reactivex:rxandroid:1.0.1' compile 'io.reactivex:rxandroid:1.0.1'
compile 'com.f2prateek.rx.preferences:rx-preferences:1.0.1'
compile "com.pushtorefresh.storio:sqlite:$STORIO_VERSION" compile "com.pushtorefresh.storio:sqlite:$STORIO_VERSION"
compile "com.pushtorefresh.storio:sqlite-annotations:$STORIO_VERSION" compile "com.pushtorefresh.storio:sqlite-annotations:$STORIO_VERSION"
compile "info.android15.nucleus:nucleus:$NUCLEUS_VERSION" compile "info.android15.nucleus:nucleus:$NUCLEUS_VERSION"

View file

@ -22,15 +22,18 @@ import eu.kanade.mangafeed.data.models.Page;
import eu.kanade.mangafeed.events.DownloadChapterEvent; import eu.kanade.mangafeed.events.DownloadChapterEvent;
import eu.kanade.mangafeed.sources.base.Source; import eu.kanade.mangafeed.sources.base.Source;
import eu.kanade.mangafeed.util.DiskUtils; import eu.kanade.mangafeed.util.DiskUtils;
import eu.kanade.mangafeed.util.DynamicConcurrentMergeOperator;
import rx.Observable; import rx.Observable;
import rx.Subscription; import rx.Subscription;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
public class DownloadManager { public class DownloadManager {
private PublishSubject<DownloadChapterEvent> downloadsSubject; private PublishSubject<DownloadChapterEvent> downloadsSubject;
private Subscription downloadSubscription; private Subscription downloadSubscription;
private Subscription threadNumberSubscription;
private Context context; private Context context;
private SourceManager sourceManager; private SourceManager sourceManager;
@ -61,14 +64,21 @@ public class DownloadManager {
downloadSubscription.unsubscribe(); downloadSubscription.unsubscribe();
} }
if (threadNumberSubscription != null && !threadNumberSubscription.isUnsubscribed())
threadNumberSubscription.unsubscribe();
downloadsSubject = PublishSubject.create(); downloadsSubject = PublishSubject.create();
BehaviorSubject<Integer> threads = BehaviorSubject.create();
threadNumberSubscription = preferences.getDownloadTheadsObs()
.subscribe(threads::onNext);
// Listen for download events, add them to queue and download // Listen for download events, add them to queue and download
downloadSubscription = downloadsSubject downloadSubscription = downloadsSubject
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.filter(event -> !isChapterDownloaded(event)) .filter(event -> !isChapterDownloaded(event))
.flatMap(this::prepareDownload) .flatMap(this::prepareDownload)
.flatMap(this::downloadChapter, preferences.getDownloadThreads()) .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threads))
.onBackpressureBuffer() .onBackpressureBuffer()
.subscribe(); .subscribe();
} }
@ -117,7 +127,6 @@ public class DownloadManager {
private Observable<Page> downloadChapter(Download download) { private Observable<Page> downloadChapter(Download download) {
return download.source return download.source
.pullPageListFromNetwork(download.chapter.url) .pullPageListFromNetwork(download.chapter.url)
.subscribeOn(Schedulers.io())
// Add resulting pages to download object // Add resulting pages to download object
.doOnNext(pages -> { .doOnNext(pages -> {
download.pages = pages; download.pages = pages;

View file

@ -4,14 +4,18 @@ import android.content.Context;
import android.content.SharedPreferences; import android.content.SharedPreferences;
import android.preference.PreferenceManager; import android.preference.PreferenceManager;
import com.f2prateek.rx.preferences.RxSharedPreferences;
import eu.kanade.mangafeed.R; import eu.kanade.mangafeed.R;
import eu.kanade.mangafeed.sources.base.Source; import eu.kanade.mangafeed.sources.base.Source;
import eu.kanade.mangafeed.util.DiskUtils; import eu.kanade.mangafeed.util.DiskUtils;
import rx.Observable;
public class PreferencesHelper { public class PreferencesHelper {
private static SharedPreferences mPref;
private Context context; private Context context;
private SharedPreferences prefs;
private RxSharedPreferences rxPrefs;
private static final String SOURCE_ACCOUNT_USERNAME = "pref_source_username_"; private static final String SOURCE_ACCOUNT_USERNAME = "pref_source_username_";
private static final String SOURCE_ACCOUNT_PASSWORD = "pref_source_password_"; private static final String SOURCE_ACCOUNT_PASSWORD = "pref_source_password_";
@ -20,7 +24,8 @@ public class PreferencesHelper {
this.context = context; this.context = context;
PreferenceManager.setDefaultValues(context, R.xml.pref_reader, false); PreferenceManager.setDefaultValues(context, R.xml.pref_reader, false);
mPref = PreferenceManager.getDefaultSharedPreferences(context); prefs = PreferenceManager.getDefaultSharedPreferences(context);
rxPrefs = RxSharedPreferences.create(prefs);
} }
private String getKey(int keyResource) { private String getKey(int keyResource) {
@ -28,39 +33,44 @@ public class PreferencesHelper {
} }
public void clear() { public void clear() {
mPref.edit().clear().apply(); prefs.edit().clear().apply();
} }
public boolean useFullscreenSet() { public boolean useFullscreenSet() {
return mPref.getBoolean(getKey(R.string.pref_fullscreen_key), false); return prefs.getBoolean(getKey(R.string.pref_fullscreen_key), false);
} }
public int getDefaultViewer() { public int getDefaultViewer() {
return Integer.parseInt(mPref.getString(getKey(R.string.pref_default_viewer_key), "1")); return Integer.parseInt(prefs.getString(getKey(R.string.pref_default_viewer_key), "1"));
} }
public String getSourceUsername(Source source) { public String getSourceUsername(Source source) {
return mPref.getString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), ""); return prefs.getString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), "");
} }
public String getSourcePassword(Source source) { public String getSourcePassword(Source source) {
return mPref.getString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), ""); return prefs.getString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), "");
} }
public void setSourceCredentials(Source source, String username, String password) { public void setSourceCredentials(Source source, String username, String password) {
mPref.edit() prefs.edit()
.putString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), username) .putString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), username)
.putString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), password) .putString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), password)
.apply(); .apply();
} }
public String getDownloadsDirectory() { public String getDownloadsDirectory() {
return mPref.getString(getKey(R.string.pref_download_directory_key), return prefs.getString(getKey(R.string.pref_download_directory_key),
DiskUtils.getStorageDirectories(context)[0]); DiskUtils.getStorageDirectories(context)[0]);
} }
public int getDownloadThreads() { public int getDownloadThreads() {
return Integer.parseInt(mPref.getString(getKey(R.string.pref_download_threads_key), "1")); return Integer.parseInt(prefs.getString(getKey(R.string.pref_download_threads_key), "1"));
}
public Observable<Integer> getDownloadTheadsObs() {
return rxPrefs.getString(getKey(R.string.pref_download_threads_key), "1")
.asObservable().map(Integer::parseInt);
} }
} }

View file

@ -0,0 +1,186 @@
package eu.kanade.mangafeed.util;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
public class DynamicConcurrentMergeOperator<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends Observable<? extends R>> mapper;
final Observable<Integer> workerCount;
public DynamicConcurrentMergeOperator(
Func1<? super T, ? extends Observable<? extends R>> mapper,
Observable<Integer> workerCount) {
this.mapper = mapper;
this.workerCount = workerCount;
}
@Override
public Subscriber<? super T> call(Subscriber<? super R> t) {
DynamicConcurrentMerge<T, R> parent = new DynamicConcurrentMerge<>(t, mapper);
t.add(parent);
parent.init(workerCount);
return parent;
}
static final class DynamicConcurrentMerge<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends Observable<? extends R>> mapper;
final Queue<T> queue;
final CopyOnWriteArrayList<DynamicWorker<T, R>> workers;
final CompositeSubscription composite;
final AtomicInteger wipActive;
final AtomicBoolean once;
long id;
public DynamicConcurrentMerge(Subscriber<? super R> actual,
Func1<? super T, ? extends Observable<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
this.queue = new ConcurrentLinkedQueue<>();
this.workers = new CopyOnWriteArrayList<>();
this.composite = new CompositeSubscription();
this.wipActive = new AtomicInteger(1);
this.once = new AtomicBoolean();
this.add(composite);
this.request(0);
}
public void init(Observable<Integer> workerCount) {
Subscription wc = workerCount.subscribe(n -> {
int n0 = workers.size();
if (n0 < n) {
for (int i = n0; i < n; i++) {
DynamicWorker<T, R> dw = new DynamicWorker<>(++id, this);
workers.add(dw);
request(1);
dw.tryNext();
}
} else if (n0 > n) {
for (int i = 0; i < n; i++) {
workers.get(i).start();
}
for (int i = n0 - 1; i >= n; i--) {
workers.get(i).stop();
}
}
if (!once.get() && once.compareAndSet(false, true)) {
request(n);
}
}, this::onError);
composite.add(wc);
}
void requestMore(long n) {
request(n);
}
@Override
public void onNext(T t) {
queue.offer(t);
wipActive.getAndIncrement();
for (DynamicWorker<T, R> w : workers) {
w.tryNext();
}
}
@Override
public void onError(Throwable e) {
composite.unsubscribe();
actual.onError(e);
}
@Override
public void onCompleted() {
if (wipActive.decrementAndGet() == 0) {
actual.onCompleted();
}
}
}
static final class DynamicWorker<T, R> {
final long id;
final AtomicBoolean running;
final DynamicConcurrentMerge<T, R> parent;
final AtomicBoolean stop;
public DynamicWorker(long id, DynamicConcurrentMerge<T, R> parent) {
this.id = id;
this.parent = parent;
this.stop = new AtomicBoolean();
this.running = new AtomicBoolean();
}
public void tryNext() {
if (!running.get() && running.compareAndSet(false, true)) {
T t;
if (stop.get()) {
parent.workers.remove(this);
return;
}
t = parent.queue.poll();
if (t == null) {
running.set(false);
return;
}
Observable out = parent.mapper.call(t);
Subscriber<R> s = new Subscriber<R>() {
@Override
public void onNext(R t) {
parent.actual.onNext(t);
}
@Override
public void onError(Throwable e) {
parent.onError(e);
}
@Override
public void onCompleted() {
parent.onCompleted();
if (parent.wipActive.get() != 0) {
running.set(false);
parent.requestMore(1);
tryNext();
}
}
};
parent.composite.add(s);
s.add(Subscriptions.create(() -> parent.composite.remove(s)));
// Unchecked assignment to avoid weird Android Studio errors
out.subscribe(s);
}
}
public void start() {
stop.set(false);
tryNext();
}
public void stop() {
stop.set(true);
if (running.compareAndSet(false, true)) {
parent.workers.remove(this);
}
}
}
}