summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-12-08 19:33:46 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-12-08 19:33:46 +0100
commit08548b78355a3c4fbe1a82e2e54d03156a108b01 (patch)
tree720d04da136e71f13e289694912b0a65a0236667 /configserver
parentf83c2be974ce171d7d2d32fdf642f5b8bf69fcfa (diff)
Implement the chunked transfer.
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java105
1 files changed, 101 insertions, 4 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 4d50cc16fd9..d83d71966c7 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -41,8 +41,10 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory;
import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider;
import com.yahoo.vespa.config.server.tenant.TenantListener;
import com.yahoo.vespa.config.server.tenant.Tenants;
+import com.yahoo.vespa.filedistribution.FileReceiver;
import com.yahoo.vespa.filedistribution.FileReferenceData;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -413,9 +415,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return useRequestVersion;
}
- class FileReceiver implements FileServer.Receiver {
+ class WholeFileReceiver implements FileServer.Receiver {
Target target;
- FileReceiver(Target target) {
+ WholeFileReceiver(Target target) {
this.target = target;
}
@@ -426,7 +428,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
- Request fileBlob = new Request("filedistribution.receiveFile");
+ Request fileBlob = new Request(FileReceiver.RECEIVE_METHOD);
fileBlob.parameters().add(new StringValue(fileData.fileReference().value()));
fileBlob.parameters().add(new StringValue(fileData.filename()));
fileBlob.parameters().add(new StringValue(fileData.type().name()));
@@ -442,9 +444,104 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
}
}
+ class ChunkedFileReceiver implements FileServer.Receiver {
+ Target target;
+ ChunkedFileReceiver(Target target) {
+ this.target = target;
+ }
+
+ @Override
+ public String toString() {
+ return target.toString();
+ }
+
+ @Override
+ public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
+ int session = sendMeta(fileData);
+ sendParts(session, fileData);
+ sendEof(session, fileData, status);
+ Request fileBlob = new Request(FileReceiver.RECEIVE_META_METHOD);
+ fileBlob.parameters().add(new StringValue(fileData.fileReference().value()));
+ fileBlob.parameters().add(new StringValue(fileData.filename()));
+ fileBlob.parameters().add(new StringValue(fileData.type().name()));
+ fileBlob.parameters().add(new Int64Value(fileData.size()));
+ fileBlob.parameters().add(new Int64Value(fileData.xxhash()));
+ fileBlob.parameters().add(new Int32Value(status.getCode()));
+ fileBlob.parameters().add(new StringValue(status.getDescription()));
+ target.invokeSync(fileBlob, 600);
+ if (fileBlob.isError()) {
+ log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + fileBlob.errorMessage() + "'.");
+ }
+ }
+ private void sendParts(int session, FileReferenceData fileData) {
+ ByteBuffer bb = ByteBuffer.allocate(0x100000);
+ for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) {
+ byte [] buf = bb.array();
+ if (buf.length != bb.position()) {
+ buf = new byte [bb.position()];
+ bb.flip();
+ bb.get(buf);
+ }
+ sendPart(session, fileData.fileReference(), partId, buf);
+ bb.clear();
+ }
+ }
+ private int sendMeta(FileReferenceData fileData) {
+ Request request = new Request(FileReceiver.RECEIVE_META_METHOD);
+ request.parameters().add(new StringValue(fileData.fileReference().value()));
+ request.parameters().add(new StringValue(fileData.filename()));
+ request.parameters().add(new StringValue(fileData.type().name()));
+ request.parameters().add(new Int64Value(fileData.size()));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ return request.returnValues().get(1).asInt32();
+ }
+ private void sendPart(int session, FileReference ref, int partId, byte [] buf) {
+ Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD);
+ request.parameters().add(new StringValue(ref.value()));
+ request.parameters().add(new Int32Value(session));
+ request.parameters().add(new Int32Value(partId));
+ request.parameters().add(new DataValue(buf));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering reference '" + ref.value() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ }
+ private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) {
+ Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD);
+ request.parameters().add(new StringValue(fileData.fileReference().value()));
+ request.parameters().add(new Int32Value(session));
+ request.parameters().add(new Int64Value(fileData.xxhash()));
+ request.parameters().add(new Int32Value(status.getCode()));
+ request.parameters().add(new StringValue(status.getDescription()));
+ target.invokeSync(request, 600);
+ if (request.isError()) {
+ log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + request.errorMessage() + "'.");
+ }
+ int retCode = request.returnValues().get(0).asInt32();
+ if (retCode != 0) {
+ throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName());
+ }
+ }
+ }
+
@SuppressWarnings("UnusedDeclaration")
public final void serveFile(Request request) {
request.detach();
- fileServer.serveFile(request, new FileReceiver(request.target()));
+ fileServer.serveFile(request, new WholeFileReceiver(request.target()));
}
}