diff options
author | Jon Marius Venstad <jvenstad@yahoo-inc.com> | 2017-12-11 13:12:10 +0100 |
---|---|---|
committer | Jon Marius Venstad <jvenstad@yahoo-inc.com> | 2017-12-11 13:12:10 +0100 |
commit | 6b06b462bf35616ad55a77fa4d88a8ded70a995c (patch) | |
tree | c58e81650103d0fb6397b8957d7e312777842658 /configserver | |
parent | 95515f4f75ff2a67f72d35cd3c3d938b898d5efe (diff) | |
parent | 3a96ac4522946e4b937e09a52772793dff587aa7 (diff) |
Need to change new code as well
Diffstat (limited to 'configserver')
6 files changed, 129 insertions, 19 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java index 58c61134bc4..6ab98f5af1c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.config.provision.Deployer; +import com.yahoo.container.jdisc.state.StateMonitor; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.server.rpc.RpcServer; import com.yahoo.vespa.config.server.version.VersionState; @@ -23,17 +24,19 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable private final Thread serverThread; private final Deployer deployer; private final VersionState versionState; + private final StateMonitor stateMonitor; // The tenants object is injected so that all initial requests handlers are // added to the rpc server before it starts answering rpc requests. - @SuppressWarnings("UnusedParameters") + @SuppressWarnings("WeakerAccess") @Inject public ConfigServerBootstrap(ApplicationRepository applicationRepository, RpcServer server, - Deployer deployer, VersionState versionState) { + Deployer deployer, VersionState versionState, StateMonitor stateMonitor) { this.applicationRepository = applicationRepository; this.server = server; this.deployer = deployer; this.versionState = versionState; + this.stateMonitor = stateMonitor; this.serverThread = new Thread(this, "configserver main"); serverThread.start(); } @@ -62,9 +65,11 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable log.log(LogLevel.INFO, "All applications redeployed"); } versionState.saveNewVersion(); + stateMonitor.status(StateMonitor.Status.up); log.log(LogLevel.DEBUG, "Starting RPC server"); server.run(); log.log(LogLevel.DEBUG, "RPC server stopped"); + stateMonitor.status(StateMonitor.Status.down); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index b1277682849..81f5e62016a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -20,6 +20,8 @@ import com.yahoo.vespa.config.server.ConfigServerSpec; import com.yahoo.vespa.filedistribution.CompressedFileReference; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceData; +import com.yahoo.vespa.filedistribution.FileReferenceDataBlob; +import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import java.io.File; import java.io.IOException; @@ -110,7 +112,7 @@ public class FileServer { log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'"); boolean success = false; String errorDescription = "OK"; - FileReferenceData fileData = FileReferenceData.empty(reference, file.getName()); + FileReferenceData fileData = FileReferenceDataBlob.empty(reference, file.getName()); try { fileData = readFileReferenceData(reference); success = true; @@ -124,21 +126,16 @@ public class FileServer { log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'"); } - private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { File file = root.getFile(reference); - byte[] blob; - FileReferenceData.Type type; if (file.isDirectory()) { - type = FileReferenceData.Type.compressed; - blob = CompressedFileReference.compress(file.getParentFile()); + //TODO Here we should compress to file, but then we have to clean up too. Pending. + byte [] blob = CompressedFileReference.compress(file.getParentFile()); + return new FileReferenceDataBlob(reference, file.getName(), FileReferenceData.Type.compressed, blob); } else { - type = FileReferenceData.Type.file; - blob = IOUtils.readFileBytes(file); + return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); } - - return new FileReferenceData(reference, file.getName(), type, blob); } public void serveFile(Request request, Receiver receiver) { pullExecutor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver)); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 17368b48e59..a09087ad8d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -41,8 +41,10 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory; import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider; import com.yahoo.vespa.config.server.tenant.TenantListener; import com.yahoo.vespa.config.server.tenant.Tenants; +import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -93,6 +95,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private final FileServer fileServer; private final ThreadPoolExecutor executorService; + private final boolean useChunkedFileTransfer; private volatile boolean allTenantsLoaded = false; /** @@ -118,6 +121,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { this.useRequestVersion = config.useVespaVersionInRequest(); this.hostedVespa = config.hostedVespa(); this.fileServer = fileServer; + this.useChunkedFileTransfer = config.usechunkedtransfer(); setUpHandlers(); } @@ -413,9 +417,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return useRequestVersion; } - class FileReceiver implements FileServer.Receiver { + class WholeFileReceiver implements FileServer.Receiver { Target target; - FileReceiver(Target target) { + WholeFileReceiver(Target target) { this.target = target; } @@ -426,11 +430,11 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { - Request fileBlob = new Request("filedistribution.receiveFile"); + Request fileBlob = new Request(FileReceiver.RECEIVE_METHOD); fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); fileBlob.parameters().add(new StringValue(fileData.filename())); fileBlob.parameters().add(new StringValue(fileData.type().name())); - fileBlob.parameters().add(new DataValue(fileData.content())); + fileBlob.parameters().add(new DataValue(fileData.content().array())); fileBlob.parameters().add(new Int64Value(fileData.xxhash())); fileBlob.parameters().add(new Int32Value(status.getCode())); fileBlob.parameters().add(new StringValue(status.getDescription())); @@ -442,9 +446,104 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } + class ChunkedFileReceiver implements FileServer.Receiver { + Target target; + ChunkedFileReceiver(Target target) { + this.target = target; + } + + @Override + public String toString() { + return target.toString(); + } + + @Override + public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { + int session = sendMeta(fileData); + sendParts(session, fileData); + sendEof(session, fileData, status); + Request fileBlob = new Request(FileReceiver.RECEIVE_META_METHOD); + fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); + fileBlob.parameters().add(new StringValue(fileData.filename())); + fileBlob.parameters().add(new StringValue(fileData.type().name())); + fileBlob.parameters().add(new Int64Value(fileData.size())); + target.invokeSync(fileBlob, 600); + if (fileBlob.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + fileBlob.errorMessage() + "'."); + } + } + private void sendParts(int session, FileReferenceData fileData) { + ByteBuffer bb = ByteBuffer.allocate(0x100000); + for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) { + byte [] buf = bb.array(); + if (buf.length != bb.position()) { + buf = new byte [bb.position()]; + bb.flip(); + bb.get(buf); + } + sendPart(session, fileData.fileReference(), partId, buf); + bb.clear(); + } + } + private int sendMeta(FileReferenceData fileData) { + Request request = new Request(FileReceiver.RECEIVE_META_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new StringValue(fileData.filename())); + request.parameters().add(new StringValue(fileData.type().name())); + request.parameters().add(new Int64Value(fileData.size())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + return request.returnValues().get(1).asInt32(); + } + private void sendPart(int session, FileReference ref, int partId, byte [] buf) { + Request request = new Request(FileReceiver.RECEIVE_PART_METHOD); + request.parameters().add(new StringValue(ref.value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int32Value(partId)); + request.parameters().add(new DataValue(buf)); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + ref.value() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) { + Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int64Value(fileData.xxhash())); + request.parameters().add(new Int32Value(status.getCode())); + request.parameters().add(new StringValue(status.getDescription())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + } + @SuppressWarnings("UnusedDeclaration") public final void serveFile(Request request) { request.detach(); - fileServer.serveFile(request, new FileReceiver(request.target())); + FileServer.Receiver receiver = useChunkedFileTransfer + ? new ChunkedFileReceiver(request.target()) + : new WholeFileReceiver(request.target()); + fileServer.serveFile(request, receiver); } } diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml index eddba1ee768..fd77fedd789 100644 --- a/configserver/src/main/resources/configserver-app/services.xml +++ b/configserver/src/main/resources/configserver-app/services.xml @@ -6,6 +6,10 @@ <maxthreads>100</maxthreads> <!-- Reduced thread count to minimize memory consumption --> </config> + <config name="container.jdisc.config.health-monitor"> + <initialStatus>initializing</initialStatus> + </config> + <accesslog type="vespa" fileNamePattern="logs/vespa/configserver/access.log.%Y%m%d%H%M%S" rotationScheme="date" symlinkName="access.log" /> <preprocess:include file='access-logging.xml' required='false' /> <component id="com.yahoo.vespa.config.server.ConfigServerBootstrap" bundle="configserver" /> diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java index e0d65055f21..384ae0853d8 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java @@ -6,7 +6,10 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.TenantName; +import com.yahoo.container.jdisc.config.HealthMonitorConfig; +import com.yahoo.container.jdisc.state.StateMonitor; import com.yahoo.io.IOUtils; +import com.yahoo.jdisc.core.SystemTimer; import com.yahoo.vespa.config.server.deploy.MockDeployer; import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.http.SessionHandlerTest; @@ -67,7 +70,9 @@ public class ConfigServerBootstrapTest extends TestWithTenant { assertFalse(myServer.stopped); VersionState versionState = new VersionState(versionFile); assertTrue(versionState.isUpgraded()); - ConfigServerBootstrap bootstrap = new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState); + ConfigServerBootstrap bootstrap = + new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState, + new StateMonitor(new HealthMonitorConfig(new HealthMonitorConfig.Builder()), new SystemTimer())); waitUntilStarted(rpc, 60000); assertFalse(versionState.isUpgraded()); assertThat(versionState.currentVersion(), is(versionState.storedVersion())); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 5fcaee6e590..b0dce359d58 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -103,7 +103,7 @@ public class FileServerTest { } @Override public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { - this.content.complete(fileData.content()); + this.content.complete(fileData.content().array()); } } |