summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main/java')
-rw-r--r--documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java5
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java48
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java14
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java931
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java5
6 files changed, 1000 insertions, 5 deletions
diff --git a/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
new file mode 100644
index 00000000000..4331a3d461e
--- /dev/null
+++ b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
@@ -0,0 +1,5 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package ai.vespa.documentapi.protobuf;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index ec49a0c570f..061d9e9afb9 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -256,9 +256,9 @@ public class DocumentProtocol implements Protocol {
private DocumentProtocol(DocumentTypeManager docMan, String configId,
DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
- if (docMan != null)
+ if (docMan != null) {
this.docMan = docMan;
- else {
+ } else {
this.docMan = new DocumentTypeManager();
DocumentTypeManagerConfigurer.configure(this.docMan, configId);
}
@@ -275,6 +275,11 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
+ registerLegacyV6Factories();
+ registerV8Factories();
+ }
+
+ private void registerLegacyV6Factories() {
// Prepare version specifications to use when adding routable factories.
VersionSpecification version6 = new VersionSpecification(6, 221);
@@ -311,11 +316,48 @@ public class DocumentProtocol implements Protocol {
putRoutableFactory(REPLY_REMOVELOCATION, new RoutableFactories60.RemoveLocationReplyFactory(), from6);
putRoutableFactory(REPLY_STATBUCKET, new RoutableFactories60.StatBucketReplyFactory(), from6);
putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
putRoutableFactory(REPLY_VISITORINFO, new RoutableFactories60.VisitorInfoReplyFactory(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, new RoutableFactories60.WrongDistributionReplyFactory(), from6);
}
+ private void registerV8Factories() {
+ var version8 = new VersionSpecification(8, 304); // Must be same as in C++ impl
+ var from8 = Collections.singletonList(version8);
+
+ putRoutableFactory(MESSAGE_CREATEVISITOR, RoutableFactories80.createCreateVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, RoutableFactories80.createDocumentListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, RoutableFactories80.createGetBucketListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, RoutableFactories80.createGetDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_MAPVISITOR, RoutableFactories80.createMapVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, RoutableFactories80.createPutDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_QUERYRESULT, RoutableFactories80.createQueryResultMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, RoutableFactories80.createRemoveLocationMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_STATBUCKET, RoutableFactories80.createStatBucketMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_VISITORINFO, RoutableFactories80.createVisitorInfoMessageFactory(), from8);
+ putRoutableFactory(REPLY_CREATEVISITOR, RoutableFactories80.createCreateVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, RoutableFactories80.createDocumentIgnoredReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTLIST, RoutableFactories80.createDocumentListReplyFactory(), from8);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETLIST, RoutableFactories80.createGetBucketListReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETDOCUMENT, RoutableFactories80.createGetDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_MAPVISITOR, RoutableFactories80.createMapVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_PUTDOCUMENT, RoutableFactories80.createPutDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_QUERYRESULT, RoutableFactories80.createQueryResultReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVELOCATION, RoutableFactories80.createRemoveLocationReplyFactory(), from8);
+ putRoutableFactory(REPLY_STATBUCKET, RoutableFactories80.createStatBucketReplyFactory(), from8);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_VISITORINFO, RoutableFactories80.createVisitorInfoReplyFactory(), from8);
+ putRoutableFactory(REPLY_WRONGDISTRIBUTION, RoutableFactories80.createWrongDistributionReplyFactory(), from8);
+ }
+
/**
* Adds a new routable factory to this protocol. This method is thread-safe, and may be invoked on a protocol object
* that is already in use by a message bus instance. Notice that the name you supply for a factory is the
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
index 727d1e4cd89..10ca1930317 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
@@ -39,6 +39,8 @@ public class DocumentState implements Comparable<DocumentState> {
removeEntry = buf.getByte(null)>0;
}
+ public boolean hasDocId() { return docId != null; }
+
public DocumentId getDocId() {
return docId;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
index 957e65c54e1..25862eb39f3 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
@@ -6,15 +6,17 @@ import com.yahoo.document.select.BucketSelector;
import java.util.Set;
/**
- * Message (VDS only) to remove an entire location for users using n= or g= schemes.
+ * Message to remove an entire location for users using n= or g= schemes.
* We use a document selection so the user can specify a subset of those documents to be deleted
* if they wish.
*/
public class RemoveLocationMessage extends DocumentMessage {
String documentSelection;
BucketId bucketId;
+ private final String bucketSpace;
- public RemoveLocationMessage(String documentSelection) {
+ public RemoveLocationMessage(String documentSelection, String bucketSpace) {
+ this.bucketSpace = bucketSpace;
try {
this.documentSelection = documentSelection;
BucketSelector bucketSel = new BucketSelector(new BucketIdFactory());
@@ -32,6 +34,10 @@ public class RemoveLocationMessage extends DocumentMessage {
}
}
+ public RemoveLocationMessage(String documentSelection) {
+ this(documentSelection, FixedBucketSpaces.defaultSpace());
+ }
+
public String getDocumentSelection() {
return documentSelection;
}
@@ -49,4 +55,8 @@ public class RemoveLocationMessage extends DocumentMessage {
public BucketId getBucketId() {
return bucketId;
}
+
+ public String getBucketSpace() {
+ return bucketSpace;
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
new file mode 100644
index 00000000000..e03a7a05a4b
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
@@ -0,0 +1,931 @@
+package com.yahoo.documentapi.messagebus.protocol;
+
+import ai.vespa.documentapi.protobuf.DocapiCommon;
+import ai.vespa.documentapi.protobuf.DocapiFeed;
+import ai.vespa.documentapi.protobuf.DocapiInspect;
+import ai.vespa.documentapi.protobuf.DocapiVisiting;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.vdslib.DocumentSummary;
+import com.yahoo.vdslib.SearchResult;
+import com.yahoo.vdslib.VisitorStatistics;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Implementation of MessageBus message request/response serialization built around Protocol Buffers.
+ */
+abstract class RoutableFactories80 {
+
+ private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory {
+
+ private final Class<DocApiT> apiClass;
+ private final Function<DocApiT, ProtoT> encoderFn;
+ private final Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodec(Class<DocApiT> apiClass,
+ Function<DocApiT, ProtoT> encoderFn,
+ Function<DocumentDeserializer, DocApiT> decoderFn) {
+ this.apiClass = apiClass;
+ this.encoderFn = encoderFn;
+ this.decoderFn = decoderFn;
+ }
+
+ @Override
+ public boolean encode(Routable obj, DocumentSerializer out) {
+ try {
+ var protoMsg = encoderFn.apply(apiClass.cast(obj));
+ var protoStream = CodedOutputStream.newInstance(out.getBuf().getByteBuffer()); // Not AutoCloseable...
+ try {
+ protoMsg.writeTo(protoStream);
+ } finally {
+ protoStream.flush();
+ }
+ } catch (IOException | UnsupportedOperationException e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Routable decode(DocumentDeserializer in) {
+ return decoderFn.apply(in);
+ }
+ }
+
+ private static class ProtobufCodecBuilder<DocApiT extends Routable, ProtoT extends AbstractMessage> {
+
+ private final Class<DocApiT> apiClass;
+ private final Class<ProtoT> protoClass;
+ private Function<DocApiT, ProtoT> encoderFn;
+ private Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodecBuilder(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ this.apiClass = apiClass;
+ this.protoClass = protoClass;
+ }
+
+ static <DocApiT extends Routable, ProtoT extends AbstractMessage> ProtobufCodecBuilder<DocApiT, ProtoT>
+ of(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ return new ProtobufCodecBuilder<>(apiClass, protoClass);
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> encoder(Function<DocApiT, ProtoT> fn) {
+ if (encoderFn != null) {
+ throw new IllegalArgumentException("Encoder already set");
+ }
+ encoderFn = fn;
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj));
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo());
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodec<DocApiT, ProtoT> build() {
+ Objects.requireNonNull(encoderFn, "Encoder has not been set");
+ Objects.requireNonNull(decoderFn, "Decoder has not been set");
+ return new ProtobufCodec<>(apiClass, encoderFn, decoderFn);
+ }
+ }
+
+ // Protobuf codec helpers for common types
+
+ private static DocapiCommon.GlobalId toProtoGlobalId(GlobalId gid) {
+ return DocapiCommon.GlobalId.newBuilder().setRawGid(ByteString.copyFrom(gid.getRawId())).build();
+ }
+
+ private static GlobalId fromProtoGlobalId(DocapiCommon.GlobalId gid) {
+ return new GlobalId(gid.getRawGid().toByteArray());
+ }
+
+ private static DocapiCommon.BucketId toProtoBucketId(BucketId id) {
+ return DocapiCommon.BucketId.newBuilder().setRawId(id.getRawId()).build();
+ }
+
+ private static BucketId fromProtoBucketId(DocapiCommon.BucketId id) {
+ return new BucketId(id.getRawId());
+ }
+
+ private static DocapiCommon.DocumentId toProtoDocId(DocumentId id) {
+ return DocapiCommon.DocumentId.newBuilder().setId(id.toString()).build();
+ }
+
+ private static DocumentId fromProtoDocId(DocapiCommon.DocumentId id) {
+ return new DocumentId(id.getId());
+ }
+
+ private static DocapiCommon.FieldSet toProtoFieldSet(String rawFieldSpec) {
+ return DocapiCommon.FieldSet.newBuilder().setSpec(rawFieldSpec).build();
+ }
+
+ private static String fromProtoFieldSet(DocapiCommon.FieldSet fieldSet) {
+ return fieldSet.getSpec();
+ }
+
+ private static ByteBuffer serializeDoc(Document doc) {
+ var buf = new GrowableByteBuffer();
+ doc.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiCommon.Document toProtoDocument(Document doc) {
+ // TODO a lot of copying here... Consider adding Document serialization to OutputStream
+ // so that we can serialize directly into a ByteString.Output instance.
+ return toProtoDocument(serializeDoc(doc));
+ }
+
+ private static DocapiCommon.Document toProtoDocument(ByteBuffer rawDocData) {
+ return DocapiCommon.Document.newBuilder()
+ .setPayload(ByteString.copyFrom(rawDocData))
+ .build();
+ }
+
+ private static Document fromProtoDocument(DocapiCommon.Document protoDoc, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoDoc.getPayload().asReadOnlyByteBuffer()));
+ return Document.createDocument(deserializer);
+ }
+
+ private static Document deserializeDoc(ByteBuffer rawDocData, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(rawDocData));
+ return Document.createDocument(deserializer);
+ }
+
+ private static DocapiFeed.TestAndSetCondition toProtoTasCondition(TestAndSetCondition tasCond) {
+ return DocapiFeed.TestAndSetCondition.newBuilder()
+ .setSelection(tasCond.getSelection())
+ .build();
+ }
+
+ private static TestAndSetCondition fromProtoTasCondition(DocapiFeed.TestAndSetCondition protoTasCond) {
+ // Note: the empty (default) string implies "no condition present"
+ return new TestAndSetCondition(protoTasCond.getSelection());
+ }
+
+ private static ByteBuffer serializeUpdate(DocumentUpdate update) {
+ var buf = new GrowableByteBuffer();
+ update.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiFeed.DocumentUpdate toProtoUpdate(DocumentUpdate update) {
+ // TODO also consider DocumentUpdate serialization directly to OutputStream to avoid unneeded copying
+ return DocapiFeed.DocumentUpdate.newBuilder()
+ .setPayload(ByteString.copyFrom(serializeUpdate(update)))
+ .build();
+ }
+
+ private static DocumentUpdate fromProtoUpdate(DocapiFeed.DocumentUpdate protoUpdate, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoUpdate.getPayload().asReadOnlyByteBuffer()));
+ return new DocumentUpdate(deserializer);
+ }
+
+ private static DocapiCommon.DocumentSelection toProtoDocumentSelection(String rawSelection) {
+ return DocapiCommon.DocumentSelection.newBuilder()
+ .setSelection(rawSelection)
+ .build();
+ }
+
+ private static String fromProtoDocumentSelection(DocapiCommon.DocumentSelection protoSelection) {
+ return protoSelection.getSelection();
+ }
+
+ private static DocapiCommon.BucketSpace toProtoBucketSpace(String spaceName) {
+ return DocapiCommon.BucketSpace.newBuilder()
+ .setName(spaceName)
+ .build();
+ }
+
+ private static String fromProtoBucketSpace(DocapiCommon.BucketSpace protoSpace) {
+ return protoSpace.getName();
+ }
+
+ private static DocapiCommon.ClusterState toProtoClusterState(String stateStr) {
+ return DocapiCommon.ClusterState.newBuilder().setStateString(stateStr).build();
+ }
+
+ private static String fromProtoClusterState(DocapiCommon.ClusterState state) {
+ return state.getStateString();
+ }
+
+ // Message codec implementations
+
+ // ---------------------------------------------
+ // Get request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentMessage.class, DocapiFeed.GetDocumentRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.GetDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .build())
+ .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) ->
+ new GetDocumentMessage(
+ fromProtoDocId(protoMsg.getDocumentId()),
+ fromProtoFieldSet(protoMsg.getFieldSet())))
+ .build();
+ }
+
+ static RoutableFactory createGetDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentReply.class, DocapiFeed.GetDocumentResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiFeed.GetDocumentResponse.newBuilder()
+ .setLastModified(apiReply.getLastModified());
+ var maybeDoc = apiReply.getDocument();
+ if (maybeDoc != null) {
+ builder.setDocument(toProtoDocument(serializeDoc(maybeDoc)));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> {
+ GetDocumentReply reply;
+ if (protoReply.hasDocument()) {
+ var doc = fromProtoDocument(protoReply.getDocument(), repo);
+ doc.setLastModified(protoReply.getLastModified());
+ reply = new GetDocumentReply(doc);
+ } else {
+ reply = new GetDocumentReply(null);
+ }
+ reply.setLastModified(protoReply.getLastModified());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Put request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createPutDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(PutDocumentMessage.class, DocapiFeed.PutDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.PutDocumentRequest.newBuilder()
+ .setForceAssignTimestamp(apiMsg.getTimestamp())
+ .setCreateIfMissing(apiMsg.getCreateIfNonExistent())
+ .setDocument(toProtoDocument(apiMsg.getDocumentPut().getDocument()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var doc = fromProtoDocument(protoMsg.getDocument(), repo);
+ var msg = new PutDocumentMessage(new DocumentPut(doc));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ msg.setTimestamp(protoMsg.getForceAssignTimestamp());
+ msg.setCreateIfNonExistent(protoMsg.getCreateIfMissing());
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createPutDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WriteDocumentReply.class, DocapiFeed.PutDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.PutDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Update request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createUpdateDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentMessage.class, DocapiFeed.UpdateDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.UpdateDocumentRequest.newBuilder()
+ .setUpdate(toProtoUpdate(apiMsg.getDocumentUpdate()))
+ .setExpectedOldTimestamp(apiMsg.getOldTimestamp())
+ .setForceAssignTimestamp(apiMsg.getNewTimestamp());
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo));
+ msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp());
+ msg.setNewTimestamp(protoMsg.getForceAssignTimestamp());
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createUpdateDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentReply.class, DocapiFeed.UpdateDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.UpdateDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .setWasFound(apiReply.wasFound())
+ .build())
+ .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ reply.setWasFound(protoReply.getWasFound());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Remove request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentMessage.class, DocapiFeed.RemoveDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.RemoveDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId()));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createRemoveDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentReply.class, DocapiFeed.RemoveDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.RemoveDocumentResponse.newBuilder()
+ .setWasFound(apiReply.wasFound())
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new RemoveDocumentReply();
+ reply.setWasFound(protoReply.getWasFound());
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // RemoveLocation request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveLocationMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveLocationMessage.class, DocapiFeed.RemoveLocationRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.RemoveLocationRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) ->
+ new RemoveLocationMessage(
+ fromProtoDocumentSelection(protoMsg.getSelection()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createRemoveLocationReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class)
+ .encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build())
+ .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // CreateVisitor request and response
+ // ---------------------------------------------
+
+ private static DocapiVisiting.VisitorParameter toProtoVisitorParameter(String key, byte[] value) {
+ return DocapiVisiting.VisitorParameter.newBuilder()
+ .setKey(key)
+ .setValue(ByteString.copyFrom(value))
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorMessage.class, DocapiVisiting.CreateVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.CreateVisitorRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setVisitorLibraryName(apiMsg.getLibraryName())
+ .setInstanceId(apiMsg.getInstanceId())
+ .setControlDestination(apiMsg.getControlDestination())
+ .setDataDestination(apiMsg.getDataDestination())
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .setMaxPendingReplyCount(apiMsg.getMaxPendingReplyCount())
+ .setFromTimestamp(apiMsg.getFromTimestamp())
+ .setToTimestamp(apiMsg.getToTimestamp())
+ .setVisitTombstones(apiMsg.getVisitRemoves())
+ .setVisitInconsistentBuckets(apiMsg.getVisitInconsistentBuckets())
+ .setMaxBucketsPerVisitor(apiMsg.getMaxBucketsPerVisitor());
+ for (var id : apiMsg.getBuckets()) {
+ builder.addBuckets(toProtoBucketId(id));
+ }
+ for (var entry : apiMsg.getParameters().entrySet()) {
+ builder.addParameters(toProtoVisitorParameter(entry.getKey(), entry.getValue()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new CreateVisitorMessage();
+ msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace()));
+ msg.setLibraryName(protoMsg.getVisitorLibraryName());
+ msg.setInstanceId(protoMsg.getInstanceId());
+ msg.setControlDestination(protoMsg.getControlDestination());
+ msg.setDataDestination(protoMsg.getDataDestination());
+ msg.setDocumentSelection(fromProtoDocumentSelection(protoMsg.getSelection()));
+ msg.setFieldSet(fromProtoFieldSet(protoMsg.getFieldSet()));
+ msg.setMaxPendingReplyCount(protoMsg.getMaxPendingReplyCount());
+ msg.setFromTimestamp(protoMsg.getFromTimestamp());
+ msg.setToTimestamp(protoMsg.getToTimestamp());
+ msg.setVisitRemoves(protoMsg.getVisitTombstones());
+ msg.setVisitInconsistentBuckets(protoMsg.getVisitInconsistentBuckets());
+ msg.setMaxBucketsPerVisitor(protoMsg.getMaxBucketsPerVisitor());
+ for (var protoId : protoMsg.getBucketsList()) {
+ msg.getBuckets().add(fromProtoBucketId(protoId));
+ }
+ for (var protoParam : protoMsg.getParametersList()) {
+ msg.getParameters().put(protoParam.getKey(), protoParam.getValue().toByteArray());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorReply.class, DocapiVisiting.CreateVisitorResponse.class)
+ .encoder((apiReply) -> {
+ var stats = apiReply.getVisitorStatistics();
+ return DocapiVisiting.CreateVisitorResponse.newBuilder()
+ .setLastBucket(toProtoBucketId(apiReply.getLastBucket()))
+ .setStatistics(DocapiVisiting.VisitorStatistics.newBuilder()
+ .setBucketsVisited(stats.getBucketsVisited())
+ .setDocumentsVisited(stats.getDocumentsVisited())
+ .setBytesVisited(stats.getBytesVisited())
+ .setDocumentsReturned(stats.getDocumentsReturned())
+ .setBytesReturned(stats.getBytesReturned())
+ .build())
+ .build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket()));
+ var protoVs = protoReply.getStatistics();
+ var vs = new VisitorStatistics();
+ vs.setBucketsVisited(protoVs.getBucketsVisited());
+ vs.setDocumentsVisited(protoVs.getDocumentsVisited());
+ vs.setBytesVisited(protoVs.getBytesVisited());
+ vs.setDocumentsReturned(protoVs.getDocumentsReturned());
+ vs.setBytesReturned(protoVs.getBytesReturned());
+ reply.setVisitorStatistics(vs);
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DestroyVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createDestroyVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DestroyVisitorMessage.class, DocapiVisiting.DestroyVisitorRequest.class)
+ .encoder((apiMsg) ->
+ DocapiVisiting.DestroyVisitorRequest.newBuilder()
+ .setInstanceId(apiMsg.getInstanceId())
+ .build())
+ .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(),
+ (protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId()))
+ .build();
+ }
+
+ static RoutableFactory createDestroyVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // MapVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createMapVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(MapVisitorMessage.class, DocapiVisiting.MapVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.MapVisitorRequest.newBuilder();
+ for (var entry : apiMsg.getData().entrySet()) {
+ // FIXME MapVisitorMessage uses Parameters (i.e. string -> bytes) in C++, but string -> string in Java...
+ // ... but due to this, UTF-8 is effectively enforced anyway. Not that anything actually uses this :I
+ builder.addData(toProtoVisitorParameter(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8)));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new MapVisitorMessage();
+ for (var param : protoMsg.getDataList()) {
+ msg.getData().put(param.getKey(), param.getValue().toStringUtf8());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createMapVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // QueryResult request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createQueryResultMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(QueryResultMessage.class, DocapiVisiting.QueryResultRequest.class)
+ .encoder((apiMsg) -> {
+ // Serialization of QueryResultMessages is not implemented in Java (receive only)
+ throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported");
+ })
+ .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new QueryResultMessage();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!protoMsg.hasSearchResult() || !protoMsg.hasDocumentSummary()) {
+ throw new IllegalArgumentException("Query result does not have all required fields set");
+ }
+ // We have to use toByteArray() instead of asReadOnlyByteBuffer(), as the deserialization routines
+ // try to fetch the raw arrays, which are considered mutable (causing a ReadOnlyBufferException).
+ msg.setSearchResult(new SearchResult(new BufferSerializer(
+ protoMsg.getSearchResult().getPayload().toByteArray())));
+ msg.setSummary(new DocumentSummary(new BufferSerializer(
+ protoMsg.getDocumentSummary().getPayload().toByteArray())));
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createQueryResultReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build())
+ .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // VisitorInfo request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createVisitorInfoMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorInfoMessage.class, DocapiVisiting.VisitorInfoRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.VisitorInfoRequest.newBuilder()
+ .setErrorMessage(apiMsg.getErrorMessage());
+ for (var id : apiMsg.getFinishedBuckets()) {
+ builder.addFinishedBuckets(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new VisitorInfoMessage();
+ msg.setErrorMessage(protoMsg.getErrorMessage());
+ for (var protoId : protoMsg.getFinishedBucketsList()) {
+ msg.getFinishedBuckets().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createVisitorInfoReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build())
+ .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentList request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentListMessage.class, DocapiVisiting.DocumentListRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.DocumentListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()));
+ for (var doc : apiMsg.getDocuments()) {
+ builder.addEntries(DocapiVisiting.DocumentListRequest.Entry.newBuilder()
+ .setTimestamp(doc.getTimestamp())
+ .setIsTombstone(doc.isRemoveEntry())
+ .setDocument(toProtoDocument(doc.getDocument())));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId()));
+ for (var entry : protoMsg.getEntriesList()) {
+ msg.getDocuments().add(new DocumentListEntry(
+ fromProtoDocument(entry.getDocument(), repo),
+ entry.getTimestamp(),
+ entry.getIsTombstone()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createDocumentListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // EmptyBuckets request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createEmptyBucketsMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(EmptyBucketsMessage.class, DocapiVisiting.EmptyBucketsRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.EmptyBucketsRequest.newBuilder();
+ for (var id : apiMsg.getBucketIds()) {
+ builder.addBucketIds(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new EmptyBucketsMessage();
+ for (var protoId : protoMsg.getBucketIdsList()) {
+ msg.getBucketIds().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createEmptyBucketsReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build())
+ .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketList request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListMessage.class, DocapiInspect.GetBucketListRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .build())
+ .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketListMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListReply.class, DocapiInspect.GetBucketListResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketListResponse.newBuilder();
+ for (var info : apiReply.getBuckets()) {
+ builder.addBucketInfo(DocapiInspect.BucketInformation.newBuilder()
+ .setBucketId(toProtoBucketId(info.getBucketId()))
+ .setInfo(info.getBucketInformation()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketListReply();
+ for (var info : protoReply.getBucketInfoList()) {
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(
+ fromProtoBucketId(info.getBucketId()),
+ info.getInfo()));
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketState request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketStateMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateMessage.class, DocapiInspect.GetBucketStateRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketStateRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .build())
+ .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketStateReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateReply.class, DocapiInspect.GetBucketStateResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketStateResponse.newBuilder();
+ for (var state : apiReply.getBucketState()) {
+ var stateBuilder = DocapiInspect.DocumentState.newBuilder()
+ .setTimestamp(state.getTimestamp())
+ .setIsTombstone(state.isRemoveEntry());
+ if (state.hasDocId()) {
+ stateBuilder.setDocumentId(toProtoDocId(state.getDocId()));
+ } else {
+ stateBuilder.setGlobalId(toProtoGlobalId(state.getGid()));
+ }
+ builder.addStates(stateBuilder);
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketStateReply();
+ for (var state : protoReply.getStatesList()) {
+ if (state.hasDocumentId()) {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoDocId(state.getDocumentId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ } else {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoGlobalId(state.getGlobalId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ }
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // StatBucket request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createStatBucketMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketMessage.class, DocapiInspect.StatBucketRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.StatBucketRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) ->
+ new StatBucketMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace()),
+ fromProtoDocumentSelection(protoMsg.getSelection())))
+ .build();
+ }
+
+ static RoutableFactory createStatBucketReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketReply.class, DocapiInspect.StatBucketResponse.class)
+ .encoder((apiReply) ->
+ DocapiInspect.StatBucketResponse.newBuilder()
+ .setResults(apiReply.getResults())
+ .build())
+ .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) ->
+ new StatBucketReply(protoReply.getResults()))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // WrongDistribution response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createWrongDistributionReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WrongDistributionReply.class, DocapiCommon.WrongDistributionResponse.class)
+ .encoder((apiReply) ->
+ DocapiCommon.WrongDistributionResponse.newBuilder()
+ .setClusterState(toProtoClusterState(apiReply.getSystemState()))
+ .build())
+ .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) ->
+ new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState())))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentIgnored response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentIgnoredReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class)
+ .encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build())
+ .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentIgnoredReply())
+ .build();
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
index fa53a3f8568..73ce4dbbc68 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
@@ -9,6 +9,11 @@ public class StatBucketReply extends DocumentReply {
super(DocumentProtocol.REPLY_STATBUCKET);
}
+ public StatBucketReply(String results) {
+ super(DocumentProtocol.REPLY_STATBUCKET);
+ this.results = results;
+ }
+
public String getResults() {
return results;
}