diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-08 19:33:46 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-08 19:33:46 +0100 |
commit | 08548b78355a3c4fbe1a82e2e54d03156a108b01 (patch) | |
tree | 720d04da136e71f13e289694912b0a65a0236667 /configserver/src | |
parent | f83c2be974ce171d7d2d32fdf642f5b8bf69fcfa (diff) |
Implement the chunked transfer.
Diffstat (limited to 'configserver/src')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java | 105 |
1 files changed, 101 insertions, 4 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 4d50cc16fd9..d83d71966c7 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -41,8 +41,10 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory; import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider; import com.yahoo.vespa.config.server.tenant.TenantListener; import com.yahoo.vespa.config.server.tenant.Tenants; +import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -413,9 +415,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return useRequestVersion; } - class FileReceiver implements FileServer.Receiver { + class WholeFileReceiver implements FileServer.Receiver { Target target; - FileReceiver(Target target) { + WholeFileReceiver(Target target) { this.target = target; } @@ -426,7 +428,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { - Request fileBlob = new Request("filedistribution.receiveFile"); + Request fileBlob = new Request(FileReceiver.RECEIVE_METHOD); fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); fileBlob.parameters().add(new StringValue(fileData.filename())); fileBlob.parameters().add(new StringValue(fileData.type().name())); @@ -442,9 +444,104 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } + class ChunkedFileReceiver implements FileServer.Receiver { + Target target; + ChunkedFileReceiver(Target target) { + this.target = target; + } + + @Override + public String toString() { + return target.toString(); + } + + @Override + public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { + int session = sendMeta(fileData); + sendParts(session, fileData); + sendEof(session, fileData, status); + Request fileBlob = new Request(FileReceiver.RECEIVE_META_METHOD); + fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); + fileBlob.parameters().add(new StringValue(fileData.filename())); + fileBlob.parameters().add(new StringValue(fileData.type().name())); + fileBlob.parameters().add(new Int64Value(fileData.size())); + fileBlob.parameters().add(new Int64Value(fileData.xxhash())); + fileBlob.parameters().add(new Int32Value(status.getCode())); + fileBlob.parameters().add(new StringValue(status.getDescription())); + target.invokeSync(fileBlob, 600); + if (fileBlob.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + fileBlob.errorMessage() + "'."); + } + } + private void sendParts(int session, FileReferenceData fileData) { + ByteBuffer bb = ByteBuffer.allocate(0x100000); + for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) { + byte [] buf = bb.array(); + if (buf.length != bb.position()) { + buf = new byte [bb.position()]; + bb.flip(); + bb.get(buf); + } + sendPart(session, fileData.fileReference(), partId, buf); + bb.clear(); + } + } + private int sendMeta(FileReferenceData fileData) { + Request request = new Request(FileReceiver.RECEIVE_META_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new StringValue(fileData.filename())); + request.parameters().add(new StringValue(fileData.type().name())); + request.parameters().add(new Int64Value(fileData.size())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + return request.returnValues().get(1).asInt32(); + } + private void sendPart(int session, FileReference ref, int partId, byte [] buf) { + Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); + request.parameters().add(new StringValue(ref.value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int32Value(partId)); + request.parameters().add(new DataValue(buf)); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + ref.value() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) { + Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new Int32Value(session)); + request.parameters().add(new Int64Value(fileData.xxhash())); + request.parameters().add(new Int32Value(status.getCode())); + request.parameters().add(new StringValue(status.getDescription())); + target.invokeSync(request, 600); + if (request.isError()) { + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + request.errorMessage() + "'."); + } + int retCode = request.returnValues().get(0).asInt32(); + if (retCode != 0) { + throw new IllegalArgumentException("Unknow error from target '" + target.toString() + "' during rpc call " + request.methodName()); + } + } + } + @SuppressWarnings("UnusedDeclaration") public final void serveFile(Request request) { request.detach(); - fileServer.serveFile(request, new FileReceiver(request.target())); + fileServer.serveFile(request, new WholeFileReceiver(request.target())); } } |