diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-11-24 10:57:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-24 10:57:07 +0100 |
commit | 2f5a882a19af695fcb29f04bf349e8c57aa8ba1b (patch) | |
tree | 87e0a8a9c8dbba5cf4c08b2aac94d0a960faff0b | |
parent | 67dac84dbbabc92340c693039b83e43763b3c4f4 (diff) | |
parent | 03b5d9887f7464e2b25a0f2817dbc5e3cfc8f6e8 (diff) |
Merge pull request #4263 from vespa-engine/hmusum/chained-download
Implement chained download of file references
5 files changed, 100 insertions, 15 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 9dc94c9fe93..9316a9a5c8e 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -2,24 +2,33 @@ package com.yahoo.vespa.config.server.filedistribution; import com.google.inject.Inject; +import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.FileReference; import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.net.HostName; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.config.JRTConnectionPool; +import com.yahoo.vespa.config.server.ConfigServerSpec; import com.yahoo.vespa.filedistribution.FileDownloader; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; +import java.util.stream.Collectors; public class FileServer { private static final Logger log = Logger.getLogger(FileServer.class.getName()); private final FileDirectory root; private final ExecutorService executor; - private final FileDownloader downloader = new FileDownloader(new JRTConnectionPool(ConfigSourceSet.createDefault())); + private final FileDownloader downloader; public static class ReplayStatus { private final int code; @@ -38,18 +47,21 @@ public class FileServer { } @Inject - public FileServer() { - this(FileDistribution.getDefaultFileDBPath()); + public FileServer(ConfigserverConfig configserverConfig) { + this(createConnectionPool(configserverConfig), FileDistribution.getDefaultFileDBPath()); } + // For testing only public FileServer(File rootDir) { - this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + this(new EmptyConnectionPool(), rootDir); } - public FileServer(File rootDir, ExecutorService executor) { + private FileServer(ConnectionPool connectionPool, File rootDir) { + this.downloader = new FileDownloader(connectionPool); this.root = new FileDirectory(rootDir); - this.executor = executor; + this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } + public boolean hasFile(String fileName) { return hasFile(new FileReference(fileName)); } @@ -94,4 +106,40 @@ public class FileServer { public void download(FileReference fileReference) { downloader.getFile(fileReference); } + + public FileDownloader downloader() { + return downloader; + } + + // Connection pool with all config servers except this one (might be an empty pool if there is only one config server) + private static ConnectionPool createConnectionPool(ConfigserverConfig configserverConfig) { + List<String> configServers = ConfigServerSpec.fromConfig(configserverConfig) + .stream() + .filter(spec -> !spec.getHostName().equals(HostName.getLocalhost())) + .map(spec -> "tcp/" + spec.getHostName() + ":" + spec.getConfigServerPort()) + .collect(Collectors.toList()); + + return configServers.size() > 0 ? new JRTConnectionPool(new ConfigSourceSet(configServers)) : new EmptyConnectionPool(); + } + + private static class EmptyConnectionPool implements ConnectionPool { + + @Override + public void close() {} + + @Override + public void setError(Connection connection, int i) {} + + @Override + public Connection getCurrent() { return null; } + + @Override + public Connection setNewCurrentConnection() { return null; } + + @Override + public int getSize() { return 0; } + + @Override + public Supervisor getSupervisor() { return new Supervisor(new Transport()); } + } } diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml index 635ce07e727..fbab854ae9e 100644 --- a/configserver/src/main/resources/configserver-app/services.xml +++ b/configserver/src/main/resources/configserver-app/services.xml @@ -34,6 +34,7 @@ <component id="com.yahoo.config.provision.Zone" bundle="config-provisioning" /> <component id="com.yahoo.vespa.config.server.application.ApplicationConvergenceChecker" bundle="configserver" /> <component id="com.yahoo.vespa.config.server.application.HttpProxy" bundle="configserver" /> + <component id="com.yahoo.vespa.config.server.filedistribution.FileServer" bundle="configserver" /> <component id="com.yahoo.vespa.serviceview.ConfigServerLocation" bundle="configserver" /> diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 4913798e5ad..09260987ac0 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -1,12 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; +import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.FileReference; import com.yahoo.io.IOUtils; +import com.yahoo.net.HostName; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -57,6 +60,7 @@ public class FileServerTest { this.content.complete(content); } } + @Test public void requireThatWeCanReplayFile() throws IOException, InterruptedException, ExecutionException { createCleanDir("12y"); @@ -67,6 +71,33 @@ public class FileServerTest { cleanup(); } + @Test + public void requireThatDifferentNumberOfConfigServersWork() throws IOException { + // Empty connection pool in tests etc. + ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); + FileServer fileServer = new FileServer(new ConfigserverConfig(builder)); + assertEquals(0, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize()); + + // Empty connection pool when only one server, no use in downloading from yourself + List<ConfigserverConfig.Zookeeperserver.Builder> servers = new ArrayList<>(); + ConfigserverConfig.Zookeeperserver.Builder serverBuilder = new ConfigserverConfig.Zookeeperserver.Builder(); + serverBuilder.hostname(HostName.getLocalhost()); + serverBuilder.port(123456); + servers.add(serverBuilder); + builder.zookeeperserver(servers); + fileServer = new FileServer(new ConfigserverConfig(builder)); + assertEquals(0, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize()); + + // connection pool of size 1 when 2 servers + ConfigserverConfig.Zookeeperserver.Builder serverBuilder2 = new ConfigserverConfig.Zookeeperserver.Builder(); + serverBuilder2.hostname("bar"); + serverBuilder2.port(123456); + servers.add(serverBuilder2); + builder.zookeeperserver(servers); + fileServer = new FileServer(new ConfigserverConfig(builder)); + assertEquals(1, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize()); + } + private void cleanup() { created.forEach((file) -> IOUtils.recursiveDeleteDir(file)); created.clear(); diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index cd4b3afb9b5..fde410bc8d7 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -146,8 +146,7 @@ public class FileDownloader { fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); } - Set<FileReference> queuedDownloads() { - return fileReferenceDownloader.queuedDownloads(); + public FileReferenceDownloader fileReferenceDownloader() { + return fileReferenceDownloader; } - } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 08595662f36..4c9c37dd6da 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -37,7 +37,7 @@ import java.util.stream.Collectors; * @author hmusum */ // TODO: Handle shutdown of executors -class FileReferenceDownloader { +public class FileReferenceDownloader { private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); private final static Duration rpcTimeout = Duration.ofSeconds(10); @@ -107,8 +107,7 @@ class FileReferenceDownloader { Thread.sleep(10); } catch (InterruptedException e) { /* ignore for now */} } else { - log.log(LogLevel.INFO, "Polling queue, found file reference '" + - fileReferenceDownload.fileReference().value() + "' to download"); + log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); } } while (true); @@ -133,12 +132,16 @@ class FileReferenceDownloader { return true; } else { log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + connectionPool.setNewCurrentConnection(); return false; } } else { - log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); - if (request.isError() && request.errorCode() == ErrorCode.CONNECTION) - connection.setError(request.errorCode()); + log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() + + ", error code: " + request.errorCode()); + if (request.isError() && request.errorCode() == ErrorCode.CONNECTION || request.errorCode() == ErrorCode.TIMEOUT) { + log.log(LogLevel.WARNING, "Setting error for connection " + connection.getAddress()); + connectionPool.setError(connection, request.errorCode()); + } return false; } } @@ -181,4 +184,7 @@ class FileReferenceDownloader { return ImmutableMap.copyOf(downloadStatus); } + public ConnectionPool connectionPool() { + return connectionPool; + } } |