summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-10-08 12:20:43 +0200
committerHarald Musum <musum@yahooinc.com>2021-10-08 12:20:43 +0200
commit8242de189462e8aaf6342f307ff58ad5bfb9326f (patch)
treeebb80f7c896de1427ec4d8dabd46fc0424b99c62 /filedistribution
parenta6b7ec5af213d5f3bdd71593064322c232ad7c87 (diff)
Remove getSupervisor() from ConnectionPool interface
Cleanup use of supervisor in connection pool and some file download classes
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java55
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java16
2 files changed, 53 insertions, 18 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 58e08cccb99..0eef005311d 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -2,14 +2,17 @@
package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
-import java.util.logging.Level;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
+import com.yahoo.vespa.config.JRTConnectionPool;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
-
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -18,6 +21,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -31,27 +35,33 @@ public class FileDownloader implements AutoCloseable {
public static File defaultDownloadDirectory = new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"));
private final ConnectionPool connectionPool;
+ private final Supervisor supervisor;
private final File downloadDirectory;
private final Duration timeout;
private final FileReferenceDownloader fileReferenceDownloader;
private final Downloads downloads;
- public FileDownloader(ConnectionPool connectionPool) {
- this(connectionPool, defaultDownloadDirectory, new Downloads());
+ public FileDownloader(List<String> configservers, Supervisor supervisor) {
+ this(getConnectionPool(configservers, supervisor), supervisor);
+ }
+
+ public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor) {
+ this(connectionPool, supervisor, defaultDownloadDirectory, new Downloads());
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) {
+ public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads) {
// TODO: Reduce timeout even more, timeout is so long that we might get starvation
- this(connectionPool, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10));
+ this(connectionPool, supervisor, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10));
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads,
+ public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads,
Duration timeout, Duration sleepBetweenRetries) {
this.connectionPool = connectionPool;
+ this.supervisor = supervisor;
this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
- // Needed to receive RPC calls receiveFile* from server after asking for files
- new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory);
+ // Needed to receive RPC receiveFile* calls from server after asking for files
+ new FileReceiver(supervisor, downloads, downloadDirectory);
this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries);
this.downloads = downloads;
}
@@ -133,6 +143,33 @@ public class FileDownloader implements AutoCloseable {
public void close() {
fileReferenceDownloader.close();
+ supervisor.transport().shutdown().join();
+ }
+
+ private static ConnectionPool getConnectionPool(List<String> configServers, Supervisor supervisor) {
+ return configServers.size() > 0
+ ? new JRTConnectionPool(new ConfigSourceSet(configServers), supervisor)
+ : emptyConnectionPool();
+ }
+
+ public static ConnectionPool emptyConnectionPool() {
+ return new EmptyConnectionPool();
+ }
+
+ private static class EmptyConnectionPool implements ConnectionPool {
+
+ @Override
+ public void close() { }
+
+ @Override
+ public Connection getCurrent() { return null; }
+
+ @Override
+ public Connection switchConnection(Connection connection) { return null; }
+
+ @Override
+ public int getSize() { return 0; }
+
}
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index 15cc28007ce..6855f7f818c 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -44,6 +44,7 @@ public class FileDownloaderTest {
private Downloads downloads;
private FileDownloader fileDownloader;
private File downloadDir;
+ private Supervisor supervisor;
@Before
public void setup() {
@@ -51,7 +52,8 @@ public class FileDownloaderTest {
downloadDir = Files.createTempDirectory("filedistribution").toFile();
connection = new MockConnection();
downloads = new Downloads();
- fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries);
+ supervisor = new Supervisor(new Transport()).setDropEmptyBuffers(true);
+ fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -61,6 +63,7 @@ public class FileDownloaderTest {
@After
public void teardown() {
fileDownloader.close();
+ supervisor.transport().shutdown().join();
}
@Test
@@ -163,7 +166,7 @@ public class FileDownloaderTest {
@Test
public void getFileWhenConnectionError() throws IOException {
- fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries);
+ fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
int timesToFail = 2;
@@ -197,7 +200,7 @@ public class FileDownloaderTest {
public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
String filename = "abc.jar";
- fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries);
+ fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
// Delay response so that we can make a second request while downloading the file from the first request
@@ -237,7 +240,7 @@ public class FileDownloaderTest {
Duration timeout = Duration.ofMillis(200);
MockConnection connectionPool = new MockConnection();
connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
- FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, downloads, timeout, sleepBetweenRetries);
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, supervisor, downloadDir, downloads, timeout, sleepBetweenRetries);
FileReference xyzzy = new FileReference("xyzzy");
// Should download since we do not have the file on disk
fileDownloader.downloadIfNeeded(new FileReferenceDownload(xyzzy));
@@ -339,11 +342,6 @@ public class FileDownloaderTest {
return 1;
}
- @Override
- public Supervisor getSupervisor() {
- return new Supervisor(new Transport());
- }
-
void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}