Use configured compression method in swap job

This commit is contained in:
Shane Kilkelly 2021-01-14 14:02:22 +00:00
parent c67014b859
commit b92c737814
3 changed files with 70 additions and 12 deletions

View file

@ -34,6 +34,7 @@ public class SwapJobImpl implements SwapJob {
private final RepoStore repoStore;
private final DBStore dbStore;
private final SwapStore swapStore;
private final CompressionMethod compressionMethod;
private final Timer timer;
@ -51,6 +52,7 @@ public class SwapJobImpl implements SwapJob {
GiB * cfg.getLowGiB(),
GiB * cfg.getHighGiB(),
Duration.ofMillis(cfg.getIntervalMillis()),
cfg.getCompressionMethod(),
lock,
repoStore,
dbStore,
@ -63,6 +65,7 @@ public class SwapJobImpl implements SwapJob {
long lowWatermarkBytes,
long highWatermarkBytes,
Duration interval,
CompressionMethod method,
ProjectLock lock,
RepoStore repoStore,
DBStore dbStore,
@ -72,6 +75,7 @@ public class SwapJobImpl implements SwapJob {
this.lowWatermarkBytes = lowWatermarkBytes;
this.highWatermarkBytes = highWatermarkBytes;
this.interval = interval;
this.compressionMethod = method;
this.lock = lock;
this.repoStore = repoStore;
this.dbStore = dbStore;
@ -196,15 +200,29 @@ public class SwapJobImpl implements SwapJob {
Log.error("[{}] Exception while running gc on project: {}", projName, e);
}
long[] sizePtr = new long[1];
try (InputStream blob = repoStore.bzip2Project(projName, sizePtr)) {
try (InputStream blob = getBlobStream(projName, sizePtr)) {
swapStore.upload(projName, blob, sizePtr[0]);
dbStore.setLastAccessedTime(projName, null);
String compression = SwapJob.compressionMethodAsString(compressionMethod);
if (compression == null) {
throw new RuntimeException("invalid compression method, should not happen");
}
dbStore.swap(projName, compression);
repoStore.remove(projName);
}
}
Log.info("Evicted project: {}", projName);
}
private InputStream getBlobStream(String projName, long[] sizePtr) throws IOException {
if (compressionMethod == CompressionMethod.Gzip) {
return repoStore.gzipProject(projName, sizePtr);
} else if (compressionMethod == CompressionMethod.Bzip2) {
return repoStore.bzip2Project(projName, sizePtr);
} else {
throw new RuntimeException("invalid compression method, should not happen");
}
}
/**
* @see SwapJob#restore(String) for high-level description.
*
@ -220,15 +238,23 @@ public class SwapJobImpl implements SwapJob {
public void restore(String projName) throws IOException {
try (LockGuard __ = lock.lockGuard(projName)) {
try (InputStream zipped = swapStore.openDownloadStream(projName)) {
repoStore.unbzip2Project(
projName,
zipped
);
String compression = dbStore.getSwapCompression(projName);
if (compression == null) {
throw new RuntimeException("Missing compression method during restore, should not happen");
}
if ("gzip".equals(compression)) {
repoStore.ungzipProject(
projName,
zipped
);
} else if ("bzip2".equals(compression)) {
repoStore.unbzip2Project(
projName,
zipped
);
}
swapStore.remove(projName);
dbStore.setLastAccessedTime(
projName,
Timestamp.valueOf(LocalDateTime.now())
);
dbStore.restore(projName);
}
}
}

View file

@ -667,7 +667,7 @@ public class WLGitBridgeIntegrationTest {
server.start();
server.setState(states.get("wlgbCanSwapProjects").get("state"));
wlgb = new GitBridgeApp(new String[] {
makeConfigFile(33874, 3874, new SwapJobConfig(1, 0, 0, 250))
makeConfigFile(33874, 3874, new SwapJobConfig(1, 0, 0, 250, null))
});
wlgb.run();
File rootGitDir = new File(wlgb.config.getRootGitDirectory());

View file

@ -68,6 +68,7 @@ public class SwapJobImplTest {
15000,
30000,
Duration.ofMillis(100),
SwapJob.CompressionMethod.Bzip2,
lock,
repoStore,
dbStore,
@ -120,11 +121,42 @@ public class SwapJobImplTest {
while (swapJob.swaps.get() < 1);
assertEquals(1, dbStore.getNumUnswappedProjects());
assertEquals("proj1", dbStore.getOldestUnswappedProject());
assertEquals("bzip2", dbStore.getSwapCompression("proj2"));
swapJob.restore("proj2");
assertEquals(null, dbStore.getSwapCompression("proj2"));
int numSwaps = swapJob.swaps.get();
while (swapJob.swaps.get() <= numSwaps);
assertEquals(1, dbStore.getNumUnswappedProjects());
assertEquals("proj2", dbStore.getOldestUnswappedProject());
}
}
@Test
public void swapCompressionGzip() throws IOException {
swapJob = new SwapJobImpl(
1,
15000,
30000,
Duration.ofMillis(100),
SwapJob.CompressionMethod.Gzip,
lock,
repoStore,
dbStore,
swapStore
);
swapJob.lowWatermarkBytes = 16384;
assertEquals(2, dbStore.getNumUnswappedProjects());
assertEquals("proj2", dbStore.getOldestUnswappedProject());
swapJob.start();
while (swapJob.swaps.get() < 1);
assertEquals(1, dbStore.getNumUnswappedProjects());
assertEquals("proj1", dbStore.getOldestUnswappedProject());
assertEquals("gzip", dbStore.getSwapCompression("proj2"));
swapJob.restore("proj2");
assertEquals(null, dbStore.getSwapCompression("proj2"));
int numSwaps = swapJob.swaps.get();
while (swapJob.swaps.get() <= numSwaps);
assertEquals(1, dbStore.getNumUnswappedProjects());
assertEquals("proj2", dbStore.getOldestUnswappedProject());
}
}