diff options
10 files changed, 96 insertions, 107 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java index 48d23e6ada5..4d03522e980 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java @@ -28,7 +28,7 @@ public class FileDistributionAndUrlDownload { public FileDistributionAndUrlDownload(Supervisor supervisor, ConfigSourceSet source) { fileDistributionRpcServer = new FileDistributionRpcServer(supervisor, - new FileDownloader(new JRTConnectionPool(source, supervisor))); + new FileDownloader(new JRTConnectionPool(source, supervisor), supervisor)); urlDownloadRpcServer = new UrlDownloadRpcServer(supervisor); cleanupExecutor.scheduleAtFixedRate(new CachedFilesMaintainer(), delay.toSeconds(), delay.toSeconds(), TimeUnit.SECONDS); } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java index c27e4cb2bc8..640f3d0c27e 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java @@ -3,7 +3,6 @@ package com.yahoo.config.subscription.impl; import com.yahoo.jrt.Request; import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.Supervisor; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; @@ -74,11 +73,6 @@ public class MockConnection implements ConnectionPool, Connection { return numSpecs; } - @Override - public Supervisor getSupervisor() { - return null; - } - public int getNumberOfRequests() { return numberOfRequests; } diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java index 1ee44bfee87..31f85ee4fd5 100644 --- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config; -import com.yahoo.jrt.Supervisor; - /** * @author hmusum */ @@ -22,7 +20,4 @@ public interface ConnectionPool extends AutoCloseable { int getSize(); - // TODO: Exposes implementation, try to remove - Supervisor getSupervisor(); - } diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index d7e21356ba2..048c82b1bde 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -148,9 +148,4 @@ public class JRTConnectionPool implements ConnectionPool { } } - @Override - public Supervisor getSupervisor() { - return supervisor; - } - } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java index 2d39c5549cc..a1ddad7bfd4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java @@ -3,13 +3,7 @@ package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.FileReference; -import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; import com.yahoo.net.HostName; -import com.yahoo.vespa.config.Connection; -import com.yahoo.vespa.config.ConnectionPool; -import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.server.ConfigServerSpec; import java.io.File; @@ -38,62 +32,16 @@ public class FileDistributionUtil { return fileReferencesOnDisk; } - /** - * Returns a connection pool with all config servers except this one, or an empty pool if there - * is only one config server (no point in trying to download from yourself). - */ - public static ConnectionPool createConnectionPool(ConfigserverConfig configserverConfig) { - List<String> configServers = ConfigServerSpec.fromConfig(configserverConfig) - .stream() - .filter(spec -> !spec.getHostName().equals(HostName.getLocalhost())) - .map(spec -> "tcp/" + spec.getHostName() + ":" + spec.getConfigServerPort()) - .collect(Collectors.toList()); - - return configServers.size() > 0 - ? new JRTConnectionPool(new ConfigSourceSet(configServers), - new Supervisor(new Transport("filedistribution-pool")) - .setDropEmptyBuffers(true)) - : emptyConnectionPool(); + public static List<String> getOtherConfigServersInCluster(ConfigserverConfig configserverConfig) { + return ConfigServerSpec.fromConfig(configserverConfig) + .stream() + .filter(spec -> !spec.getHostName().equals(HostName.getLocalhost())) + .map(spec -> "tcp/" + spec.getHostName() + ":" + spec.getConfigServerPort()) + .collect(Collectors.toList()); } public static boolean fileReferenceExistsOnDisk(File downloadDirectory, FileReference applicationPackageReference) { return getFileReferencesOnDisk(downloadDirectory).contains(applicationPackageReference.value()); } - static ConnectionPool emptyConnectionPool() { - return new EmptyConnectionPool(); - } - - private static class EmptyConnectionPool implements ConnectionPool { - private Supervisor supervisor; - - @Override - public void close() { - synchronized (this) { - if (supervisor != null) { - supervisor.transport().shutdown().join(); - } - } - } - - @Override - public Connection getCurrent() { return null; } - - @Override - public Connection switchConnection(Connection connection) { return null; } - - @Override - public int getSize() { return 0; } - - @Override - public Supervisor getSupervisor() { - synchronized (this) { - if (supervisor == null) { - supervisor = new Supervisor(new Transport("empty-connection-pool")).setDropEmptyBuffers(true); - } - } - return supervisor; - } - } - } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 4c7d0b3122a..5d9a9c7b836 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -8,6 +8,8 @@ import com.yahoo.config.FileReference; import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Request; import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.filedistribution.CompressedFileReference; import com.yahoo.vespa.filedistribution.EmptyFileReferenceData; @@ -27,8 +29,7 @@ import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; -import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.createConnectionPool; -import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.emptyConnectionPool; +import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; public class FileServer { private static final Logger log = Logger.getLogger(FileServer.class.getName()); @@ -70,12 +71,16 @@ public class FileServer { @Inject public FileServer(ConfigserverConfig configserverConfig) { this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())), - new FileDownloader(createConnectionPool(configserverConfig))); + new FileDownloader(getOtherConfigServersInCluster(configserverConfig), + new Supervisor(new Transport("filedistribution-pool")) + .setDropEmptyBuffers(true))); } // For testing only public FileServer(File rootDir) { - this(rootDir, new FileDownloader(emptyConnectionPool())); + this(rootDir, new FileDownloader(FileDownloader.emptyConnectionPool(), + new Supervisor(new Transport("fileserver-for-testing")) + .setDropEmptyBuffers(true))); } public FileServer(File rootDir, FileDownloader fileDownloader) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java index 64765871676..78f61feed85 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java @@ -4,7 +4,10 @@ package com.yahoo.vespa.config.server.maintenance; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.FileReference; import com.yahoo.config.provision.ApplicationId; -import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.server.ApplicationRepository; import com.yahoo.vespa.config.server.session.Session; import com.yahoo.vespa.config.server.session.SessionRepository; @@ -19,8 +22,8 @@ import java.io.File; import java.time.Duration; import java.util.logging.Logger; -import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.createConnectionPool; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; +import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; /** * Verifies that all active sessions has an application package on local disk. @@ -33,8 +36,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { private static final Logger log = Logger.getLogger(ApplicationPackageMaintainer.class.getName()); private final ApplicationRepository applicationRepository; - private final ConnectionPool connectionPool; private final File downloadDirectory; + private final ConfigserverConfig configserverConfig; + private final Supervisor supervisor; + ApplicationPackageMaintainer(ApplicationRepository applicationRepository, Curator curator, @@ -42,9 +47,8 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { FlagSource flagSource) { super(applicationRepository, curator, flagSource, applicationRepository.clock().instant(), interval); this.applicationRepository = applicationRepository; - ConfigserverConfig configserverConfig = applicationRepository.configserverConfig(); - connectionPool = createConnectionPool(configserverConfig); - + this.configserverConfig = applicationRepository.configserverConfig(); + this.supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true); downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())); } @@ -53,7 +57,7 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { int attempts = 0; int failures = 0; - try (var fileDownloader = new FileDownloader(connectionPool, downloadDirectory, new Downloads())) { + try (var fileDownloader = createFileDownloader()) { for (var applicationId : applicationRepository.listApplications()) { log.fine(() -> "Verifying application package for " + applicationId); Session session = applicationRepository.getActiveSession(applicationId); @@ -81,9 +85,16 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { return asSuccessFactor(attempts, failures); } + private FileDownloader createFileDownloader() { + return new FileDownloader(new JRTConnectionPool(new ConfigSourceSet(getOtherConfigServersInCluster(configserverConfig)), supervisor), + supervisor, + downloadDirectory, + new Downloads()); + } + @Override public void awaitShutdown() { - connectionPool.close(); + supervisor.transport().shutdown().join(); super.awaitShutdown(); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 1f2efdfe17b..ec94b845546 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -4,6 +4,8 @@ package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.FileReference; import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; import com.yahoo.net.HostName; import com.yahoo.vespa.filedistribution.Downloads; import com.yahoo.vespa.filedistribution.FileDownloader; @@ -22,7 +24,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.emptyConnectionPool; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -138,7 +139,12 @@ public class FileServerTest { private static class MockFileDownloader extends FileDownloader { public MockFileDownloader(File downloadDirectory) { - super(emptyConnectionPool(), downloadDirectory, new Downloads(), Duration.ofMillis(100), Duration.ofMillis(100)); + super(FileDownloader.emptyConnectionPool(), + new Supervisor(new Transport("mock")).setDropEmptyBuffers(true), + downloadDirectory, + new Downloads(), + Duration.ofMillis(100), + Duration.ofMillis(100)); } } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 58e08cccb99..0eef005311d 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -2,14 +2,17 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; -import java.util.logging.Level; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Supervisor; +import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; - +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -18,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -31,27 +35,33 @@ public class FileDownloader implements AutoCloseable { public static File defaultDownloadDirectory = new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")); private final ConnectionPool connectionPool; + private final Supervisor supervisor; private final File downloadDirectory; private final Duration timeout; private final FileReferenceDownloader fileReferenceDownloader; private final Downloads downloads; - public FileDownloader(ConnectionPool connectionPool) { - this(connectionPool, defaultDownloadDirectory, new Downloads()); + public FileDownloader(List<String> configservers, Supervisor supervisor) { + this(getConnectionPool(configservers, supervisor), supervisor); + } + + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor) { + this(connectionPool, supervisor, defaultDownloadDirectory, new Downloads()); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) { + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads) { // TODO: Reduce timeout even more, timeout is so long that we might get starvation - this(connectionPool, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10)); + this(connectionPool, supervisor, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10)); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads, + public FileDownloader(ConnectionPool connectionPool, Supervisor supervisor, File downloadDirectory, Downloads downloads, Duration timeout, Duration sleepBetweenRetries) { this.connectionPool = connectionPool; + this.supervisor = supervisor; this.downloadDirectory = downloadDirectory; this.timeout = timeout; - // Needed to receive RPC calls receiveFile* from server after asking for files - new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory); + // Needed to receive RPC receiveFile* calls from server after asking for files + new FileReceiver(supervisor, downloads, downloadDirectory); this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries); this.downloads = downloads; } @@ -133,6 +143,33 @@ public class FileDownloader implements AutoCloseable { public void close() { fileReferenceDownloader.close(); + supervisor.transport().shutdown().join(); + } + + private static ConnectionPool getConnectionPool(List<String> configServers, Supervisor supervisor) { + return configServers.size() > 0 + ? new JRTConnectionPool(new ConfigSourceSet(configServers), supervisor) + : emptyConnectionPool(); + } + + public static ConnectionPool emptyConnectionPool() { + return new EmptyConnectionPool(); + } + + private static class EmptyConnectionPool implements ConnectionPool { + + @Override + public void close() { } + + @Override + public Connection getCurrent() { return null; } + + @Override + public Connection switchConnection(Connection connection) { return null; } + + @Override + public int getSize() { return 0; } + } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 15cc28007ce..6855f7f818c 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -44,6 +44,7 @@ public class FileDownloaderTest { private Downloads downloads; private FileDownloader fileDownloader; private File downloadDir; + private Supervisor supervisor; @Before public void setup() { @@ -51,7 +52,8 @@ public class FileDownloaderTest { downloadDir = Files.createTempDirectory("filedistribution").toFile(); connection = new MockConnection(); downloads = new Downloads(); - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries); + supervisor = new Supervisor(new Transport()).setDropEmptyBuffers(true); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(1), sleepBetweenRetries); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -61,6 +63,7 @@ public class FileDownloaderTest { @After public void teardown() { fileDownloader.close(); + supervisor.transport().shutdown().join(); } @Test @@ -163,7 +166,7 @@ public class FileDownloaderTest { @Test public void getFileWhenConnectionError() throws IOException { - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); int timesToFail = 2; @@ -197,7 +200,7 @@ public class FileDownloaderTest { public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); String filename = "abc.jar"; - fileDownloader = new FileDownloader(connection, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries); + fileDownloader = new FileDownloader(connection, supervisor, downloadDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); // Delay response so that we can make a second request while downloading the file from the first request @@ -237,7 +240,7 @@ public class FileDownloaderTest { Duration timeout = Duration.ofMillis(200); MockConnection connectionPool = new MockConnection(); connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); - FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, downloads, timeout, sleepBetweenRetries); + FileDownloader fileDownloader = new FileDownloader(connectionPool, supervisor, downloadDir, downloads, timeout, sleepBetweenRetries); FileReference xyzzy = new FileReference("xyzzy"); // Should download since we do not have the file on disk fileDownloader.downloadIfNeeded(new FileReferenceDownload(xyzzy)); @@ -339,11 +342,6 @@ public class FileDownloaderTest { return 1; } - @Override - public Supervisor getSupervisor() { - return new Supervisor(new Transport()); - } - void setResponseHandler(ResponseHandler responseHandler) { this.responseHandler = responseHandler; } |