Fix number of simultaneous downloads ignored (again)

This commit is contained in:
inorichi 2016-01-24 12:37:41 +01:00
parent 50306f6ea3
commit 594219848d
2 changed files with 15 additions and 11 deletions

View file

@ -15,8 +15,6 @@ import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import eu.kanade.tachiyomi.data.database.models.Chapter; import eu.kanade.tachiyomi.data.database.models.Chapter;
import eu.kanade.tachiyomi.data.database.models.Manga; import eu.kanade.tachiyomi.data.database.models.Manga;
@ -28,6 +26,7 @@ import eu.kanade.tachiyomi.data.source.base.Source;
import eu.kanade.tachiyomi.data.source.model.Page; import eu.kanade.tachiyomi.data.source.model.Page;
import eu.kanade.tachiyomi.event.DownloadChaptersEvent; import eu.kanade.tachiyomi.event.DownloadChaptersEvent;
import eu.kanade.tachiyomi.util.DiskUtils; import eu.kanade.tachiyomi.util.DiskUtils;
import eu.kanade.tachiyomi.util.DynamicConcurrentMergeOperator;
import eu.kanade.tachiyomi.util.UrlUtil; import eu.kanade.tachiyomi.util.UrlUtil;
import rx.Observable; import rx.Observable;
import rx.Subscription; import rx.Subscription;
@ -48,11 +47,12 @@ public class DownloadManager {
private BehaviorSubject<Boolean> runningSubject; private BehaviorSubject<Boolean> runningSubject;
private Subscription downloadsSubscription; private Subscription downloadsSubscription;
private BehaviorSubject<Integer> threadsSubject;
private Subscription threadsSubscription;
private DownloadQueue queue; private DownloadQueue queue;
private volatile boolean isRunning; private volatile boolean isRunning;
private ExecutorService threadPool;
public static final String PAGE_LIST_FILE = "index.json"; public static final String PAGE_LIST_FILE = "index.json";
public DownloadManager(Context context, SourceManager sourceManager, PreferencesHelper preferences) { public DownloadManager(Context context, SourceManager sourceManager, PreferencesHelper preferences) {
@ -65,17 +65,19 @@ public class DownloadManager {
downloadsQueueSubject = PublishSubject.create(); downloadsQueueSubject = PublishSubject.create();
runningSubject = BehaviorSubject.create(); runningSubject = BehaviorSubject.create();
threadsSubject = BehaviorSubject.create();
} }
private void initializeSubscriptions() { private void initializeSubscriptions() {
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
downloadsSubscription.unsubscribe(); downloadsSubscription.unsubscribe();
threadPool = Executors.newFixedThreadPool(preferences.downloadThreads()); threadsSubscription = preferences.downloadThreads().asObservable()
.subscribe(threadsSubject::onNext);
downloadsSubscription = downloadsQueueSubject downloadsSubscription = downloadsQueueSubject
.flatMap(Observable::from) .flatMap(Observable::from)
.flatMap(c -> downloadChapter(c).subscribeOn(Schedulers.from(threadPool))) .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threadsSubject))
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
.map(download -> areAllDownloadsFinished()) .map(download -> areAllDownloadsFinished())
.subscribe(finished -> { .subscribe(finished -> {
@ -101,9 +103,10 @@ public class DownloadManager {
downloadsSubscription = null; downloadsSubscription = null;
} }
if (threadPool != null && !threadPool.isShutdown()) { if (threadsSubscription != null && !threadsSubscription.isUnsubscribed()) {
threadPool.shutdown(); threadsSubscription.unsubscribe();
} }
} }
// Create a download object for every chapter in the event and add them to the downloads queue // Create a download object for every chapter in the event and add them to the downloads queue
@ -208,7 +211,8 @@ public class DownloadManager {
.onErrorResumeNext(error -> { .onErrorResumeNext(error -> {
download.setStatus(Download.ERROR); download.setStatus(Download.ERROR);
return Observable.just(download); return Observable.just(download);
})); }))
.subscribeOn(Schedulers.io());
} }
// Get the image from the filesystem if it exists or download from network // Get the image from the filesystem if it exists or download from network

View file

@ -159,8 +159,8 @@ public class PreferencesHelper {
prefs.edit().putString(getKey(R.string.pref_download_directory_key), path).apply(); prefs.edit().putString(getKey(R.string.pref_download_directory_key), path).apply();
} }
public int downloadThreads() { public Preference<Integer> downloadThreads() {
return prefs.getInt(getKey(R.string.pref_download_slots_key), 1); return rxPrefs.getInteger(getKey(R.string.pref_download_slots_key), 1);
} }
public boolean downloadOnlyOverWifi() { public boolean downloadOnlyOverWifi() {