summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-11-24 10:57:07 +0100
committerGitHub <noreply@github.com>2017-11-24 10:57:07 +0100
commit2f5a882a19af695fcb29f04bf349e8c57aa8ba1b (patch)
tree87e0a8a9c8dbba5cf4c08b2aac94d0a960faff0b
parent67dac84dbbabc92340c693039b83e43763b3c4f4 (diff)
parent03b5d9887f7464e2b25a0f2817dbc5e3cfc8f6e8 (diff)
Merge pull request #4263 from vespa-engine/hmusum/chained-download
Implement chained download of file references
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java60
-rw-r--r--configserver/src/main/resources/configserver-app/services.xml1
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java31
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java5
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java18
5 files changed, 100 insertions, 15 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index 9dc94c9fe93..9316a9a5c8e 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -2,24 +2,33 @@
package com.yahoo.vespa.config.server.filedistribution;
import com.google.inject.Inject;
+import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.config.FileReference;
import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.config.Connection;
+import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.config.JRTConnectionPool;
+import com.yahoo.vespa.config.server.ConfigServerSpec;
import com.yahoo.vespa.filedistribution.FileDownloader;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
public class FileServer {
private static final Logger log = Logger.getLogger(FileServer.class.getName());
private final FileDirectory root;
private final ExecutorService executor;
- private final FileDownloader downloader = new FileDownloader(new JRTConnectionPool(ConfigSourceSet.createDefault()));
+ private final FileDownloader downloader;
public static class ReplayStatus {
private final int code;
@@ -38,18 +47,21 @@ public class FileServer {
}
@Inject
- public FileServer() {
- this(FileDistribution.getDefaultFileDBPath());
+ public FileServer(ConfigserverConfig configserverConfig) {
+ this(createConnectionPool(configserverConfig), FileDistribution.getDefaultFileDBPath());
}
+ // For testing only
public FileServer(File rootDir) {
- this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+ this(new EmptyConnectionPool(), rootDir);
}
- public FileServer(File rootDir, ExecutorService executor) {
+ private FileServer(ConnectionPool connectionPool, File rootDir) {
+ this.downloader = new FileDownloader(connectionPool);
this.root = new FileDirectory(rootDir);
- this.executor = executor;
+ this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
+
public boolean hasFile(String fileName) {
return hasFile(new FileReference(fileName));
}
@@ -94,4 +106,40 @@ public class FileServer {
public void download(FileReference fileReference) {
downloader.getFile(fileReference);
}
+
+ public FileDownloader downloader() {
+ return downloader;
+ }
+
+ // Connection pool with all config servers except this one (might be an empty pool if there is only one config server)
+ private static ConnectionPool createConnectionPool(ConfigserverConfig configserverConfig) {
+ List<String> configServers = ConfigServerSpec.fromConfig(configserverConfig)
+ .stream()
+ .filter(spec -> !spec.getHostName().equals(HostName.getLocalhost()))
+ .map(spec -> "tcp/" + spec.getHostName() + ":" + spec.getConfigServerPort())
+ .collect(Collectors.toList());
+
+ return configServers.size() > 0 ? new JRTConnectionPool(new ConfigSourceSet(configServers)) : new EmptyConnectionPool();
+ }
+
+ private static class EmptyConnectionPool implements ConnectionPool {
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void setError(Connection connection, int i) {}
+
+ @Override
+ public Connection getCurrent() { return null; }
+
+ @Override
+ public Connection setNewCurrentConnection() { return null; }
+
+ @Override
+ public int getSize() { return 0; }
+
+ @Override
+ public Supervisor getSupervisor() { return new Supervisor(new Transport()); }
+ }
}
diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml
index 635ce07e727..fbab854ae9e 100644
--- a/configserver/src/main/resources/configserver-app/services.xml
+++ b/configserver/src/main/resources/configserver-app/services.xml
@@ -34,6 +34,7 @@
<component id="com.yahoo.config.provision.Zone" bundle="config-provisioning" />
<component id="com.yahoo.vespa.config.server.application.ApplicationConvergenceChecker" bundle="configserver" />
<component id="com.yahoo.vespa.config.server.application.HttpProxy" bundle="configserver" />
+ <component id="com.yahoo.vespa.config.server.filedistribution.FileServer" bundle="configserver" />
<component id="com.yahoo.vespa.serviceview.ConfigServerLocation" bundle="configserver" />
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
index 4913798e5ad..09260987ac0 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
@@ -1,12 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.filedistribution;
+import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.config.FileReference;
import com.yahoo.io.IOUtils;
+import com.yahoo.net.HostName;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -57,6 +60,7 @@ public class FileServerTest {
this.content.complete(content);
}
}
+
@Test
public void requireThatWeCanReplayFile() throws IOException, InterruptedException, ExecutionException {
createCleanDir("12y");
@@ -67,6 +71,33 @@ public class FileServerTest {
cleanup();
}
+ @Test
+ public void requireThatDifferentNumberOfConfigServersWork() throws IOException {
+ // Empty connection pool in tests etc.
+ ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
+ FileServer fileServer = new FileServer(new ConfigserverConfig(builder));
+ assertEquals(0, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize());
+
+ // Empty connection pool when only one server, no use in downloading from yourself
+ List<ConfigserverConfig.Zookeeperserver.Builder> servers = new ArrayList<>();
+ ConfigserverConfig.Zookeeperserver.Builder serverBuilder = new ConfigserverConfig.Zookeeperserver.Builder();
+ serverBuilder.hostname(HostName.getLocalhost());
+ serverBuilder.port(123456);
+ servers.add(serverBuilder);
+ builder.zookeeperserver(servers);
+ fileServer = new FileServer(new ConfigserverConfig(builder));
+ assertEquals(0, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize());
+
+ // connection pool of size 1 when 2 servers
+ ConfigserverConfig.Zookeeperserver.Builder serverBuilder2 = new ConfigserverConfig.Zookeeperserver.Builder();
+ serverBuilder2.hostname("bar");
+ serverBuilder2.port(123456);
+ servers.add(serverBuilder2);
+ builder.zookeeperserver(servers);
+ fileServer = new FileServer(new ConfigserverConfig(builder));
+ assertEquals(1, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize());
+ }
+
private void cleanup() {
created.forEach((file) -> IOUtils.recursiveDeleteDir(file));
created.clear();
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index cd4b3afb9b5..fde410bc8d7 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -146,8 +146,7 @@ public class FileDownloader {
fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
}
- Set<FileReference> queuedDownloads() {
- return fileReferenceDownloader.queuedDownloads();
+ public FileReferenceDownloader fileReferenceDownloader() {
+ return fileReferenceDownloader;
}
-
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 08595662f36..4c9c37dd6da 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
* @author hmusum
*/
// TODO: Handle shutdown of executors
-class FileReferenceDownloader {
+public class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
@@ -107,8 +107,7 @@ class FileReferenceDownloader {
Thread.sleep(10);
} catch (InterruptedException e) { /* ignore for now */}
} else {
- log.log(LogLevel.INFO, "Polling queue, found file reference '" +
- fileReferenceDownload.fileReference().value() + "' to download");
+ log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
}
} while (true);
@@ -133,12 +132,16 @@ class FileReferenceDownloader {
return true;
} else {
log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress());
+ connectionPool.setNewCurrentConnection();
return false;
}
} else {
- log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress());
- if (request.isError() && request.errorCode() == ErrorCode.CONNECTION)
- connection.setError(request.errorCode());
+ log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() +
+ ", error code: " + request.errorCode());
+ if (request.isError() && request.errorCode() == ErrorCode.CONNECTION || request.errorCode() == ErrorCode.TIMEOUT) {
+ log.log(LogLevel.WARNING, "Setting error for connection " + connection.getAddress());
+ connectionPool.setError(connection, request.errorCode());
+ }
return false;
}
}
@@ -181,4 +184,7 @@ class FileReferenceDownloader {
return ImmutableMap.copyOf(downloadStatus);
}
+ public ConnectionPool connectionPool() {
+ return connectionPool;
+ }
}