diff options
author | Harald Musum <musum@yahooinc.com> | 2021-10-08 12:20:43 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2021-10-08 12:20:43 +0200 |
commit | 8242de189462e8aaf6342f307ff58ad5bfb9326f (patch) | |
tree | ebb80f7c896de1427ec4d8dabd46fc0424b99c62 /filedistribution/src | |
parent | a6b7ec5af213d5f3bdd71593064322c232ad7c87 (diff) |
Remove getSupervisor() from ConnectionPool interface
Cleanup use of supervisor in connection pool and some file download
classes
Diffstat (limited to 'filedistribution/src')
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java | 55 | ||||
-rw-r--r-- | filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java | 16 |
2 files changed, 53 insertions, 18 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 58e08cccb99..0eef005311d 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -2,14 +2,17 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; -import java.util.logging.Level; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Supervisor; +import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; - +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -18,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -31,27 +35,33 @@ public class FileDownloader implements AutoCloseable { public static File defaultDownloadDirectory = new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")); private final ConnectionPool connectionPool; + private final Supervisor supervisor; private final File downloadDirectory; private final Duration timeout; private final FileReferenceDownloader fileReferenceDownloader; private final Downloads downloads; - public FileDownloader(ConnectionPool connectionPool) { - this(connectionPool, defaultDownloadDirectory, new Downloads()); + public FileDownloader(List<String> configservers, Supervisor supervisor) { + this(getConnectionPool(configservers, supervisor), supervisor); + } + + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor) { + this(connectionPool, supervisor, defaultDownloadDirectory, new Downloads()); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) { + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads) { // TODO: Reduce timeout even more, timeout is so long that we might get starvation - this(connectionPool, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10)); + this(connectionPool, supervisor, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10)); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads, + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads, Duration timeout, Duration sleepBetweenRetries) { this.connectionPool = connectionPool; + this.supervisor = supervisor; this.downloadDirectory = downloadDirectory; this.timeout = timeout; - // Needed to receive RPC calls receiveFile* from server after asking for files - new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory); + // Needed to receive RPC receiveFile* calls from server after asking for files + new FileReceiver(supervisor, downloads, downloadDirectory); this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries); this.downloads = downloads; } @@ -133,6 +143,33 @@ public class FileDownloader implements AutoCloseable { public void close() { fileReferenceDownloader.close(); + supervisor.transport().shutdown().join(); + } + + private static ConnectionPool getConnectionPool(List<String> configServers, Supervisor supervisor) { + return configServers.size() > 0 + ? new JRTConnectionPool(new ConfigSourceSet(configServers), supervisor) + : emptyConnectionPool(); + } + + public static ConnectionPool emptyConnectionPool() { + return new EmptyConnectionPool(); + } + + private static class EmptyConnectionPool implements ConnectionPool { + + @Override + public void close() { } + + @Override + public Connection getCurrent() { return null; } + + @Override + public Connection switchConnection(Connection connection) { return null; } + + @Override + public int getSize() { return 0; } + } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 15cc28007ce..6855f7f818c 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -44,6 +44,7 @@ public class FileDownloaderTest { private Downloads downloads; private FileDownloader fileDownloader; private File downloadDir; + private Supervisor supervisor; @Before public void setup() { @@ -51,7 +52,8 @@ public class FileDownloaderTest { downloadDir = Files.createTempDirectory("filedistribution").toFile(); connection = new MockConnection(); downloads = new Downloads(); - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries); + supervisor = new Supervisor(new Transport()).setDropEmptyBuffers(true); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -61,6 +63,7 @@ public class FileDownloaderTest { @After public void teardown() { fileDownloader.close(); + supervisor.transport().shutdown().join(); } @Test @@ -163,7 +166,7 @@ public class FileDownloaderTest { @Test public void getFileWhenConnectionError() throws IOException { - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); int timesToFail = 2; @@ -197,7 +200,7 @@ public class FileDownloaderTest { public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); String filename = "abc.jar"; - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); // Delay response so that we can make a second request while downloading the file from the first request @@ -237,7 +240,7 @@ public class FileDownloaderTest { Duration timeout = Duration.ofMillis(200); MockConnection connectionPool = new MockConnection(); connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); - FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, downloads, timeout, sleepBetweenRetries); + FileDownloader fileDownloader = new FileDownloader(connectionPool, supervisor, downloadDir, downloads, timeout, sleepBetweenRetries); FileReference xyzzy = new FileReference("xyzzy"); // Should download since we do not have the file on disk fileDownloader.downloadIfNeeded(new FileReferenceDownload(xyzzy)); @@ -339,11 +342,6 @@ public class FileDownloaderTest { return 1; } - @Override - public Supervisor getSupervisor() { - return new Supervisor(new Transport()); - } - void setResponseHandler(ResponseHandler responseHandler) { this.responseHandler = responseHandler; } |