summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorJon Marius Venstad <jvenstad@yahoo-inc.com>2017-12-11 13:12:10 +0100
committerJon Marius Venstad <jvenstad@yahoo-inc.com>2017-12-11 13:12:10 +0100
commit6b06b462bf35616ad55a77fa4d88a8ded70a995c (patch)
treec58e81650103d0fb6397b8957d7e312777842658 /configserver
parent95515f4f75ff2a67f72d35cd3c3d938b898d5efe (diff)
parent3a96ac4522946e4b937e09a52772793dff587aa7 (diff)
Need to change new code as well
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java9
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java17
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java109
-rw-r--r--configserver/src/main/resources/configserver-app/services.xml4
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java7
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java2
6 files changed, 129 insertions, 19 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java
index 58c61134bc4..6ab98f5af1c 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigServerBootstrap.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.config.provision.Deployer;
+import com.yahoo.container.jdisc.state.StateMonitor;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.server.rpc.RpcServer;
import com.yahoo.vespa.config.server.version.VersionState;
@@ -23,17 +24,19 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable
private final Thread serverThread;
private final Deployer deployer;
private final VersionState versionState;
+ private final StateMonitor stateMonitor;
// The tenants object is injected so that all initial requests handlers are
// added to the rpc server before it starts answering rpc requests.
- @SuppressWarnings("UnusedParameters")
+ @SuppressWarnings("WeakerAccess")
@Inject
public ConfigServerBootstrap(ApplicationRepository applicationRepository, RpcServer server,
- Deployer deployer, VersionState versionState) {
+ Deployer deployer, VersionState versionState, StateMonitor stateMonitor) {
this.applicationRepository = applicationRepository;
this.server = server;
this.deployer = deployer;
this.versionState = versionState;
+ this.stateMonitor = stateMonitor;
this.serverThread = new Thread(this, "configserver main");
serverThread.start();
}
@@ -62,9 +65,11 @@ public class ConfigServerBootstrap extends AbstractComponent implements Runnable
log.log(LogLevel.INFO, "All applications redeployed");
}
versionState.saveNewVersion();
+ stateMonitor.status(StateMonitor.Status.up);
log.log(LogLevel.DEBUG, "Starting RPC server");
server.run();
log.log(LogLevel.DEBUG, "RPC server stopped");
+ stateMonitor.status(StateMonitor.Status.down);
}
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index b1277682849..81f5e62016a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -20,6 +20,8 @@ import com.yahoo.vespa.config.server.ConfigServerSpec;
import com.yahoo.vespa.filedistribution.CompressedFileReference;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReferenceData;
+import com.yahoo.vespa.filedistribution.FileReferenceDataBlob;
+import com.yahoo.vespa.filedistribution.LazyFileReferenceData;
import java.io.File;
import java.io.IOException;
@@ -110,7 +112,7 @@ public class FileServer {
log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'");
boolean success = false;
String errorDescription = "OK";
- FileReferenceData fileData = FileReferenceData.empty(reference, file.getName());
+ FileReferenceData fileData = FileReferenceDataBlob.empty(reference, file.getName());
try {
fileData = readFileReferenceData(reference);
success = true;
@@ -124,21 +126,16 @@ public class FileServer {
log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'");
}
-
private FileReferenceData readFileReferenceData(FileReference reference) throws IOException {
File file = root.getFile(reference);
- byte[] blob;
- FileReferenceData.Type type;
if (file.isDirectory()) {
- type = FileReferenceData.Type.compressed;
- blob = CompressedFileReference.compress(file.getParentFile());
+ //TODO Here we should compress to file, but then we have to clean up too. Pending.
+ byte [] blob = CompressedFileReference.compress(file.getParentFile());
+ return new FileReferenceDataBlob(reference, file.getName(), FileReferenceData.Type.compressed, blob);
} else {
- type = FileReferenceData.Type.file;
- blob = IOUtils.readFileBytes(file);
+ return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file);
}
-
- return new FileReferenceData(reference, file.getName(), type, blob);
}
public void serveFile(Request request, Receiver receiver) {
pullExecutor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver));
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 17368b48e59..a09087ad8d3 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -41,8 +41,10 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory;
import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider;
import com.yahoo.vespa.config.server.tenant.TenantListener;
import com.yahoo.vespa.config.server.tenant.Tenants;
+import com.yahoo.vespa.filedistribution.FileReceiver;
import com.yahoo.vespa.filedistribution.FileReferenceData;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -93,6 +95,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private final FileServer fileServer;
private final ThreadPoolExecutor executorService;
+ private final boolean useChunkedFileTransfer;
private volatile boolean allTenantsLoaded = false;
/**
@@ -118,6 +121,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
this.useRequestVersion = config.useVespaVersionInRequest();
this.hostedVespa = config.hostedVespa();
this.fileServer = fileServer;
+ this.useChunkedFileTransfer = config.usechunkedtransfer();
setUpHandlers();
}
@@ -413,9 +417,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return useRequestVersion;
}
- class FileReceiver implements FileServer.Receiver {
+ class WholeFileReceiver implements FileServer.Receiver {
Target target;
- FileReceiver(Target target) {
+ WholeFileReceiver(Target target) {
this.target = target;
}
@@ -426,11 +430,11 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
- Request fileBlob = new Request("filedistribution.receiveFile");
+ Request fileBlob = new Request(FileReceiver.RECEIVE_METHOD);
fileBlob.parameters().add(new StringValue(fileData.fileReference().value()));
fileBlob.parameters().add(new StringValue(fileData.filename()));
fileBlob.parameters().add(new StringValue(fileData.type().name()));
- fileBlob.parameters().add(new DataValue(fileData.content()));
+ fileBlob.parameters().add(new DataValue(fileData.content().array()));
fileBlob.parameters().add(new Int64Value(fileData.xxhash()));
fileBlob.parameters().add(new Int32Value(status.getCode()));
fileBlob.parameters().add(new StringValue(status.getDescription()));
@@ -442,9 +446,104 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
}
}
+ class ChunkedFileReceiver implements FileServer.Receiver {
+ Target target;
+ ChunkedFileReceiver(Target target) {
+ this.target = target;
+ }
+
+ @Override
+ public String toString() {
+ return target.toString();
+ }
+
+ @Override
+ public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
+ int session = sendMeta(fileData);
+ sendParts(session, fileData);
+ sendEof(session, fileData, status);
+ Request fileBlob = new Request(FileReceiver.RECEIVE_META_METHOD);
+ fileBlob.parameters().add(new StringValue(fileData.fileReference().value()));
+ fileBlob.parameters().add(new StringValue(fileData.filename()));
+ fileBlob.parameters().add(new StringValue(fileData.type().name()));
+ fileBlob.parameters().add(new Int64Value(fileData.size()));
+ target.invokeSync(fileBlob, 600);
+ if (fileBlob.isError()) {
+ log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + fileBlob.errorMessage() + "'.");
+ }
+ }
+ private void sendParts(int session, FileReferenceData fileData) {
+ ByteBuffer bb = ByteBuffer.allocate(0x100000);
+ for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) {
+ byte [] buf = bb.array();
+ if (buf.length != bb.position()) {
+ buf = new byte [bb.position()];
+ bb.flip();
+ bb.get(buf);
+ }
+ sendPart(session, fileData.fileReference(), partId, buf);
+ bb.clear();
+ }
+ }
+ private int sendMeta(FileReferenceData fileData) {
+ Request request = new Request(FileReceiver.RECEIVE_META_METHOD);
+ request.parameters().add(new StringValue(fileData.fileReference().value()));
+ request.parameters().add(new StringValue(fileData.filename()));
+ request.parameters().add(new StringValue(fileData.type().name()));
+ request.parameters().add(new Int64Value(fileData.size()));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ return request.returnValues().get(1).asInt32();
+ }
+ private void sendPart(int session, FileReference ref, int partId, byte [] buf) {
+ Request request = new Request(FileReceiver.RECEIVE_PART_METHOD);
+ request.parameters().add(new StringValue(ref.value()));
+ request.parameters().add(new Int32Value(session));
+ request.parameters().add(new Int32Value(partId));
+ request.parameters().add(new DataValue(buf));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering reference '" + ref.value() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ }
+ private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) {
+ Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD);
+ request.parameters().add(new StringValue(fileData.fileReference().value()));
+ request.parameters().add(new Int32Value(session));
+ request.parameters().add(new Int64Value(fileData.xxhash()));
+ request.parameters().add(new Int32Value(status.getCode()));
+ request.parameters().add(new StringValue(status.getDescription()));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ }
+ }
+
@SuppressWarnings("UnusedDeclaration")
public final void serveFile(Request request) {
request.detach();
- fileServer.serveFile(request, new FileReceiver(request.target()));
+ FileServer.Receiver receiver = useChunkedFileTransfer
+ ? new ChunkedFileReceiver(request.target())
+ : new WholeFileReceiver(request.target());
+ fileServer.serveFile(request, receiver);
}
}
diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml
index eddba1ee768..fd77fedd789 100644
--- a/configserver/src/main/resources/configserver-app/services.xml
+++ b/configserver/src/main/resources/configserver-app/services.xml
@@ -6,6 +6,10 @@
<maxthreads>100</maxthreads> <!-- Reduced thread count to minimize memory consumption -->
</config>
+ <config name="container.jdisc.config.health-monitor">
+ <initialStatus>initializing</initialStatus>
+ </config>
+
<accesslog type="vespa" fileNamePattern="logs/vespa/configserver/access.log.%Y%m%d%H%M%S" rotationScheme="date" symlinkName="access.log" />
<preprocess:include file='access-logging.xml' required='false' />
<component id="com.yahoo.vespa.config.server.ConfigServerBootstrap" bundle="configserver" />
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java
index e0d65055f21..384ae0853d8 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ConfigServerBootstrapTest.java
@@ -6,7 +6,10 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.TenantName;
+import com.yahoo.container.jdisc.config.HealthMonitorConfig;
+import com.yahoo.container.jdisc.state.StateMonitor;
import com.yahoo.io.IOUtils;
+import com.yahoo.jdisc.core.SystemTimer;
import com.yahoo.vespa.config.server.deploy.MockDeployer;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.http.SessionHandlerTest;
@@ -67,7 +70,9 @@ public class ConfigServerBootstrapTest extends TestWithTenant {
assertFalse(myServer.stopped);
VersionState versionState = new VersionState(versionFile);
assertTrue(versionState.isUpgraded());
- ConfigServerBootstrap bootstrap = new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState);
+ ConfigServerBootstrap bootstrap =
+ new ConfigServerBootstrap(applicationRepository, rpc, (application, timeout) -> Optional.empty(), versionState,
+ new StateMonitor(new HealthMonitorConfig(new HealthMonitorConfig.Builder()), new SystemTimer()));
waitUntilStarted(rpc, 60000);
assertFalse(versionState.isUpgraded());
assertThat(versionState.currentVersion(), is(versionState.storedVersion()));
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
index 5fcaee6e590..b0dce359d58 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
@@ -103,7 +103,7 @@ public class FileServerTest {
}
@Override
public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
- this.content.complete(fileData.content());
+ this.content.complete(fileData.content().array());
}
}