aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-02-01 15:25:45 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-02-16 13:42:49 +0000
commite0195ce27f47717ad5ba59ea59ab027de31d703f (patch)
tree44a13aa38fcbf95b23df91e8051c1d2b8bb2f688 /documentapi/src/main
parent42b1512d4913778dde06ebe0b1a08257ead3155a (diff)
Add new Protobuf-based MessageBus DocumentAPI protocol
This adds an entirely new implementation of the internal MessageBus DocumentAPI protocol, which shall be functionally 1-to-1 compatible with the existing legacy protocol. New protobuf schemas have been added to the top-level documentapi module, which are separated into different domains of responsibility: * CRUD messages * Visiting messages * Data inspection messages As well as a schema for shared, common message types. Both C++ and Java protocol implementations separate serialization and deserialization into a codec abstraction per message type, which hides the boilerplate required for Protobuf buffer management. The Java version is a tad more verbose due to generics type-erasure. This protocol does _not_ currently support lazy (de-)serialization in Java, as the existing mechanisms for doing so are inherently tied to the legacy protocol version. Performance tests will decide if we need to introduce such functionality to the new protocol version. To avoid having the new protocol go live in production, this commit changes the semantics of how MessageBus version reporting works (at least for the near future); instead of reporting the current Vespa _release_ version, it reports the highest supported _protocol_ version. This lets us conditionally enable the new protocol by reporting a MessageBus version greater than or equal to the protocol version _iff_ the protocol should be active. The new protocol is disabled by default. Other changes: * Protocol tests have been moved up one package directory level to be aligned with the actual package of the classes they test. This allows for using package-protected constructors in the serialization tests. * `DocumentDeserializer` now exposes the underlying document type repo/manager. This is done to detangle `Document`/`DocumentUpdate` deserialization from the underlying wire buffer management. * `RemoveLocationMessage` at long last contains a bucket space, which was forgotten when we initially added this concept to the other messages, and where the pain of adding it in later was too big (not so anymore!). Unit tests for both C++ and Java have been hoisted from the legacy test suite, cleaned up and extended with additional cases. The C++ tests use the old unit test kit and should receive a good follow-up washing and GTest-rewrite. **Important**: due to how MessageBus protocol versioning works, the final protocol version is _not_ yet decided, as setting it requires syncing against our build systems. A follow-up commit will assign the final version as well as include all required binary test files.
Diffstat (limited to 'documentapi/src/main')
-rw-r--r--documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java5
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java48
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java14
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java931
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java5
6 files changed, 1000 insertions, 5 deletions
diff --git a/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
new file mode 100644
index 00000000000..4331a3d461e
--- /dev/null
+++ b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
@@ -0,0 +1,5 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package ai.vespa.documentapi.protobuf;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index ec49a0c570f..061d9e9afb9 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -256,9 +256,9 @@ public class DocumentProtocol implements Protocol {
private DocumentProtocol(DocumentTypeManager docMan, String configId,
DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
- if (docMan != null)
+ if (docMan != null) {
this.docMan = docMan;
- else {
+ } else {
this.docMan = new DocumentTypeManager();
DocumentTypeManagerConfigurer.configure(this.docMan, configId);
}
@@ -275,6 +275,11 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
+ registerLegacyV6Factories();
+ registerV8Factories();
+ }
+
+ private void registerLegacyV6Factories() {
// Prepare version specifications to use when adding routable factories.
VersionSpecification version6 = new VersionSpecification(6, 221);
@@ -311,11 +316,48 @@ public class DocumentProtocol implements Protocol {
putRoutableFactory(REPLY_REMOVELOCATION, new RoutableFactories60.RemoveLocationReplyFactory(), from6);
putRoutableFactory(REPLY_STATBUCKET, new RoutableFactories60.StatBucketReplyFactory(), from6);
putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
putRoutableFactory(REPLY_VISITORINFO, new RoutableFactories60.VisitorInfoReplyFactory(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, new RoutableFactories60.WrongDistributionReplyFactory(), from6);
}
+ private void registerV8Factories() {
+ var version8 = new VersionSpecification(8, 304); // Must be same as in C++ impl
+ var from8 = Collections.singletonList(version8);
+
+ putRoutableFactory(MESSAGE_CREATEVISITOR, RoutableFactories80.createCreateVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, RoutableFactories80.createDocumentListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, RoutableFactories80.createGetBucketListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, RoutableFactories80.createGetDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_MAPVISITOR, RoutableFactories80.createMapVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, RoutableFactories80.createPutDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_QUERYRESULT, RoutableFactories80.createQueryResultMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, RoutableFactories80.createRemoveLocationMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_STATBUCKET, RoutableFactories80.createStatBucketMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_VISITORINFO, RoutableFactories80.createVisitorInfoMessageFactory(), from8);
+ putRoutableFactory(REPLY_CREATEVISITOR, RoutableFactories80.createCreateVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, RoutableFactories80.createDocumentIgnoredReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTLIST, RoutableFactories80.createDocumentListReplyFactory(), from8);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETLIST, RoutableFactories80.createGetBucketListReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETDOCUMENT, RoutableFactories80.createGetDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_MAPVISITOR, RoutableFactories80.createMapVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_PUTDOCUMENT, RoutableFactories80.createPutDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_QUERYRESULT, RoutableFactories80.createQueryResultReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVELOCATION, RoutableFactories80.createRemoveLocationReplyFactory(), from8);
+ putRoutableFactory(REPLY_STATBUCKET, RoutableFactories80.createStatBucketReplyFactory(), from8);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_VISITORINFO, RoutableFactories80.createVisitorInfoReplyFactory(), from8);
+ putRoutableFactory(REPLY_WRONGDISTRIBUTION, RoutableFactories80.createWrongDistributionReplyFactory(), from8);
+ }
+
/**
* Adds a new routable factory to this protocol. This method is thread-safe, and may be invoked on a protocol object
* that is already in use by a message bus instance. Notice that the name you supply for a factory is the
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
index 727d1e4cd89..10ca1930317 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
@@ -39,6 +39,8 @@ public class DocumentState implements Comparable<DocumentState> {
removeEntry = buf.getByte(null)>0;
}
+ public boolean hasDocId() { return docId != null; }
+
public DocumentId getDocId() {
return docId;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
index 957e65c54e1..25862eb39f3 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
@@ -6,15 +6,17 @@ import com.yahoo.document.select.BucketSelector;
import java.util.Set;
/**
- * Message (VDS only) to remove an entire location for users using n= or g= schemes.
+ * Message to remove an entire location for users using n= or g= schemes.
* We use a document selection so the user can specify a subset of those documents to be deleted
* if they wish.
*/
public class RemoveLocationMessage extends DocumentMessage {
String documentSelection;
BucketId bucketId;
+ private final String bucketSpace;
- public RemoveLocationMessage(String documentSelection) {
+ public RemoveLocationMessage(String documentSelection, String bucketSpace) {
+ this.bucketSpace = bucketSpace;
try {
this.documentSelection = documentSelection;
BucketSelector bucketSel = new BucketSelector(new BucketIdFactory());
@@ -32,6 +34,10 @@ public class RemoveLocationMessage extends DocumentMessage {
}
}
+ public RemoveLocationMessage(String documentSelection) {
+ this(documentSelection, FixedBucketSpaces.defaultSpace());
+ }
+
public String getDocumentSelection() {
return documentSelection;
}
@@ -49,4 +55,8 @@ public class RemoveLocationMessage extends DocumentMessage {
public BucketId getBucketId() {
return bucketId;
}
+
+ public String getBucketSpace() {
+ return bucketSpace;
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
new file mode 100644
index 00000000000..e03a7a05a4b
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
@@ -0,0 +1,931 @@
+package com.yahoo.documentapi.messagebus.protocol;
+
+import ai.vespa.documentapi.protobuf.DocapiCommon;
+import ai.vespa.documentapi.protobuf.DocapiFeed;
+import ai.vespa.documentapi.protobuf.DocapiInspect;
+import ai.vespa.documentapi.protobuf.DocapiVisiting;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.vdslib.DocumentSummary;
+import com.yahoo.vdslib.SearchResult;
+import com.yahoo.vdslib.VisitorStatistics;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Implementation of MessageBus message request/response serialization built around Protocol Buffers.
+ */
+abstract class RoutableFactories80 {
+
+ private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory {
+
+ private final Class<DocApiT> apiClass;
+ private final Function<DocApiT, ProtoT> encoderFn;
+ private final Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodec(Class<DocApiT> apiClass,
+ Function<DocApiT, ProtoT> encoderFn,
+ Function<DocumentDeserializer, DocApiT> decoderFn) {
+ this.apiClass = apiClass;
+ this.encoderFn = encoderFn;
+ this.decoderFn = decoderFn;
+ }
+
+ @Override
+ public boolean encode(Routable obj, DocumentSerializer out) {
+ try {
+ var protoMsg = encoderFn.apply(apiClass.cast(obj));
+ var protoStream = CodedOutputStream.newInstance(out.getBuf().getByteBuffer()); // Not AutoCloseable...
+ try {
+ protoMsg.writeTo(protoStream);
+ } finally {
+ protoStream.flush();
+ }
+ } catch (IOException | UnsupportedOperationException e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Routable decode(DocumentDeserializer in) {
+ return decoderFn.apply(in);
+ }
+ }
+
+ private static class ProtobufCodecBuilder<DocApiT extends Routable, ProtoT extends AbstractMessage> {
+
+ private final Class<DocApiT> apiClass;
+ private final Class<ProtoT> protoClass;
+ private Function<DocApiT, ProtoT> encoderFn;
+ private Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodecBuilder(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ this.apiClass = apiClass;
+ this.protoClass = protoClass;
+ }
+
+ static <DocApiT extends Routable, ProtoT extends AbstractMessage> ProtobufCodecBuilder<DocApiT, ProtoT>
+ of(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ return new ProtobufCodecBuilder<>(apiClass, protoClass);
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> encoder(Function<DocApiT, ProtoT> fn) {
+ if (encoderFn != null) {
+ throw new IllegalArgumentException("Encoder already set");
+ }
+ encoderFn = fn;
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj));
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo());
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodec<DocApiT, ProtoT> build() {
+ Objects.requireNonNull(encoderFn, "Encoder has not been set");
+ Objects.requireNonNull(decoderFn, "Decoder has not been set");
+ return new ProtobufCodec<>(apiClass, encoderFn, decoderFn);
+ }
+ }
+
+ // Protobuf codec helpers for common types
+
+ private static DocapiCommon.GlobalId toProtoGlobalId(GlobalId gid) {
+ return DocapiCommon.GlobalId.newBuilder().setRawGid(ByteString.copyFrom(gid.getRawId())).build();
+ }
+
+ private static GlobalId fromProtoGlobalId(DocapiCommon.GlobalId gid) {
+ return new GlobalId(gid.getRawGid().toByteArray());
+ }
+
+ private static DocapiCommon.BucketId toProtoBucketId(BucketId id) {
+ return DocapiCommon.BucketId.newBuilder().setRawId(id.getRawId()).build();
+ }
+
+ private static BucketId fromProtoBucketId(DocapiCommon.BucketId id) {
+ return new BucketId(id.getRawId());
+ }
+
+ private static DocapiCommon.DocumentId toProtoDocId(DocumentId id) {
+ return DocapiCommon.DocumentId.newBuilder().setId(id.toString()).build();
+ }
+
+ private static DocumentId fromProtoDocId(DocapiCommon.DocumentId id) {
+ return new DocumentId(id.getId());
+ }
+
+ private static DocapiCommon.FieldSet toProtoFieldSet(String rawFieldSpec) {
+ return DocapiCommon.FieldSet.newBuilder().setSpec(rawFieldSpec).build();
+ }
+
+ private static String fromProtoFieldSet(DocapiCommon.FieldSet fieldSet) {
+ return fieldSet.getSpec();
+ }
+
+ private static ByteBuffer serializeDoc(Document doc) {
+ var buf = new GrowableByteBuffer();
+ doc.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiCommon.Document toProtoDocument(Document doc) {
+ // TODO a lot of copying here... Consider adding Document serialization to OutputStream
+ // so that we can serialize directly into a ByteString.Output instance.
+ return toProtoDocument(serializeDoc(doc));
+ }
+
+ private static DocapiCommon.Document toProtoDocument(ByteBuffer rawDocData) {
+ return DocapiCommon.Document.newBuilder()
+ .setPayload(ByteString.copyFrom(rawDocData))
+ .build();
+ }
+
+ private static Document fromProtoDocument(DocapiCommon.Document protoDoc, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoDoc.getPayload().asReadOnlyByteBuffer()));
+ return Document.createDocument(deserializer);
+ }
+
+ private static Document deserializeDoc(ByteBuffer rawDocData, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(rawDocData));
+ return Document.createDocument(deserializer);
+ }
+
+ private static DocapiFeed.TestAndSetCondition toProtoTasCondition(TestAndSetCondition tasCond) {
+ return DocapiFeed.TestAndSetCondition.newBuilder()
+ .setSelection(tasCond.getSelection())
+ .build();
+ }
+
+ private static TestAndSetCondition fromProtoTasCondition(DocapiFeed.TestAndSetCondition protoTasCond) {
+ // Note: the empty (default) string implies "no condition present"
+ return new TestAndSetCondition(protoTasCond.getSelection());
+ }
+
+ private static ByteBuffer serializeUpdate(DocumentUpdate update) {
+ var buf = new GrowableByteBuffer();
+ update.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiFeed.DocumentUpdate toProtoUpdate(DocumentUpdate update) {
+ // TODO also consider DocumentUpdate serialization directly to OutputStream to avoid unneeded copying
+ return DocapiFeed.DocumentUpdate.newBuilder()
+ .setPayload(ByteString.copyFrom(serializeUpdate(update)))
+ .build();
+ }
+
+ private static DocumentUpdate fromProtoUpdate(DocapiFeed.DocumentUpdate protoUpdate, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoUpdate.getPayload().asReadOnlyByteBuffer()));
+ return new DocumentUpdate(deserializer);
+ }
+
+ private static DocapiCommon.DocumentSelection toProtoDocumentSelection(String rawSelection) {
+ return DocapiCommon.DocumentSelection.newBuilder()
+ .setSelection(rawSelection)
+ .build();
+ }
+
+ private static String fromProtoDocumentSelection(DocapiCommon.DocumentSelection protoSelection) {
+ return protoSelection.getSelection();
+ }
+
+ private static DocapiCommon.BucketSpace toProtoBucketSpace(String spaceName) {
+ return DocapiCommon.BucketSpace.newBuilder()
+ .setName(spaceName)
+ .build();
+ }
+
+ private static String fromProtoBucketSpace(DocapiCommon.BucketSpace protoSpace) {
+ return protoSpace.getName();
+ }
+
+ private static DocapiCommon.ClusterState toProtoClusterState(String stateStr) {
+ return DocapiCommon.ClusterState.newBuilder().setStateString(stateStr).build();
+ }
+
+ private static String fromProtoClusterState(DocapiCommon.ClusterState state) {
+ return state.getStateString();
+ }
+
+ // Message codec implementations
+
+ // ---------------------------------------------
+ // Get request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentMessage.class, DocapiFeed.GetDocumentRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.GetDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .build())
+ .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) ->
+ new GetDocumentMessage(
+ fromProtoDocId(protoMsg.getDocumentId()),
+ fromProtoFieldSet(protoMsg.getFieldSet())))
+ .build();
+ }
+
+ static RoutableFactory createGetDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentReply.class, DocapiFeed.GetDocumentResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiFeed.GetDocumentResponse.newBuilder()
+ .setLastModified(apiReply.getLastModified());
+ var maybeDoc = apiReply.getDocument();
+ if (maybeDoc != null) {
+ builder.setDocument(toProtoDocument(serializeDoc(maybeDoc)));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> {
+ GetDocumentReply reply;
+ if (protoReply.hasDocument()) {
+ var doc = fromProtoDocument(protoReply.getDocument(), repo);
+ doc.setLastModified(protoReply.getLastModified());
+ reply = new GetDocumentReply(doc);
+ } else {
+ reply = new GetDocumentReply(null);
+ }
+ reply.setLastModified(protoReply.getLastModified());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Put request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createPutDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(PutDocumentMessage.class, DocapiFeed.PutDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.PutDocumentRequest.newBuilder()
+ .setForceAssignTimestamp(apiMsg.getTimestamp())
+ .setCreateIfMissing(apiMsg.getCreateIfNonExistent())
+ .setDocument(toProtoDocument(apiMsg.getDocumentPut().getDocument()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var doc = fromProtoDocument(protoMsg.getDocument(), repo);
+ var msg = new PutDocumentMessage(new DocumentPut(doc));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ msg.setTimestamp(protoMsg.getForceAssignTimestamp());
+ msg.setCreateIfNonExistent(protoMsg.getCreateIfMissing());
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createPutDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WriteDocumentReply.class, DocapiFeed.PutDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.PutDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Update request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createUpdateDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentMessage.class, DocapiFeed.UpdateDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.UpdateDocumentRequest.newBuilder()
+ .setUpdate(toProtoUpdate(apiMsg.getDocumentUpdate()))
+ .setExpectedOldTimestamp(apiMsg.getOldTimestamp())
+ .setForceAssignTimestamp(apiMsg.getNewTimestamp());
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo));
+ msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp());
+ msg.setNewTimestamp(protoMsg.getForceAssignTimestamp());
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createUpdateDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentReply.class, DocapiFeed.UpdateDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.UpdateDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .setWasFound(apiReply.wasFound())
+ .build())
+ .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ reply.setWasFound(protoReply.getWasFound());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Remove request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentMessage.class, DocapiFeed.RemoveDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.RemoveDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId()));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createRemoveDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentReply.class, DocapiFeed.RemoveDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.RemoveDocumentResponse.newBuilder()
+ .setWasFound(apiReply.wasFound())
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new RemoveDocumentReply();
+ reply.setWasFound(protoReply.getWasFound());
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // RemoveLocation request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveLocationMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveLocationMessage.class, DocapiFeed.RemoveLocationRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.RemoveLocationRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) ->
+ new RemoveLocationMessage(
+ fromProtoDocumentSelection(protoMsg.getSelection()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createRemoveLocationReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class)
+ .encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build())
+ .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // CreateVisitor request and response
+ // ---------------------------------------------
+
+ private static DocapiVisiting.VisitorParameter toProtoVisitorParameter(String key, byte[] value) {
+ return DocapiVisiting.VisitorParameter.newBuilder()
+ .setKey(key)
+ .setValue(ByteString.copyFrom(value))
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorMessage.class, DocapiVisiting.CreateVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.CreateVisitorRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setVisitorLibraryName(apiMsg.getLibraryName())
+ .setInstanceId(apiMsg.getInstanceId())
+ .setControlDestination(apiMsg.getControlDestination())
+ .setDataDestination(apiMsg.getDataDestination())
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .setMaxPendingReplyCount(apiMsg.getMaxPendingReplyCount())
+ .setFromTimestamp(apiMsg.getFromTimestamp())
+ .setToTimestamp(apiMsg.getToTimestamp())
+ .setVisitTombstones(apiMsg.getVisitRemoves())
+ .setVisitInconsistentBuckets(apiMsg.getVisitInconsistentBuckets())
+ .setMaxBucketsPerVisitor(apiMsg.getMaxBucketsPerVisitor());
+ for (var id : apiMsg.getBuckets()) {
+ builder.addBuckets(toProtoBucketId(id));
+ }
+ for (var entry : apiMsg.getParameters().entrySet()) {
+ builder.addParameters(toProtoVisitorParameter(entry.getKey(), entry.getValue()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new CreateVisitorMessage();
+ msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace()));
+ msg.setLibraryName(protoMsg.getVisitorLibraryName());
+ msg.setInstanceId(protoMsg.getInstanceId());
+ msg.setControlDestination(protoMsg.getControlDestination());
+ msg.setDataDestination(protoMsg.getDataDestination());
+ msg.setDocumentSelection(fromProtoDocumentSelection(protoMsg.getSelection()));
+ msg.setFieldSet(fromProtoFieldSet(protoMsg.getFieldSet()));
+ msg.setMaxPendingReplyCount(protoMsg.getMaxPendingReplyCount());
+ msg.setFromTimestamp(protoMsg.getFromTimestamp());
+ msg.setToTimestamp(protoMsg.getToTimestamp());
+ msg.setVisitRemoves(protoMsg.getVisitTombstones());
+ msg.setVisitInconsistentBuckets(protoMsg.getVisitInconsistentBuckets());
+ msg.setMaxBucketsPerVisitor(protoMsg.getMaxBucketsPerVisitor());
+ for (var protoId : protoMsg.getBucketsList()) {
+ msg.getBuckets().add(fromProtoBucketId(protoId));
+ }
+ for (var protoParam : protoMsg.getParametersList()) {
+ msg.getParameters().put(protoParam.getKey(), protoParam.getValue().toByteArray());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorReply.class, DocapiVisiting.CreateVisitorResponse.class)
+ .encoder((apiReply) -> {
+ var stats = apiReply.getVisitorStatistics();
+ return DocapiVisiting.CreateVisitorResponse.newBuilder()
+ .setLastBucket(toProtoBucketId(apiReply.getLastBucket()))
+ .setStatistics(DocapiVisiting.VisitorStatistics.newBuilder()
+ .setBucketsVisited(stats.getBucketsVisited())
+ .setDocumentsVisited(stats.getDocumentsVisited())
+ .setBytesVisited(stats.getBytesVisited())
+ .setDocumentsReturned(stats.getDocumentsReturned())
+ .setBytesReturned(stats.getBytesReturned())
+ .build())
+ .build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket()));
+ var protoVs = protoReply.getStatistics();
+ var vs = new VisitorStatistics();
+ vs.setBucketsVisited(protoVs.getBucketsVisited());
+ vs.setDocumentsVisited(protoVs.getDocumentsVisited());
+ vs.setBytesVisited(protoVs.getBytesVisited());
+ vs.setDocumentsReturned(protoVs.getDocumentsReturned());
+ vs.setBytesReturned(protoVs.getBytesReturned());
+ reply.setVisitorStatistics(vs);
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DestroyVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createDestroyVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DestroyVisitorMessage.class, DocapiVisiting.DestroyVisitorRequest.class)
+ .encoder((apiMsg) ->
+ DocapiVisiting.DestroyVisitorRequest.newBuilder()
+ .setInstanceId(apiMsg.getInstanceId())
+ .build())
+ .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(),
+ (protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId()))
+ .build();
+ }
+
+ static RoutableFactory createDestroyVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // MapVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createMapVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(MapVisitorMessage.class, DocapiVisiting.MapVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.MapVisitorRequest.newBuilder();
+ for (var entry : apiMsg.getData().entrySet()) {
+ // FIXME MapVisitorMessage uses Parameters (i.e. string -> bytes) in C++, but string -> string in Java...
+ // ... but due to this, UTF-8 is effectively enforced anyway. Not that anything actually uses this :I
+ builder.addData(toProtoVisitorParameter(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8)));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new MapVisitorMessage();
+ for (var param : protoMsg.getDataList()) {
+ msg.getData().put(param.getKey(), param.getValue().toStringUtf8());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createMapVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // QueryResult request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createQueryResultMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(QueryResultMessage.class, DocapiVisiting.QueryResultRequest.class)
+ .encoder((apiMsg) -> {
+ // Serialization of QueryResultMessages is not implemented in Java (receive only)
+ throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported");
+ })
+ .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new QueryResultMessage();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!protoMsg.hasSearchResult() || !protoMsg.hasDocumentSummary()) {
+ throw new IllegalArgumentException("Query result does not have all required fields set");
+ }
+ // We have to use toByteArray() instead of asReadOnlyByteBuffer(), as the deserialization routines
+ // try to fetch the raw arrays, which are considered mutable (causing a ReadOnlyBufferException).
+ msg.setSearchResult(new SearchResult(new BufferSerializer(
+ protoMsg.getSearchResult().getPayload().toByteArray())));
+ msg.setSummary(new DocumentSummary(new BufferSerializer(
+ protoMsg.getDocumentSummary().getPayload().toByteArray())));
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createQueryResultReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build())
+ .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // VisitorInfo request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createVisitorInfoMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorInfoMessage.class, DocapiVisiting.VisitorInfoRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.VisitorInfoRequest.newBuilder()
+ .setErrorMessage(apiMsg.getErrorMessage());
+ for (var id : apiMsg.getFinishedBuckets()) {
+ builder.addFinishedBuckets(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new VisitorInfoMessage();
+ msg.setErrorMessage(protoMsg.getErrorMessage());
+ for (var protoId : protoMsg.getFinishedBucketsList()) {
+ msg.getFinishedBuckets().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createVisitorInfoReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build())
+ .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentList request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentListMessage.class, DocapiVisiting.DocumentListRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.DocumentListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()));
+ for (var doc : apiMsg.getDocuments()) {
+ builder.addEntries(DocapiVisiting.DocumentListRequest.Entry.newBuilder()
+ .setTimestamp(doc.getTimestamp())
+ .setIsTombstone(doc.isRemoveEntry())
+ .setDocument(toProtoDocument(doc.getDocument())));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId()));
+ for (var entry : protoMsg.getEntriesList()) {
+ msg.getDocuments().add(new DocumentListEntry(
+ fromProtoDocument(entry.getDocument(), repo),
+ entry.getTimestamp(),
+ entry.getIsTombstone()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createDocumentListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // EmptyBuckets request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createEmptyBucketsMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(EmptyBucketsMessage.class, DocapiVisiting.EmptyBucketsRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.EmptyBucketsRequest.newBuilder();
+ for (var id : apiMsg.getBucketIds()) {
+ builder.addBucketIds(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new EmptyBucketsMessage();
+ for (var protoId : protoMsg.getBucketIdsList()) {
+ msg.getBucketIds().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createEmptyBucketsReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build())
+ .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketList request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListMessage.class, DocapiInspect.GetBucketListRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .build())
+ .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketListMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListReply.class, DocapiInspect.GetBucketListResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketListResponse.newBuilder();
+ for (var info : apiReply.getBuckets()) {
+ builder.addBucketInfo(DocapiInspect.BucketInformation.newBuilder()
+ .setBucketId(toProtoBucketId(info.getBucketId()))
+ .setInfo(info.getBucketInformation()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketListReply();
+ for (var info : protoReply.getBucketInfoList()) {
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(
+ fromProtoBucketId(info.getBucketId()),
+ info.getInfo()));
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketState request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketStateMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateMessage.class, DocapiInspect.GetBucketStateRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketStateRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .build())
+ .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketStateReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateReply.class, DocapiInspect.GetBucketStateResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketStateResponse.newBuilder();
+ for (var state : apiReply.getBucketState()) {
+ var stateBuilder = DocapiInspect.DocumentState.newBuilder()
+ .setTimestamp(state.getTimestamp())
+ .setIsTombstone(state.isRemoveEntry());
+ if (state.hasDocId()) {
+ stateBuilder.setDocumentId(toProtoDocId(state.getDocId()));
+ } else {
+ stateBuilder.setGlobalId(toProtoGlobalId(state.getGid()));
+ }
+ builder.addStates(stateBuilder);
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketStateReply();
+ for (var state : protoReply.getStatesList()) {
+ if (state.hasDocumentId()) {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoDocId(state.getDocumentId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ } else {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoGlobalId(state.getGlobalId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ }
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // StatBucket request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createStatBucketMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketMessage.class, DocapiInspect.StatBucketRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.StatBucketRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) ->
+ new StatBucketMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace()),
+ fromProtoDocumentSelection(protoMsg.getSelection())))
+ .build();
+ }
+
+ static RoutableFactory createStatBucketReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketReply.class, DocapiInspect.StatBucketResponse.class)
+ .encoder((apiReply) ->
+ DocapiInspect.StatBucketResponse.newBuilder()
+ .setResults(apiReply.getResults())
+ .build())
+ .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) ->
+ new StatBucketReply(protoReply.getResults()))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // WrongDistribution response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createWrongDistributionReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WrongDistributionReply.class, DocapiCommon.WrongDistributionResponse.class)
+ .encoder((apiReply) ->
+ DocapiCommon.WrongDistributionResponse.newBuilder()
+ .setClusterState(toProtoClusterState(apiReply.getSystemState()))
+ .build())
+ .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) ->
+ new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState())))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentIgnored response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentIgnoredReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class)
+ .encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build())
+ .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentIgnoredReply())
+ .build();
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
index fa53a3f8568..73ce4dbbc68 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
@@ -9,6 +9,11 @@ public class StatBucketReply extends DocumentReply {
super(DocumentProtocol.REPLY_STATBUCKET);
}
+ public StatBucketReply(String results) {
+ super(DocumentProtocol.REPLY_STATBUCKET);
+ this.results = results;
+ }
+
public String getResults() {
return results;
}