summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahoo-inc.com>2017-11-08 10:31:40 +0100
committerGitHub <noreply@github.com>2017-11-08 10:31:40 +0100
commit87791e3ee279c2e0cc22db9c474b8ad1be53789d (patch)
tree3418206543e2807e13fe5dd3344a6f903a57a769
parent0eca7c0fa0dd318ccd20bfac3da526e1cfdfd4cb (diff)
parentc90fa829e3593062ea397b6a8527178f7616d613 (diff)
Merge pull request #4034 from vespa-engine/balder/add-filedist-rpc-to-configserver
Balder/add filedist rpc to configserver
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/FileDistribution.java6
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java5
-rw-r--r--configserver/pom.xml5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java7
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java82
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java42
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java4
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java4
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java4
9 files changed, 144 insertions, 15 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/FileDistribution.java b/config-model-api/src/main/java/com/yahoo/config/model/api/FileDistribution.java
index a5d54fb84b3..d2365bcbf00 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/FileDistribution.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/FileDistribution.java
@@ -22,8 +22,12 @@ public interface FileDistribution {
void limitSendingOfDeployedFilesTo(Collection<String> hostNames);
void removeDeploymentsThatHaveDifferentApplicationId(Collection<String> targetHostnames);
+ static String getDefaultFileDBRoot() {
+ return Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution");
+ }
+
static File getDefaultFileDBPath() {
- return new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"));
+ return new File(getDefaultFileDBRoot());
}
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index 01f2c161247..ae0360fecf2 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -131,12 +131,15 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "slx", "i", // TODO Temporary method to get started with testing
+ supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
this, "receiveFile")
.methodDesc("receive file reference content")
.paramDesc(0, "file references", "file reference to download")
.paramDesc(1, "filename", "filename")
.paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
diff --git a/configserver/pom.xml b/configserver/pom.xml
index 8776fbd5ad1..4ebb76bd5fe 100644
--- a/configserver/pom.xml
+++ b/configserver/pom.xml
@@ -172,6 +172,11 @@
<artifactId>jersey-proxy-client</artifactId>
</dependency>
<dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java
index 3693bfb361c..36b0138ad36 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java
@@ -22,11 +22,8 @@ public class FileDistributionProvider {
public FileDistributionProvider(File applicationDir, String zooKeepersSpec, String applicationId, Lock fileDistributionLock) {
ensureDirExists(FileDistribution.getDefaultFileDBPath());
final FileDistributionManager manager = new FileDistributionManager(
- FileDistribution.getDefaultFileDBPath(),
- applicationDir,
- zooKeepersSpec,
- applicationId,
- fileDistributionLock);
+ FileDistribution.getDefaultFileDBPath(), applicationDir,
+ zooKeepersSpec, applicationId, fileDistributionLock);
this.fileDistribution = new FileDBHandler(manager);
this.fileRegistry = new FileDBRegistry(manager);
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
new file mode 100644
index 00000000000..eb8fb5bf4d7
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -0,0 +1,82 @@
+package com.yahoo.vespa.config.server.filedistribution;
+
+import com.google.inject.Inject;
+import com.yahoo.config.FileReference;
+import com.yahoo.config.model.api.FileDistribution;
+import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int64Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Target;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+public class FileServer {
+ private static final Logger log = Logger.getLogger(FileServer.class.getName());
+ private final String rootDir;
+ private final ExecutorService executor;
+
+ private String getPath(FileReference ref) {
+ return rootDir + "/" + ref.value();
+ }
+
+ @Inject
+ public FileServer() {
+ this(FileDistribution.getDefaultFileDBRoot());
+ }
+
+ public FileServer(String rootDir) {
+ this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+ }
+
+ public FileServer(String rootDir, ExecutorService executor) {
+ this.rootDir = rootDir;
+ this.executor = executor;
+ }
+ public boolean hasFile(String fileName) {
+ return hasFile(new FileReference(fileName));
+ }
+ public boolean hasFile(FileReference ref) {
+ return new File(getPath(ref)).exists();
+ }
+ public boolean startFileServing(String fileName, Target target) {
+ File file = new File(getPath(new FileReference(fileName)));
+
+ if (file.exists()) {
+ executor.execute(() -> serveFile(fileName, target));
+ }
+ return false;
+ }
+
+ private void serveFile(String fileName, Target target) {
+ Request fileBlob = new Request("filedistribution.receiveFile");
+ File file = new File(getPath(new FileReference(fileName)));
+ fileBlob.parameters().add(new StringValue(fileName));
+ fileBlob.parameters().add(new StringValue(fileName));
+ byte [] blob = new byte [0];
+ boolean success = false;
+ String errorDescription = "OK";
+ try {
+ blob = IOUtils.readFileBytes(file);
+ success = true;
+ } catch (IOException e) {
+ errorDescription = "Failed reading file '" + file.getAbsolutePath() + "'";
+ log.warning(errorDescription + "for sending to '" + target.toString() + "'.");
+ }
+ XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+ fileBlob.parameters().add(new DataValue(blob));
+ fileBlob.parameters().add(new Int64Value(hasher.hash(ByteBuffer.wrap(blob), 0)));
+ fileBlob.parameters().add(new Int32Value(success ? 0 : 1));
+ fileBlob.parameters().add(new StringValue(success ? "OK" : errorDescription));
+ target.invokeSync(fileBlob, 600);
+ }
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 3c9917bf17e..3ae6a8a1af9 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -17,6 +17,7 @@ import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.Value;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ErrorCode;
import com.yahoo.vespa.config.JRTMethods;
@@ -27,6 +28,7 @@ import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.vespa.config.server.SuperModelRequestHandler;
import com.yahoo.vespa.config.server.application.ApplicationSet;
import com.yahoo.vespa.config.server.GetConfigContext;
+import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.host.HostRegistry;
import com.yahoo.vespa.config.server.ReloadListener;
@@ -68,6 +70,18 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
static final int TRACELEVEL_DEBUG = 9;
private static final String THREADPOOL_NAME = "rpcserver worker pool";
private static final long SHUTDOWN_TIMEOUT = 60;
+ private enum FileApiErrorCodes {
+ OK(0, "OK"),
+ NOT_FOUND(1, "Filereference not found");
+ private final int code;
+ private final String description;
+ FileApiErrorCodes(int code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+ int getCode() { return code; }
+ String getDescription() { return description; }
+ }
private final Supervisor supervisor = new Supervisor(new Transport());
private Spec spec = null;
private final boolean useRequestVersion;
@@ -83,6 +97,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private final MetricUpdater metrics;
private final MetricUpdaterFactory metricUpdaterFactory;
private final HostLivenessTracker hostLivenessTracker;
+ private final FileServer fileServer;
private final ThreadPoolExecutor executorService;
private volatile boolean allTenantsLoaded = false;
@@ -93,20 +108,23 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
* @param config The config to use for setting up this server
*/
@Inject
- public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler, MetricUpdaterFactory metrics,
- HostRegistries hostRegistries, HostLivenessTracker hostLivenessTracker) {
+ public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler,
+ MetricUpdaterFactory metrics, HostRegistries hostRegistries,
+ HostLivenessTracker hostLivenessTracker, FileServer fileServer) {
this.superModelRequestHandler = superModelRequestHandler;
- this.metricUpdaterFactory = metrics;
- this.supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize());
+ metricUpdaterFactory = metrics;
+ supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize());
this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap());
this.hostLivenessTracker = hostLivenessTracker;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients());
- executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(), 0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME));
+ executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(),
+ 0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME));
delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads());
spec = new Spec(null, config.rpcport());
hostRegistry = hostRegistries.getTenantHostRegistry();
this.useRequestVersion = config.useVespaVersionInRequest();
this.hostedVespa = config.hostedVespa();
+ this.fileServer = fileServer;
setUpHandlers();
}
@@ -180,6 +198,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
getSupervisor().addMethod(new Method("printStatistics", "", "s", this, "printStatistics")
.methodDesc("printStatistics")
.returnDesc(0, "statistics", "Statistics for server"));
+ getSupervisor().addMethod(new Method("filedistribution.serveFile", "s", "is", this, "serveFile"));
}
/**
@@ -402,4 +421,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return useRequestVersion;
}
+ @SuppressWarnings("UnusedDeclaration")
+ public final void serveFile(Request request) {
+ String fileReference = request.parameters().get(0).asString();
+ FileApiErrorCodes result = fileServer.hasFile(fileReference)
+ ? FileApiErrorCodes.OK
+ : FileApiErrorCodes.NOT_FOUND;
+ if (result == FileApiErrorCodes.OK) {
+ fileServer.startFileServing(fileReference, request.target());
+ }
+ request.returnValues()
+ .add(new Int32Value(result.getCode()))
+ .add(new StringValue(result.getDescription()));
+ }
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java
index 3efad7ac133..acda60049ab 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java
@@ -5,8 +5,10 @@ import com.google.common.io.Files;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.config.model.NullConfigModelRegistry;
import com.yahoo.config.model.api.ConfigDefinitionRepo;
+import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.config.server.application.PermanentApplicationPackage;
+import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.http.SessionHandlerTest;
@@ -61,7 +63,7 @@ public class InjectedGlobalComponentRegistryTest {
serverDB = new ConfigServerDB(configserverConfig);
sessionPreparer = new SessionTest.MockSessionPreparer();
rpcServer = new RpcServer(configserverConfig, null, Metrics.createTestMetrics(),
- new HostRegistries(), new ConfigRequestHostLivenessTracker());
+ new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot()));
generationCounter = new SuperModelGenerationCounter(curator);
defRepo = new StaticConfigDefinitionRepo();
permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig);
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java
index a08514e8afb..b094a741f34 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java
@@ -2,11 +2,13 @@
package com.yahoo.vespa.config.server.rpc;
import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.Version;
import com.yahoo.vespa.config.protocol.ConfigResponse;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.server.GetConfigContext;
+import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.monitoring.Metrics;
@@ -37,7 +39,7 @@ public class MockRpc extends RpcServer {
public MockRpc(int port, boolean createDefaultTenant, boolean pretendToHaveLoadedAnyApplication) {
super(createConfig(port), null, Metrics.createTestMetrics(),
- new HostRegistries(), new ConfigRequestHostLivenessTracker());
+ new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot()));
if (createDefaultTenant) {
onTenantCreate(TenantName.from("default"), new MockTenantProvider(pretendToHaveLoadedAnyApplication));
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
index fa6adb64a8a..933cb770dd1 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.config.server.rpc;
import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.provision.HostLivenessTracker;
import com.yahoo.config.provision.TenantName;
import com.yahoo.jrt.Request;
@@ -12,6 +13,7 @@ import com.yahoo.net.HostName;
import com.yahoo.test.ManualClock;
import com.yahoo.vespa.config.GenerationCounter;
import com.yahoo.vespa.config.server.*;
+import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.monitoring.Metrics;
@@ -88,7 +90,7 @@ public class TestWithRpc {
emptyNodeFlavors(),
generationCounter)),
Metrics.createTestMetrics(), new HostRegistries(),
- hostLivenessTracker);
+ hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBRoot()));
rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider);
t = new Thread(rpcServer);
t.start();