diff options
Diffstat (limited to 'configserver')
7 files changed, 135 insertions, 13 deletions
diff --git a/configserver/pom.xml b/configserver/pom.xml index 8776fbd5ad1..4ebb76bd5fe 100644 --- a/configserver/pom.xml +++ b/configserver/pom.xml @@ -172,6 +172,11 @@ <artifactId>jersey-proxy-client</artifactId> </dependency> <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java index 3693bfb361c..36b0138ad36 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java @@ -22,11 +22,8 @@ public class FileDistributionProvider { public FileDistributionProvider(File applicationDir, String zooKeepersSpec, String applicationId, Lock fileDistributionLock) { ensureDirExists(FileDistribution.getDefaultFileDBPath()); final FileDistributionManager manager = new FileDistributionManager( - FileDistribution.getDefaultFileDBPath(), - applicationDir, - zooKeepersSpec, - applicationId, - fileDistributionLock); + FileDistribution.getDefaultFileDBPath(), applicationDir, + zooKeepersSpec, applicationId, fileDistributionLock); this.fileDistribution = new FileDBHandler(manager); this.fileRegistry = new FileDBRegistry(manager); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java new file mode 100644 index 00000000000..eb8fb5bf4d7 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -0,0 +1,82 @@ +package com.yahoo.vespa.config.server.filedistribution; + +import com.google.inject.Inject; +import com.yahoo.config.FileReference; +import com.yahoo.config.model.api.FileDistribution; +import com.yahoo.io.IOUtils; +import com.yahoo.jrt.DataValue; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Int64Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Target; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class FileServer { + private static final Logger log = Logger.getLogger(FileServer.class.getName()); + private final String rootDir; + private final ExecutorService executor; + + private String getPath(FileReference ref) { + return rootDir + "/" + ref.value(); + } + + @Inject + public FileServer() { + this(FileDistribution.getDefaultFileDBRoot()); + } + + public FileServer(String rootDir) { + this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + } + + public FileServer(String rootDir, ExecutorService executor) { + this.rootDir = rootDir; + this.executor = executor; + } + public boolean hasFile(String fileName) { + return hasFile(new FileReference(fileName)); + } + public boolean hasFile(FileReference ref) { + return new File(getPath(ref)).exists(); + } + public boolean startFileServing(String fileName, Target target) { + File file = new File(getPath(new FileReference(fileName))); + + if (file.exists()) { + executor.execute(() -> serveFile(fileName, target)); + } + return false; + } + + private void serveFile(String fileName, Target target) { + Request fileBlob = new Request("filedistribution.receiveFile"); + File file = new File(getPath(new FileReference(fileName))); + fileBlob.parameters().add(new StringValue(fileName)); + fileBlob.parameters().add(new StringValue(fileName)); + byte [] blob = new byte [0]; + boolean success = false; + String errorDescription = "OK"; + try { + blob = IOUtils.readFileBytes(file); + success = true; + } catch (IOException e) { + errorDescription = "Failed reading file '" + file.getAbsolutePath() + "'"; + log.warning(errorDescription + "for sending to '" + target.toString() + "'."); + } + XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + fileBlob.parameters().add(new DataValue(blob)); + fileBlob.parameters().add(new Int64Value(hasher.hash(ByteBuffer.wrap(blob), 0))); + fileBlob.parameters().add(new Int32Value(success ? 0 : 1)); + fileBlob.parameters().add(new StringValue(success ? "OK" : errorDescription)); + target.invokeSync(fileBlob, 600); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 3c9917bf17e..3ae6a8a1af9 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -17,6 +17,7 @@ import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; +import com.yahoo.jrt.Value; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.ErrorCode; import com.yahoo.vespa.config.JRTMethods; @@ -27,6 +28,7 @@ import com.yahoo.vespa.config.protocol.Trace; import com.yahoo.vespa.config.server.SuperModelRequestHandler; import com.yahoo.vespa.config.server.application.ApplicationSet; import com.yahoo.vespa.config.server.GetConfigContext; +import com.yahoo.vespa.config.server.filedistribution.FileServer; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.host.HostRegistry; import com.yahoo.vespa.config.server.ReloadListener; @@ -68,6 +70,18 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { static final int TRACELEVEL_DEBUG = 9; private static final String THREADPOOL_NAME = "rpcserver worker pool"; private static final long SHUTDOWN_TIMEOUT = 60; + private enum FileApiErrorCodes { + OK(0, "OK"), + NOT_FOUND(1, "Filereference not found"); + private final int code; + private final String description; + FileApiErrorCodes(int code, String description) { + this.code = code; + this.description = description; + } + int getCode() { return code; } + String getDescription() { return description; } + } private final Supervisor supervisor = new Supervisor(new Transport()); private Spec spec = null; private final boolean useRequestVersion; @@ -83,6 +97,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private final MetricUpdater metrics; private final MetricUpdaterFactory metricUpdaterFactory; private final HostLivenessTracker hostLivenessTracker; + private final FileServer fileServer; private final ThreadPoolExecutor executorService; private volatile boolean allTenantsLoaded = false; @@ -93,20 +108,23 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { * @param config The config to use for setting up this server */ @Inject - public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler, MetricUpdaterFactory metrics, - HostRegistries hostRegistries, HostLivenessTracker hostLivenessTracker) { + public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler, + MetricUpdaterFactory metrics, HostRegistries hostRegistries, + HostLivenessTracker hostLivenessTracker, FileServer fileServer) { this.superModelRequestHandler = superModelRequestHandler; - this.metricUpdaterFactory = metrics; - this.supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize()); + metricUpdaterFactory = metrics; + supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize()); this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap()); this.hostLivenessTracker = hostLivenessTracker; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients()); - executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(), 0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME)); + executorService = new ThreadPoolExecutor(config.numthreads(), config.numthreads(), + 0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME)); delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads()); spec = new Spec(null, config.rpcport()); hostRegistry = hostRegistries.getTenantHostRegistry(); this.useRequestVersion = config.useVespaVersionInRequest(); this.hostedVespa = config.hostedVespa(); + this.fileServer = fileServer; setUpHandlers(); } @@ -180,6 +198,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { getSupervisor().addMethod(new Method("printStatistics", "", "s", this, "printStatistics") .methodDesc("printStatistics") .returnDesc(0, "statistics", "Statistics for server")); + getSupervisor().addMethod(new Method("filedistribution.serveFile", "s", "is", this, "serveFile")); } /** @@ -402,4 +421,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return useRequestVersion; } + @SuppressWarnings("UnusedDeclaration") + public final void serveFile(Request request) { + String fileReference = request.parameters().get(0).asString(); + FileApiErrorCodes result = fileServer.hasFile(fileReference) + ? FileApiErrorCodes.OK + : FileApiErrorCodes.NOT_FOUND; + if (result == FileApiErrorCodes.OK) { + fileServer.startFileServing(fileReference, request.target()); + } + request.returnValues() + .add(new Int32Value(result.getCode())) + .add(new StringValue(result.getDescription())); + } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java index 3efad7ac133..acda60049ab 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java @@ -5,8 +5,10 @@ import com.google.common.io.Files; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.model.NullConfigModelRegistry; import com.yahoo.config.model.api.ConfigDefinitionRepo; +import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.config.server.application.PermanentApplicationPackage; +import com.yahoo.vespa.config.server.filedistribution.FileServer; import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.http.SessionHandlerTest; @@ -61,7 +63,7 @@ public class InjectedGlobalComponentRegistryTest { serverDB = new ConfigServerDB(configserverConfig); sessionPreparer = new SessionTest.MockSessionPreparer(); rpcServer = new RpcServer(configserverConfig, null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker()); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); generationCounter = new SuperModelGenerationCounter(curator); defRepo = new StaticConfigDefinitionRepo(); permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java index a08514e8afb..b094a741f34 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java @@ -2,11 +2,13 @@ package com.yahoo.vespa.config.server.rpc; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.Version; import com.yahoo.vespa.config.protocol.ConfigResponse; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.server.GetConfigContext; +import com.yahoo.vespa.config.server.filedistribution.FileServer; import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.monitoring.Metrics; @@ -37,7 +39,7 @@ public class MockRpc extends RpcServer { public MockRpc(int port, boolean createDefaultTenant, boolean pretendToHaveLoadedAnyApplication) { super(createConfig(port), null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker()); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); if (createDefaultTenant) { onTenantCreate(TenantName.from("default"), new MockTenantProvider(pretendToHaveLoadedAnyApplication)); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java index fa6adb64a8a..933cb770dd1 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.server.rpc; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.provision.HostLivenessTracker; import com.yahoo.config.provision.TenantName; import com.yahoo.jrt.Request; @@ -12,6 +13,7 @@ import com.yahoo.net.HostName; import com.yahoo.test.ManualClock; import com.yahoo.vespa.config.GenerationCounter; import com.yahoo.vespa.config.server.*; +import com.yahoo.vespa.config.server.filedistribution.FileServer; import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.monitoring.Metrics; @@ -88,7 +90,7 @@ public class TestWithRpc { emptyNodeFlavors(), generationCounter)), Metrics.createTestMetrics(), new HostRegistries(), - hostLivenessTracker); + hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBRoot())); rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider); t = new Thread(rpcServer); t.start(); |