summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-dev/pom.xml4
-rw-r--r--container-documentapi/pom.xml7
-rw-r--r--document/abi-spec.json4
-rw-r--r--document/src/main/java/com/yahoo/document/Document.java2
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java3
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java2
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java5
-rw-r--r--documentapi-dependencies/pom.xml4
-rw-r--r--documentapi/abi-spec.json6
-rw-r--r--documentapi/pom.xml4
-rw-r--r--documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java5
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java48
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java14
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java931
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java5
-rw-r--r--documentapi/src/protobuf/docapi_common.proto50
-rw-r--r--documentapi/src/protobuf/docapi_feed.proto71
-rw-r--r--documentapi/src/protobuf/docapi_inspect.proto48
-rw-r--r--documentapi/src/protobuf/docapi_visiting.proto115
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java)31
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java729
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java)5
-rw-r--r--documentapi/src/tests/messages/CMakeLists.txt10
-rw-r--r--documentapi/src/tests/messages/messages60test.cpp19
-rw-r--r--documentapi/src/tests/messages/messages60test.h6
-rw-r--r--documentapi/src/tests/messages/messages80test.cpp908
-rw-r--r--documentapi/src/tests/messages/testbase.cpp39
-rw-r--r--documentapi/src/tests/messages/testbase.h4
-rw-r--r--documentapi/src/vespa/documentapi/CMakeLists.txt2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/.gitignore2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt23
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp134
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.h4
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h17
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h10
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp7
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h8
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/visitor.h14
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp883
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h72
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp30
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.h5
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java28
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp21
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h6
-rw-r--r--vdslib/src/vespa/vdslib/container/parameters.cpp6
-rw-r--r--vdslib/src/vespa/vdslib/container/parameters.h5
50 files changed, 4181 insertions, 181 deletions
diff --git a/container-dev/pom.xml b/container-dev/pom.xml
index 8c45a124e26..c3ceb0e8d07 100644
--- a/container-dev/pom.xml
+++ b/container-dev/pom.xml
@@ -104,6 +104,10 @@
<version>${project.version}</version>
<exclusions>
<exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.yahoo.vespa</groupId>
<artifactId>http-utils</artifactId>
</exclusion>
diff --git a/container-documentapi/pom.xml b/container-documentapi/pom.xml
index fbccc1a5184..7e6bd749b4a 100644
--- a/container-documentapi/pom.xml
+++ b/container-documentapi/pom.xml
@@ -37,6 +37,13 @@
<scope>provided</scope>
</dependency>
+ <!-- documentapi needs protobuf runtime, and it's not provided from the container -->
+ <!-- TODO: Remove this when we have a better solution for protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/document/abi-spec.json b/document/abi-spec.json
index 899c107a242..ca06e2547d7 100644
--- a/document/abi-spec.json
+++ b/document/abi-spec.json
@@ -2724,7 +2724,8 @@
"abstract"
],
"methods" : [
- "public abstract com.yahoo.io.GrowableByteBuffer getBuf()"
+ "public abstract com.yahoo.io.GrowableByteBuffer getBuf()",
+ "public abstract com.yahoo.document.DocumentTypeManager getTypeRepo()"
],
"fields" : [ ]
},
@@ -3015,6 +3016,7 @@
],
"methods" : [
"public final com.yahoo.document.DocumentTypeManager getDocumentTypeManager()",
+ "public com.yahoo.document.DocumentTypeManager getTypeRepo()",
"public void read(com.yahoo.document.Document)",
"public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.Document)",
"public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.datatypes.FieldValue)",
diff --git a/document/src/main/java/com/yahoo/document/Document.java b/document/src/main/java/com/yahoo/document/Document.java
index 4bb93426994..294750f40f3 100644
--- a/document/src/main/java/com/yahoo/document/Document.java
+++ b/document/src/main/java/com/yahoo/document/Document.java
@@ -123,6 +123,7 @@ public class Document extends StructuredFieldValue {
}
public int getSerializedSize() throws SerializationException {
+ // TODO shouldn't this be createHead()?
DocumentSerializer data = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f));
data.write(this);
return data.getBuf().position();
@@ -135,6 +136,7 @@ public class Document extends StructuredFieldValue {
public final int getApproxSize() { return 4096; }
public void serialize(OutputStream out) throws SerializationException {
+ // TODO shouldn't this be createHead()?
DocumentSerializer writer = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f));
writer.write(this);
GrowableByteBuffer data = writer.getBuf();
diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
index 59849d88c28..f6e3a75c7b2 100644
--- a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
+++ b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.serialization;
+import com.yahoo.document.DocumentTypeManager;
import com.yahoo.io.GrowableByteBuffer;
/**
@@ -17,5 +18,7 @@ public interface DocumentDeserializer extends DocumentReader, DocumentUpdateRead
*/
GrowableByteBuffer getBuf();
+ DocumentTypeManager getTypeRepo();
+
}
diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
index 9d52bc4aead..faaad95d5e1 100644
--- a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
+++ b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
@@ -15,6 +15,6 @@ public interface DocumentSerializer extends DocumentWriter, SpanNodeWriter, Anno
/**
* Returns the underlying buffer used for serialization.
*/
- public GrowableByteBuffer getBuf();
+ GrowableByteBuffer getBuf();
}
diff --git a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
index 5f24a2d8f60..b508f0d2c7c 100644
--- a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
+++ b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
@@ -91,6 +91,11 @@ public class VespaDocumentDeserializer6 extends BufferSerializer implements Docu
final public DocumentTypeManager getDocumentTypeManager() { return manager; }
+ @Override
+ public DocumentTypeManager getTypeRepo() {
+ return manager;
+ }
+
public void read(Document document) {
read(null, document);
}
diff --git a/documentapi-dependencies/pom.xml b/documentapi-dependencies/pom.xml
index 7fd73017d8b..99b4616ffe7 100644
--- a/documentapi-dependencies/pom.xml
+++ b/documentapi-dependencies/pom.xml
@@ -16,6 +16,10 @@
<dependencies>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>annotations</artifactId>
<version>${project.version}</version>
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 0252da8a4d1..d00c89ae737 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -2078,6 +2078,7 @@
"public void <init>(com.yahoo.document.DocumentId, long, boolean)",
"public void <init>(com.yahoo.document.GlobalId, long, boolean)",
"public void <init>(com.yahoo.vespa.objects.Deserializer)",
+ "public boolean hasDocId()",
"public com.yahoo.document.DocumentId getDocId()",
"public com.yahoo.document.GlobalId getGid()",
"public long getTimestamp()",
@@ -2509,11 +2510,13 @@
"public"
],
"methods" : [
+ "public void <init>(java.lang.String, java.lang.String)",
"public void <init>(java.lang.String)",
"public java.lang.String getDocumentSelection()",
"public com.yahoo.documentapi.messagebus.protocol.DocumentReply createReply()",
"public int getType()",
- "public com.yahoo.document.BucketId getBucketId()"
+ "public com.yahoo.document.BucketId getBucketId()",
+ "public java.lang.String getBucketSpace()"
],
"fields" : [ ]
},
@@ -3072,6 +3075,7 @@
],
"methods" : [
"public void <init>()",
+ "public void <init>(java.lang.String)",
"public java.lang.String getResults()",
"public void setResults(java.lang.String)"
],
diff --git a/documentapi/pom.xml b/documentapi/pom.xml
index 4b026d7f359..9188b803ca7 100644
--- a/documentapi/pom.xml
+++ b/documentapi/pom.xml
@@ -47,6 +47,10 @@
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
<groupId>com.helger.maven</groupId>
<artifactId>ph-javacc-maven-plugin</artifactId>
<executions>
diff --git a/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
new file mode 100644
index 00000000000..4331a3d461e
--- /dev/null
+++ b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
@@ -0,0 +1,5 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package ai.vespa.documentapi.protobuf;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index ec49a0c570f..061d9e9afb9 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -256,9 +256,9 @@ public class DocumentProtocol implements Protocol {
private DocumentProtocol(DocumentTypeManager docMan, String configId,
DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
- if (docMan != null)
+ if (docMan != null) {
this.docMan = docMan;
- else {
+ } else {
this.docMan = new DocumentTypeManager();
DocumentTypeManagerConfigurer.configure(this.docMan, configId);
}
@@ -275,6 +275,11 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
+ registerLegacyV6Factories();
+ registerV8Factories();
+ }
+
+ private void registerLegacyV6Factories() {
// Prepare version specifications to use when adding routable factories.
VersionSpecification version6 = new VersionSpecification(6, 221);
@@ -311,11 +316,48 @@ public class DocumentProtocol implements Protocol {
putRoutableFactory(REPLY_REMOVELOCATION, new RoutableFactories60.RemoveLocationReplyFactory(), from6);
putRoutableFactory(REPLY_STATBUCKET, new RoutableFactories60.StatBucketReplyFactory(), from6);
putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
putRoutableFactory(REPLY_VISITORINFO, new RoutableFactories60.VisitorInfoReplyFactory(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, new RoutableFactories60.WrongDistributionReplyFactory(), from6);
}
+ private void registerV8Factories() {
+ var version8 = new VersionSpecification(8, 304); // Must be same as in C++ impl
+ var from8 = Collections.singletonList(version8);
+
+ putRoutableFactory(MESSAGE_CREATEVISITOR, RoutableFactories80.createCreateVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, RoutableFactories80.createDocumentListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, RoutableFactories80.createGetBucketListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, RoutableFactories80.createGetDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_MAPVISITOR, RoutableFactories80.createMapVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, RoutableFactories80.createPutDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_QUERYRESULT, RoutableFactories80.createQueryResultMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, RoutableFactories80.createRemoveLocationMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_STATBUCKET, RoutableFactories80.createStatBucketMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_VISITORINFO, RoutableFactories80.createVisitorInfoMessageFactory(), from8);
+ putRoutableFactory(REPLY_CREATEVISITOR, RoutableFactories80.createCreateVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, RoutableFactories80.createDocumentIgnoredReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTLIST, RoutableFactories80.createDocumentListReplyFactory(), from8);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETLIST, RoutableFactories80.createGetBucketListReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETDOCUMENT, RoutableFactories80.createGetDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_MAPVISITOR, RoutableFactories80.createMapVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_PUTDOCUMENT, RoutableFactories80.createPutDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_QUERYRESULT, RoutableFactories80.createQueryResultReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVELOCATION, RoutableFactories80.createRemoveLocationReplyFactory(), from8);
+ putRoutableFactory(REPLY_STATBUCKET, RoutableFactories80.createStatBucketReplyFactory(), from8);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_VISITORINFO, RoutableFactories80.createVisitorInfoReplyFactory(), from8);
+ putRoutableFactory(REPLY_WRONGDISTRIBUTION, RoutableFactories80.createWrongDistributionReplyFactory(), from8);
+ }
+
/**
* Adds a new routable factory to this protocol. This method is thread-safe, and may be invoked on a protocol object
* that is already in use by a message bus instance. Notice that the name you supply for a factory is the
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
index 727d1e4cd89..10ca1930317 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
@@ -39,6 +39,8 @@ public class DocumentState implements Comparable<DocumentState> {
removeEntry = buf.getByte(null)>0;
}
+ public boolean hasDocId() { return docId != null; }
+
public DocumentId getDocId() {
return docId;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
index 957e65c54e1..25862eb39f3 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
@@ -6,15 +6,17 @@ import com.yahoo.document.select.BucketSelector;
import java.util.Set;
/**
- * Message (VDS only) to remove an entire location for users using n= or g= schemes.
+ * Message to remove an entire location for users using n= or g= schemes.
* We use a document selection so the user can specify a subset of those documents to be deleted
* if they wish.
*/
public class RemoveLocationMessage extends DocumentMessage {
String documentSelection;
BucketId bucketId;
+ private final String bucketSpace;
- public RemoveLocationMessage(String documentSelection) {
+ public RemoveLocationMessage(String documentSelection, String bucketSpace) {
+ this.bucketSpace = bucketSpace;
try {
this.documentSelection = documentSelection;
BucketSelector bucketSel = new BucketSelector(new BucketIdFactory());
@@ -32,6 +34,10 @@ public class RemoveLocationMessage extends DocumentMessage {
}
}
+ public RemoveLocationMessage(String documentSelection) {
+ this(documentSelection, FixedBucketSpaces.defaultSpace());
+ }
+
public String getDocumentSelection() {
return documentSelection;
}
@@ -49,4 +55,8 @@ public class RemoveLocationMessage extends DocumentMessage {
public BucketId getBucketId() {
return bucketId;
}
+
+ public String getBucketSpace() {
+ return bucketSpace;
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
new file mode 100644
index 00000000000..e03a7a05a4b
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
@@ -0,0 +1,931 @@
+package com.yahoo.documentapi.messagebus.protocol;
+
+import ai.vespa.documentapi.protobuf.DocapiCommon;
+import ai.vespa.documentapi.protobuf.DocapiFeed;
+import ai.vespa.documentapi.protobuf.DocapiInspect;
+import ai.vespa.documentapi.protobuf.DocapiVisiting;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.vdslib.DocumentSummary;
+import com.yahoo.vdslib.SearchResult;
+import com.yahoo.vdslib.VisitorStatistics;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Implementation of MessageBus message request/response serialization built around Protocol Buffers.
+ */
+abstract class RoutableFactories80 {
+
+ private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory {
+
+ private final Class<DocApiT> apiClass;
+ private final Function<DocApiT, ProtoT> encoderFn;
+ private final Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodec(Class<DocApiT> apiClass,
+ Function<DocApiT, ProtoT> encoderFn,
+ Function<DocumentDeserializer, DocApiT> decoderFn) {
+ this.apiClass = apiClass;
+ this.encoderFn = encoderFn;
+ this.decoderFn = decoderFn;
+ }
+
+ @Override
+ public boolean encode(Routable obj, DocumentSerializer out) {
+ try {
+ var protoMsg = encoderFn.apply(apiClass.cast(obj));
+ var protoStream = CodedOutputStream.newInstance(out.getBuf().getByteBuffer()); // Not AutoCloseable...
+ try {
+ protoMsg.writeTo(protoStream);
+ } finally {
+ protoStream.flush();
+ }
+ } catch (IOException | UnsupportedOperationException e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Routable decode(DocumentDeserializer in) {
+ return decoderFn.apply(in);
+ }
+ }
+
+ private static class ProtobufCodecBuilder<DocApiT extends Routable, ProtoT extends AbstractMessage> {
+
+ private final Class<DocApiT> apiClass;
+ private final Class<ProtoT> protoClass;
+ private Function<DocApiT, ProtoT> encoderFn;
+ private Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodecBuilder(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ this.apiClass = apiClass;
+ this.protoClass = protoClass;
+ }
+
+ static <DocApiT extends Routable, ProtoT extends AbstractMessage> ProtobufCodecBuilder<DocApiT, ProtoT>
+ of(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ return new ProtobufCodecBuilder<>(apiClass, protoClass);
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> encoder(Function<DocApiT, ProtoT> fn) {
+ if (encoderFn != null) {
+ throw new IllegalArgumentException("Encoder already set");
+ }
+ encoderFn = fn;
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj));
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo());
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodec<DocApiT, ProtoT> build() {
+ Objects.requireNonNull(encoderFn, "Encoder has not been set");
+ Objects.requireNonNull(decoderFn, "Decoder has not been set");
+ return new ProtobufCodec<>(apiClass, encoderFn, decoderFn);
+ }
+ }
+
+ // Protobuf codec helpers for common types
+
+ private static DocapiCommon.GlobalId toProtoGlobalId(GlobalId gid) {
+ return DocapiCommon.GlobalId.newBuilder().setRawGid(ByteString.copyFrom(gid.getRawId())).build();
+ }
+
+ private static GlobalId fromProtoGlobalId(DocapiCommon.GlobalId gid) {
+ return new GlobalId(gid.getRawGid().toByteArray());
+ }
+
+ private static DocapiCommon.BucketId toProtoBucketId(BucketId id) {
+ return DocapiCommon.BucketId.newBuilder().setRawId(id.getRawId()).build();
+ }
+
+ private static BucketId fromProtoBucketId(DocapiCommon.BucketId id) {
+ return new BucketId(id.getRawId());
+ }
+
+ private static DocapiCommon.DocumentId toProtoDocId(DocumentId id) {
+ return DocapiCommon.DocumentId.newBuilder().setId(id.toString()).build();
+ }
+
+ private static DocumentId fromProtoDocId(DocapiCommon.DocumentId id) {
+ return new DocumentId(id.getId());
+ }
+
+ private static DocapiCommon.FieldSet toProtoFieldSet(String rawFieldSpec) {
+ return DocapiCommon.FieldSet.newBuilder().setSpec(rawFieldSpec).build();
+ }
+
+ private static String fromProtoFieldSet(DocapiCommon.FieldSet fieldSet) {
+ return fieldSet.getSpec();
+ }
+
+ private static ByteBuffer serializeDoc(Document doc) {
+ var buf = new GrowableByteBuffer();
+ doc.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiCommon.Document toProtoDocument(Document doc) {
+ // TODO a lot of copying here... Consider adding Document serialization to OutputStream
+ // so that we can serialize directly into a ByteString.Output instance.
+ return toProtoDocument(serializeDoc(doc));
+ }
+
+ private static DocapiCommon.Document toProtoDocument(ByteBuffer rawDocData) {
+ return DocapiCommon.Document.newBuilder()
+ .setPayload(ByteString.copyFrom(rawDocData))
+ .build();
+ }
+
+ private static Document fromProtoDocument(DocapiCommon.Document protoDoc, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoDoc.getPayload().asReadOnlyByteBuffer()));
+ return Document.createDocument(deserializer);
+ }
+
+ private static Document deserializeDoc(ByteBuffer rawDocData, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(rawDocData));
+ return Document.createDocument(deserializer);
+ }
+
+ private static DocapiFeed.TestAndSetCondition toProtoTasCondition(TestAndSetCondition tasCond) {
+ return DocapiFeed.TestAndSetCondition.newBuilder()
+ .setSelection(tasCond.getSelection())
+ .build();
+ }
+
+ private static TestAndSetCondition fromProtoTasCondition(DocapiFeed.TestAndSetCondition protoTasCond) {
+ // Note: the empty (default) string implies "no condition present"
+ return new TestAndSetCondition(protoTasCond.getSelection());
+ }
+
+ private static ByteBuffer serializeUpdate(DocumentUpdate update) {
+ var buf = new GrowableByteBuffer();
+ update.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiFeed.DocumentUpdate toProtoUpdate(DocumentUpdate update) {
+ // TODO also consider DocumentUpdate serialization directly to OutputStream to avoid unneeded copying
+ return DocapiFeed.DocumentUpdate.newBuilder()
+ .setPayload(ByteString.copyFrom(serializeUpdate(update)))
+ .build();
+ }
+
+ private static DocumentUpdate fromProtoUpdate(DocapiFeed.DocumentUpdate protoUpdate, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoUpdate.getPayload().asReadOnlyByteBuffer()));
+ return new DocumentUpdate(deserializer);
+ }
+
+ private static DocapiCommon.DocumentSelection toProtoDocumentSelection(String rawSelection) {
+ return DocapiCommon.DocumentSelection.newBuilder()
+ .setSelection(rawSelection)
+ .build();
+ }
+
+ private static String fromProtoDocumentSelection(DocapiCommon.DocumentSelection protoSelection) {
+ return protoSelection.getSelection();
+ }
+
+ private static DocapiCommon.BucketSpace toProtoBucketSpace(String spaceName) {
+ return DocapiCommon.BucketSpace.newBuilder()
+ .setName(spaceName)
+ .build();
+ }
+
+ private static String fromProtoBucketSpace(DocapiCommon.BucketSpace protoSpace) {
+ return protoSpace.getName();
+ }
+
+ private static DocapiCommon.ClusterState toProtoClusterState(String stateStr) {
+ return DocapiCommon.ClusterState.newBuilder().setStateString(stateStr).build();
+ }
+
+ private static String fromProtoClusterState(DocapiCommon.ClusterState state) {
+ return state.getStateString();
+ }
+
+ // Message codec implementations
+
+ // ---------------------------------------------
+ // Get request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentMessage.class, DocapiFeed.GetDocumentRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.GetDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .build())
+ .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) ->
+ new GetDocumentMessage(
+ fromProtoDocId(protoMsg.getDocumentId()),
+ fromProtoFieldSet(protoMsg.getFieldSet())))
+ .build();
+ }
+
+ static RoutableFactory createGetDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentReply.class, DocapiFeed.GetDocumentResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiFeed.GetDocumentResponse.newBuilder()
+ .setLastModified(apiReply.getLastModified());
+ var maybeDoc = apiReply.getDocument();
+ if (maybeDoc != null) {
+ builder.setDocument(toProtoDocument(serializeDoc(maybeDoc)));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> {
+ GetDocumentReply reply;
+ if (protoReply.hasDocument()) {
+ var doc = fromProtoDocument(protoReply.getDocument(), repo);
+ doc.setLastModified(protoReply.getLastModified());
+ reply = new GetDocumentReply(doc);
+ } else {
+ reply = new GetDocumentReply(null);
+ }
+ reply.setLastModified(protoReply.getLastModified());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Put request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createPutDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(PutDocumentMessage.class, DocapiFeed.PutDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.PutDocumentRequest.newBuilder()
+ .setForceAssignTimestamp(apiMsg.getTimestamp())
+ .setCreateIfMissing(apiMsg.getCreateIfNonExistent())
+ .setDocument(toProtoDocument(apiMsg.getDocumentPut().getDocument()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var doc = fromProtoDocument(protoMsg.getDocument(), repo);
+ var msg = new PutDocumentMessage(new DocumentPut(doc));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ msg.setTimestamp(protoMsg.getForceAssignTimestamp());
+ msg.setCreateIfNonExistent(protoMsg.getCreateIfMissing());
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createPutDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WriteDocumentReply.class, DocapiFeed.PutDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.PutDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Update request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createUpdateDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentMessage.class, DocapiFeed.UpdateDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.UpdateDocumentRequest.newBuilder()
+ .setUpdate(toProtoUpdate(apiMsg.getDocumentUpdate()))
+ .setExpectedOldTimestamp(apiMsg.getOldTimestamp())
+ .setForceAssignTimestamp(apiMsg.getNewTimestamp());
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo));
+ msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp());
+ msg.setNewTimestamp(protoMsg.getForceAssignTimestamp());
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createUpdateDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentReply.class, DocapiFeed.UpdateDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.UpdateDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .setWasFound(apiReply.wasFound())
+ .build())
+ .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ reply.setWasFound(protoReply.getWasFound());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Remove request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentMessage.class, DocapiFeed.RemoveDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.RemoveDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId()));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createRemoveDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentReply.class, DocapiFeed.RemoveDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.RemoveDocumentResponse.newBuilder()
+ .setWasFound(apiReply.wasFound())
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new RemoveDocumentReply();
+ reply.setWasFound(protoReply.getWasFound());
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // RemoveLocation request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveLocationMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveLocationMessage.class, DocapiFeed.RemoveLocationRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.RemoveLocationRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) ->
+ new RemoveLocationMessage(
+ fromProtoDocumentSelection(protoMsg.getSelection()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createRemoveLocationReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class)
+ .encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build())
+ .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // CreateVisitor request and response
+ // ---------------------------------------------
+
+ private static DocapiVisiting.VisitorParameter toProtoVisitorParameter(String key, byte[] value) {
+ return DocapiVisiting.VisitorParameter.newBuilder()
+ .setKey(key)
+ .setValue(ByteString.copyFrom(value))
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorMessage.class, DocapiVisiting.CreateVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.CreateVisitorRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setVisitorLibraryName(apiMsg.getLibraryName())
+ .setInstanceId(apiMsg.getInstanceId())
+ .setControlDestination(apiMsg.getControlDestination())
+ .setDataDestination(apiMsg.getDataDestination())
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .setMaxPendingReplyCount(apiMsg.getMaxPendingReplyCount())
+ .setFromTimestamp(apiMsg.getFromTimestamp())
+ .setToTimestamp(apiMsg.getToTimestamp())
+ .setVisitTombstones(apiMsg.getVisitRemoves())
+ .setVisitInconsistentBuckets(apiMsg.getVisitInconsistentBuckets())
+ .setMaxBucketsPerVisitor(apiMsg.getMaxBucketsPerVisitor());
+ for (var id : apiMsg.getBuckets()) {
+ builder.addBuckets(toProtoBucketId(id));
+ }
+ for (var entry : apiMsg.getParameters().entrySet()) {
+ builder.addParameters(toProtoVisitorParameter(entry.getKey(), entry.getValue()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new CreateVisitorMessage();
+ msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace()));
+ msg.setLibraryName(protoMsg.getVisitorLibraryName());
+ msg.setInstanceId(protoMsg.getInstanceId());
+ msg.setControlDestination(protoMsg.getControlDestination());
+ msg.setDataDestination(protoMsg.getDataDestination());
+ msg.setDocumentSelection(fromProtoDocumentSelection(protoMsg.getSelection()));
+ msg.setFieldSet(fromProtoFieldSet(protoMsg.getFieldSet()));
+ msg.setMaxPendingReplyCount(protoMsg.getMaxPendingReplyCount());
+ msg.setFromTimestamp(protoMsg.getFromTimestamp());
+ msg.setToTimestamp(protoMsg.getToTimestamp());
+ msg.setVisitRemoves(protoMsg.getVisitTombstones());
+ msg.setVisitInconsistentBuckets(protoMsg.getVisitInconsistentBuckets());
+ msg.setMaxBucketsPerVisitor(protoMsg.getMaxBucketsPerVisitor());
+ for (var protoId : protoMsg.getBucketsList()) {
+ msg.getBuckets().add(fromProtoBucketId(protoId));
+ }
+ for (var protoParam : protoMsg.getParametersList()) {
+ msg.getParameters().put(protoParam.getKey(), protoParam.getValue().toByteArray());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorReply.class, DocapiVisiting.CreateVisitorResponse.class)
+ .encoder((apiReply) -> {
+ var stats = apiReply.getVisitorStatistics();
+ return DocapiVisiting.CreateVisitorResponse.newBuilder()
+ .setLastBucket(toProtoBucketId(apiReply.getLastBucket()))
+ .setStatistics(DocapiVisiting.VisitorStatistics.newBuilder()
+ .setBucketsVisited(stats.getBucketsVisited())
+ .setDocumentsVisited(stats.getDocumentsVisited())
+ .setBytesVisited(stats.getBytesVisited())
+ .setDocumentsReturned(stats.getDocumentsReturned())
+ .setBytesReturned(stats.getBytesReturned())
+ .build())
+ .build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket()));
+ var protoVs = protoReply.getStatistics();
+ var vs = new VisitorStatistics();
+ vs.setBucketsVisited(protoVs.getBucketsVisited());
+ vs.setDocumentsVisited(protoVs.getDocumentsVisited());
+ vs.setBytesVisited(protoVs.getBytesVisited());
+ vs.setDocumentsReturned(protoVs.getDocumentsReturned());
+ vs.setBytesReturned(protoVs.getBytesReturned());
+ reply.setVisitorStatistics(vs);
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DestroyVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createDestroyVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DestroyVisitorMessage.class, DocapiVisiting.DestroyVisitorRequest.class)
+ .encoder((apiMsg) ->
+ DocapiVisiting.DestroyVisitorRequest.newBuilder()
+ .setInstanceId(apiMsg.getInstanceId())
+ .build())
+ .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(),
+ (protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId()))
+ .build();
+ }
+
+ static RoutableFactory createDestroyVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // MapVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createMapVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(MapVisitorMessage.class, DocapiVisiting.MapVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.MapVisitorRequest.newBuilder();
+ for (var entry : apiMsg.getData().entrySet()) {
+ // FIXME MapVisitorMessage uses Parameters (i.e. string -> bytes) in C++, but string -> string in Java...
+ // ... but due to this, UTF-8 is effectively enforced anyway. Not that anything actually uses this :I
+ builder.addData(toProtoVisitorParameter(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8)));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new MapVisitorMessage();
+ for (var param : protoMsg.getDataList()) {
+ msg.getData().put(param.getKey(), param.getValue().toStringUtf8());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createMapVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // QueryResult request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createQueryResultMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(QueryResultMessage.class, DocapiVisiting.QueryResultRequest.class)
+ .encoder((apiMsg) -> {
+ // Serialization of QueryResultMessages is not implemented in Java (receive only)
+ throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported");
+ })
+ .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new QueryResultMessage();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!protoMsg.hasSearchResult() || !protoMsg.hasDocumentSummary()) {
+ throw new IllegalArgumentException("Query result does not have all required fields set");
+ }
+ // We have to use toByteArray() instead of asReadOnlyByteBuffer(), as the deserialization routines
+ // try to fetch the raw arrays, which are considered mutable (causing a ReadOnlyBufferException).
+ msg.setSearchResult(new SearchResult(new BufferSerializer(
+ protoMsg.getSearchResult().getPayload().toByteArray())));
+ msg.setSummary(new DocumentSummary(new BufferSerializer(
+ protoMsg.getDocumentSummary().getPayload().toByteArray())));
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createQueryResultReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build())
+ .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // VisitorInfo request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createVisitorInfoMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorInfoMessage.class, DocapiVisiting.VisitorInfoRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.VisitorInfoRequest.newBuilder()
+ .setErrorMessage(apiMsg.getErrorMessage());
+ for (var id : apiMsg.getFinishedBuckets()) {
+ builder.addFinishedBuckets(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new VisitorInfoMessage();
+ msg.setErrorMessage(protoMsg.getErrorMessage());
+ for (var protoId : protoMsg.getFinishedBucketsList()) {
+ msg.getFinishedBuckets().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createVisitorInfoReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build())
+ .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentList request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentListMessage.class, DocapiVisiting.DocumentListRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.DocumentListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()));
+ for (var doc : apiMsg.getDocuments()) {
+ builder.addEntries(DocapiVisiting.DocumentListRequest.Entry.newBuilder()
+ .setTimestamp(doc.getTimestamp())
+ .setIsTombstone(doc.isRemoveEntry())
+ .setDocument(toProtoDocument(doc.getDocument())));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId()));
+ for (var entry : protoMsg.getEntriesList()) {
+ msg.getDocuments().add(new DocumentListEntry(
+ fromProtoDocument(entry.getDocument(), repo),
+ entry.getTimestamp(),
+ entry.getIsTombstone()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createDocumentListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // EmptyBuckets request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createEmptyBucketsMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(EmptyBucketsMessage.class, DocapiVisiting.EmptyBucketsRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.EmptyBucketsRequest.newBuilder();
+ for (var id : apiMsg.getBucketIds()) {
+ builder.addBucketIds(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new EmptyBucketsMessage();
+ for (var protoId : protoMsg.getBucketIdsList()) {
+ msg.getBucketIds().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createEmptyBucketsReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build())
+ .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketList request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListMessage.class, DocapiInspect.GetBucketListRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .build())
+ .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketListMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListReply.class, DocapiInspect.GetBucketListResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketListResponse.newBuilder();
+ for (var info : apiReply.getBuckets()) {
+ builder.addBucketInfo(DocapiInspect.BucketInformation.newBuilder()
+ .setBucketId(toProtoBucketId(info.getBucketId()))
+ .setInfo(info.getBucketInformation()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketListReply();
+ for (var info : protoReply.getBucketInfoList()) {
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(
+ fromProtoBucketId(info.getBucketId()),
+ info.getInfo()));
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketState request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketStateMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateMessage.class, DocapiInspect.GetBucketStateRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketStateRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .build())
+ .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketStateReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateReply.class, DocapiInspect.GetBucketStateResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketStateResponse.newBuilder();
+ for (var state : apiReply.getBucketState()) {
+ var stateBuilder = DocapiInspect.DocumentState.newBuilder()
+ .setTimestamp(state.getTimestamp())
+ .setIsTombstone(state.isRemoveEntry());
+ if (state.hasDocId()) {
+ stateBuilder.setDocumentId(toProtoDocId(state.getDocId()));
+ } else {
+ stateBuilder.setGlobalId(toProtoGlobalId(state.getGid()));
+ }
+ builder.addStates(stateBuilder);
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketStateReply();
+ for (var state : protoReply.getStatesList()) {
+ if (state.hasDocumentId()) {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoDocId(state.getDocumentId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ } else {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoGlobalId(state.getGlobalId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ }
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // StatBucket request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createStatBucketMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketMessage.class, DocapiInspect.StatBucketRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.StatBucketRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) ->
+ new StatBucketMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace()),
+ fromProtoDocumentSelection(protoMsg.getSelection())))
+ .build();
+ }
+
+ static RoutableFactory createStatBucketReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketReply.class, DocapiInspect.StatBucketResponse.class)
+ .encoder((apiReply) ->
+ DocapiInspect.StatBucketResponse.newBuilder()
+ .setResults(apiReply.getResults())
+ .build())
+ .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) ->
+ new StatBucketReply(protoReply.getResults()))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // WrongDistribution response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createWrongDistributionReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WrongDistributionReply.class, DocapiCommon.WrongDistributionResponse.class)
+ .encoder((apiReply) ->
+ DocapiCommon.WrongDistributionResponse.newBuilder()
+ .setClusterState(toProtoClusterState(apiReply.getSystemState()))
+ .build())
+ .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) ->
+ new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState())))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentIgnored response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentIgnoredReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class)
+ .encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build())
+ .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentIgnoredReply())
+ .build();
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
index fa53a3f8568..73ce4dbbc68 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
@@ -9,6 +9,11 @@ public class StatBucketReply extends DocumentReply {
super(DocumentProtocol.REPLY_STATBUCKET);
}
+ public StatBucketReply(String results) {
+ super(DocumentProtocol.REPLY_STATBUCKET);
+ this.results = results;
+ }
+
public String getResults() {
return results;
}
diff --git a/documentapi/src/protobuf/docapi_common.proto b/documentapi/src/protobuf/docapi_common.proto
new file mode 100644
index 00000000000..5f4cfeb299c
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_common.proto
@@ -0,0 +1,50 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+message BucketSpace {
+ string name = 1;
+}
+
+message BucketId {
+ fixed64 raw_id = 1;
+}
+
+message Document {
+ bytes payload = 1;
+}
+
+message DocumentId {
+ string id = 1;
+}
+
+message FieldSet {
+ string spec = 1;
+}
+
+message GlobalId {
+ // Shall always be 12 bytes (96 bits)
+ bytes raw_gid = 1;
+}
+
+message DocumentSelection {
+ string selection = 1;
+}
+
+message ClusterState {
+ string state_string = 1;
+}
+
+// Polymorphic response type shared by other responses
+message WrongDistributionResponse {
+ ClusterState cluster_state = 1;
+}
+
+// Polymorphic response type shared by other responses
+message DocumentIgnoredResponse {
+ // empty
+}
diff --git a/documentapi/src/protobuf/docapi_feed.proto b/documentapi/src/protobuf/docapi_feed.proto
new file mode 100644
index 00000000000..8d15fd9a536
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_feed.proto
@@ -0,0 +1,71 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message TestAndSetCondition {
+ string selection = 1;
+}
+
+message DocumentUpdate {
+ bytes payload = 1;
+}
+
+message GetDocumentRequest {
+ DocumentId document_id = 1;
+ FieldSet field_set = 2;
+}
+
+message GetDocumentResponse {
+ Document document = 1;
+ uint64 last_modified = 2;
+}
+
+message PutDocumentRequest {
+ // Note: document contains embedded document ID
+ Document document = 1;
+ TestAndSetCondition condition = 2;
+ bool create_if_missing = 3;
+ uint64 force_assign_timestamp = 4;
+}
+
+message PutDocumentResponse {
+ uint64 modification_timestamp = 1;
+}
+
+message UpdateDocumentRequest {
+ // Note: update contains embedded document ID
+ DocumentUpdate update = 1;
+ TestAndSetCondition condition = 2;
+ uint64 expected_old_timestamp = 3;
+ uint64 force_assign_timestamp = 4;
+}
+
+message UpdateDocumentResponse {
+ bool was_found = 1;
+ uint64 modification_timestamp = 2;
+}
+
+message RemoveDocumentRequest {
+ DocumentId document_id = 1;
+ TestAndSetCondition condition = 2;
+}
+
+message RemoveDocumentResponse {
+ bool was_found = 1;
+ uint64 modification_timestamp = 2;
+}
+
+message RemoveLocationRequest {
+ DocumentSelection selection = 1;
+ BucketSpace bucket_space = 2;
+}
+
+message RemoveLocationResponse {
+ // empty
+}
diff --git a/documentapi/src/protobuf/docapi_inspect.proto b/documentapi/src/protobuf/docapi_inspect.proto
new file mode 100644
index 00000000000..efdc8062e0a
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_inspect.proto
@@ -0,0 +1,48 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message GetBucketListRequest {
+ BucketId bucket_id = 1;
+ BucketSpace bucket_space = 2;
+}
+
+message BucketInformation {
+ BucketId bucket_id = 1;
+ string info = 2;
+}
+
+message GetBucketListResponse {
+ repeated BucketInformation bucket_info = 1;
+}
+
+message GetBucketStateRequest {
+ BucketId bucket_id = 1;
+}
+
+message DocumentState {
+ DocumentId document_id = 1;
+ GlobalId global_id = 2;
+ uint64 timestamp = 3;
+ bool is_tombstone = 4;
+}
+
+message GetBucketStateResponse {
+ repeated DocumentState states = 1;
+}
+
+message StatBucketRequest {
+ BucketId bucket_id = 1;
+ DocumentSelection selection = 2;
+ BucketSpace bucket_space = 3;
+}
+
+message StatBucketResponse {
+ string results = 1;
+}
diff --git a/documentapi/src/protobuf/docapi_visiting.proto b/documentapi/src/protobuf/docapi_visiting.proto
new file mode 100644
index 00000000000..ecf71ddab55
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_visiting.proto
@@ -0,0 +1,115 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message VisitorParameter {
+ string key = 1;
+ bytes value = 2;
+}
+
+message CreateVisitorRequest {
+ string visitor_library_name = 1;
+ string instance_id = 2;
+ string control_destination = 3;
+ string data_destination = 4;
+ DocumentSelection selection = 5;
+ uint32 max_pending_reply_count = 6;
+ BucketSpace bucket_space = 7;
+ repeated BucketId buckets = 8;
+ uint64 from_timestamp = 9;
+ uint64 to_timestamp = 10;
+ bool visit_tombstones = 11;
+ FieldSet field_set = 12;
+ bool visit_inconsistent_buckets = 13;
+ uint32 max_buckets_per_visitor = 14;
+ repeated VisitorParameter parameters = 15;
+}
+
+message VisitorStatistics {
+ uint32 buckets_visited = 1;
+ uint64 documents_visited = 2;
+ uint64 bytes_visited = 3;
+ uint64 documents_returned = 4;
+ uint64 bytes_returned = 5;
+}
+
+message CreateVisitorResponse {
+ BucketId last_bucket = 1;
+ VisitorStatistics statistics = 2;
+}
+
+message DestroyVisitorRequest {
+ string instance_id = 1;
+}
+
+message DestroyVisitorResponse {
+ // empty
+}
+
+message VisitorInfoRequest {
+ repeated BucketId finished_buckets = 1;
+ string error_message = 2;
+}
+
+message VisitorInfoResponse {
+ // empty
+}
+
+message MapVisitorRequest {
+ repeated VisitorParameter data = 1;
+}
+
+message MapVisitorResponse {
+ // empty
+}
+
+message SearchResult {
+ bytes payload = 1;
+}
+
+message DocumentSummary {
+ bytes payload = 1;
+}
+
+// We consider streaming search query-related messages to be part of the visiting family
+message QueryResultRequest {
+ SearchResult search_result = 1;
+ DocumentSummary document_summary = 2;
+}
+
+message QueryResultResponse {
+ // empty
+}
+
+// TODO deprecate, only used by "recovery visitor" (?!)
+message DocumentListRequest {
+ message Entry {
+ Document document = 1;
+ uint64 timestamp = 2;
+ bool is_tombstone = 3;
+ }
+
+ BucketId bucket_id = 1;
+ repeated Entry entries = 2;
+}
+
+// TODO deprecate
+message DocumentListResponse {
+ // TODO
+}
+
+// TODO deprecate, not sent by backend
+message EmptyBucketsRequest {
+ repeated BucketId bucket_ids = 1;
+}
+
+// TODO deprecate, not sent by backend
+message EmptyBucketsResponse {
+ // empty
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java
index 81904837632..42f200a0b6b 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java
@@ -1,5 +1,5 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol.test;
+package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.component.Version;
import com.yahoo.document.BucketId;
@@ -12,35 +12,6 @@ import com.yahoo.document.GlobalId;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate;
import com.yahoo.document.idstring.IdString;
-import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
-import com.yahoo.documentapi.messagebus.protocol.DestroyVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply;
-import com.yahoo.documentapi.messagebus.protocol.DocumentListMessage;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.DocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.DocumentState;
-import com.yahoo.documentapi.messagebus.protocol.EmptyBucketsMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketListMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketListReply;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketStateMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketStateReply;
-import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.MapVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.QueryResultMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.RemoveLocationMessage;
-import com.yahoo.documentapi.messagebus.protocol.StatBucketMessage;
-import com.yahoo.documentapi.messagebus.protocol.StatBucketReply;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage;
-import com.yahoo.documentapi.messagebus.protocol.VisitorReply;
-import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.messagebus.Routable;
import com.yahoo.text.Utf8;
import com.yahoo.vdslib.SearchResult;
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java
new file mode 100644
index 00000000000..f2c039af6c0
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java
@@ -0,0 +1,729 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.component.Version;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate;
+import com.yahoo.document.idstring.IdString;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.text.Utf8;
+import com.yahoo.vdslib.SearchResult;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class Messages80TestCase extends MessagesTestBase {
+
+ @Override
+ protected void registerTests(Map<Integer, RunnableTest> out) {
+ out.put(DocumentProtocol.MESSAGE_CREATEVISITOR, new CreateVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_DESTROYVISITOR, new DestroyVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_DOCUMENTLIST, new DocumentListMessageTest());
+ out.put(DocumentProtocol.MESSAGE_EMPTYBUCKETS, new EmptyBucketsMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETBUCKETLIST, new GetBucketListMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETBUCKETSTATE, new GetBucketStateMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETDOCUMENT, new GetDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_MAPVISITOR, new MapVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_PUTDOCUMENT, new PutDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_QUERYRESULT, new QueryResultMessageTest());
+ out.put(DocumentProtocol.MESSAGE_REMOVEDOCUMENT, new RemoveDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_REMOVELOCATION, new RemoveLocationMessageTest());
+ out.put(DocumentProtocol.MESSAGE_STATBUCKET, new StatBucketMessageTest());
+ out.put(DocumentProtocol.MESSAGE_UPDATEDOCUMENT, new UpdateDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_VISITORINFO, new VisitorInfoMessageTest());
+ out.put(DocumentProtocol.REPLY_CREATEVISITOR, new CreateVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_DESTROYVISITOR, new DestroyVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_DOCUMENTIGNORED, new DocumentIgnoredReplyTest());
+ out.put(DocumentProtocol.REPLY_DOCUMENTLIST, new DocumentListReplyTest());
+ out.put(DocumentProtocol.REPLY_EMPTYBUCKETS, new EmptyBucketsReplyTest());
+ out.put(DocumentProtocol.REPLY_GETBUCKETLIST, new GetBucketListReplyTest());
+ out.put(DocumentProtocol.REPLY_GETBUCKETSTATE, new GetBucketStateReplyTest());
+ out.put(DocumentProtocol.REPLY_GETDOCUMENT, new GetDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_MAPVISITOR, new MapVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_PUTDOCUMENT, new PutDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_QUERYRESULT, new QueryResultReplyTest());
+ out.put(DocumentProtocol.REPLY_REMOVEDOCUMENT, new RemoveDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_REMOVELOCATION, new RemoveLocationReplyTest());
+ out.put(DocumentProtocol.REPLY_STATBUCKET, new StatBucketReplyTest());
+ out.put(DocumentProtocol.REPLY_UPDATEDOCUMENT, new UpdateDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_VISITORINFO, new VisitorInfoReplyTest());
+ out.put(DocumentProtocol.REPLY_WRONGDISTRIBUTION, new WrongDistributionReplyTest());
+ }
+
+ @Override
+ protected Version version() {
+ return new Version(8, 305);
+ }
+
+ @Override
+ protected boolean shouldTestCoverage() {
+ return true;
+ }
+
+ private static void forEachLanguage(Consumer<Language> fun) {
+ for (var lang : MessagesTestBase.LANGUAGES) {
+ fun.accept(lang);
+ }
+ }
+
+ class GetDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new GetDocumentMessage(new DocumentId("id:ns:testdoc::"), "foo bar");
+ serialize("GetDocumentMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (GetDocumentMessage)deserialize("GetDocumentMessage", DocumentProtocol.MESSAGE_GETDOCUMENT, lang);
+ assertEquals("id:ns:testdoc::", msg2.getDocumentId().toString());
+ assertEquals("foo bar", msg2.getFieldSet());
+ });
+ }
+ }
+
+ class GetDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testDocumentReturnedCase();
+ testEmptyCase();
+ }
+
+ void testDocumentReturnedCase() {
+ var reply = new GetDocumentReply(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::"));
+ reply.setLastModified(1234567L);
+ serialize("GetDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (GetDocumentReply)deserialize("GetDocumentReply", DocumentProtocol.REPLY_GETDOCUMENT, lang);
+ assertEquals(1234567L, reply2.getLastModified());
+ var doc = reply2.getDocument();
+ assertNotNull(doc);
+ assertEquals("testdoc", doc.getDataType().getName());
+ assertEquals("id:ns:testdoc::", doc.getId().toString());
+ assertNotNull(doc.getLastModified());
+ assertEquals(1234567L, doc.getLastModified().longValue());
+ });
+ }
+
+ void testEmptyCase() {
+ var reply = new GetDocumentReply(null);
+ serialize("GetDocumentReply-empty", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (GetDocumentReply)deserialize("GetDocumentReply-empty", DocumentProtocol.REPLY_GETDOCUMENT, lang);
+ assertEquals(0L, reply2.getLastModified());
+ assertNull(reply2.getDocument());
+ });
+ }
+ }
+
+ private static final String CONDITION_STRING = "There's just one condition";
+
+ class PutDocumentMessageTest implements RunnableTest {
+
+ void verifyCreateIfNonExistentFlag() {
+ var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::")));
+ msg.setCreateIfNonExistent(true);
+ serialize("PutDocumentMessage-create", msg);
+ forEachLanguage((lang) -> {
+ var decoded = (PutDocumentMessage)deserialize("PutDocumentMessage-create", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang);
+ assertTrue(decoded.getCreateIfNonExistent());
+ assertEquals(decoded.getDocumentPut(), decoded.getDocumentPut());
+ });
+ }
+
+ @Override
+ public void run() {
+ var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::")));
+ msg.setTimestamp(666);
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+ serialize("PutDocumentMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (PutDocumentMessage)deserialize("PutDocumentMessage", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang);
+ var deserializedDoc = deserializedMsg.getDocumentPut().getDocument();
+ assertNotNull(deserializedDoc);
+ assertEquals(msg.getDocumentPut().getDocument().getDataType().getName(), deserializedDoc.getDataType().getName());
+ assertEquals(msg.getDocumentPut().getDocument().getId().toString(), deserializedDoc.getId().toString());
+ assertEquals(msg.getTimestamp(), deserializedMsg.getTimestamp());
+ assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection());
+ assertFalse(deserializedMsg.getCreateIfNonExistent());
+ });
+ verifyCreateIfNonExistentFlag();
+ }
+ }
+
+ class PutDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(30);
+ serialize("PutDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (WriteDocumentReply)deserialize("PutDocumentReply", DocumentProtocol.REPLY_PUTDOCUMENT, lang);
+ assertEquals(30, obj.getHighestModificationTimestamp());
+ });
+ }
+ }
+
+ class UpdateDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var docType = protocol.getDocumentTypeManager().getDocumentType("testdoc");
+ var update = new DocumentUpdate(docType, new DocumentId("id:ns:testdoc::"));
+ update.addFieldPathUpdate(new RemoveFieldPathUpdate(docType, "intfield", "testdoc.intfield > 0"));
+
+ var msg = new UpdateDocumentMessage(update);
+ msg.setNewTimestamp(777);
+ msg.setOldTimestamp(666);
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+
+ serialize("UpdateDocumentMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (UpdateDocumentMessage)deserialize("UpdateDocumentMessage", DocumentProtocol.MESSAGE_UPDATEDOCUMENT, lang);
+ assertEquals(msg.getDocumentUpdate(), deserializedMsg.getDocumentUpdate());
+ assertEquals(msg.getNewTimestamp(), deserializedMsg.getNewTimestamp());
+ assertEquals(msg.getOldTimestamp(), deserializedMsg.getOldTimestamp());
+ assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection());
+ });
+ }
+ }
+
+ class UpdateDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(30);
+ reply.setWasFound(true);
+ serialize("UpdateDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (UpdateDocumentReply)deserialize("UpdateDocumentReply", DocumentProtocol.REPLY_UPDATEDOCUMENT, lang);
+ assertNotNull(obj);
+ assertEquals(30, reply.getHighestModificationTimestamp());
+ assertTrue(obj.wasFound());
+ });
+ }
+ }
+
+ class RemoveDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new RemoveDocumentMessage(new DocumentId("id:ns:testdoc::"));
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+ serialize("RemoveDocumentMessage", msg);
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (RemoveDocumentMessage)deserialize("RemoveDocumentMessage", DocumentProtocol.MESSAGE_REMOVEDOCUMENT, lang);
+ assertEquals(msg.getDocumentId().toString(), deserializedMsg.getDocumentId().toString());
+ assertEquals(msg.getCondition(), deserializedMsg.getCondition());
+ });
+ }
+ }
+
+ class RemoveDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new RemoveDocumentReply();
+ reply.setHighestModificationTimestamp(30);
+ reply.setWasFound(true);
+ serialize("RemoveDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (RemoveDocumentReply)deserialize("RemoveDocumentReply", DocumentProtocol.REPLY_REMOVEDOCUMENT, lang);
+ assertNotNull(obj);
+ assertEquals(30, obj.getHighestModificationTimestamp());
+ assertTrue(obj.wasFound());
+ });
+ }
+ }
+
+ class RemoveLocationMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new RemoveLocationMessage("id.group == \"mygroup\"", "bjarne");
+ serialize("RemoveLocationMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (RemoveLocationMessage)deserialize("RemoveLocationMessage", DocumentProtocol.MESSAGE_REMOVELOCATION, lang);
+ assertEquals("id.group == \"mygroup\"", msg2.getDocumentSelection());
+ assertEquals("bjarne", msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class RemoveLocationReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testDocumentReply("RemoveLocationReply", DocumentProtocol.REPLY_REMOVELOCATION);
+ }
+ }
+
+ class CreateVisitorMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "bjarne";
+
+ @Override
+ public void run() {
+ var msg = new CreateVisitorMessage("SomeLibrary", "myvisitor", "newyork", "london");
+ msg.setDocumentSelection("true and false or true");
+ msg.getParameters().put("myvar", Utf8.toBytes("somevalue"));
+ msg.getParameters().put("anothervar", Utf8.toBytes("34"));
+ msg.getBuckets().add(new BucketId(16, 1234));
+ msg.setVisitRemoves(true);
+ msg.setVisitInconsistentBuckets(true);
+ msg.setFieldSet("foo bar");
+ msg.setMaxBucketsPerVisitor(2);
+ msg.setBucketSpace(BUCKET_SPACE);
+ msg.setMaxPendingReplyCount(12);
+
+ serialize("CreateVisitorMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (CreateVisitorMessage)deserialize("CreateVisitorMessage", DocumentProtocol.MESSAGE_CREATEVISITOR, lang);
+ assertEquals("SomeLibrary", msg2.getLibraryName());
+ assertEquals("myvisitor", msg2.getInstanceId());
+ assertEquals("newyork", msg2.getControlDestination());
+ assertEquals("london", msg2.getDataDestination());
+ assertEquals("true and false or true", msg2.getDocumentSelection());
+ assertEquals(12, msg2.getMaxPendingReplyCount());
+ assertTrue(msg2.getVisitRemoves());
+ assertEquals("foo bar", msg2.getFieldSet());
+ assertTrue(msg2.getVisitInconsistentBuckets());
+ assertEquals(1, msg2.getBuckets().size());
+ assertEquals(new BucketId(16, 1234), msg2.getBuckets().iterator().next());
+ assertEquals(2, msg2.getParameters().size());
+ assertEquals("somevalue", Utf8.toString(msg2.getParameters().get("myvar")));
+ assertEquals("34", Utf8.toString(msg2.getParameters().get("anothervar")));
+ assertEquals(2, msg2.getMaxBucketsPerVisitor());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class CreateVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(new BucketId(16, 123));
+ reply.getVisitorStatistics().setBucketsVisited(3);
+ reply.getVisitorStatistics().setDocumentsVisited(1000);
+ reply.getVisitorStatistics().setBytesVisited(1024000);
+ reply.getVisitorStatistics().setDocumentsReturned(123);
+ reply.getVisitorStatistics().setBytesReturned(512000);
+
+ serialize("CreateVisitorReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (CreateVisitorReply)deserialize("CreateVisitorReply", DocumentProtocol.REPLY_CREATEVISITOR, lang);
+ assertNotNull(reply2);
+ assertEquals(new BucketId(16, 123), reply2.getLastBucket());
+ assertEquals(3, reply2.getVisitorStatistics().getBucketsVisited());
+ assertEquals(1000, reply2.getVisitorStatistics().getDocumentsVisited());
+ assertEquals(1024000, reply2.getVisitorStatistics().getBytesVisited());
+ assertEquals(123, reply2.getVisitorStatistics().getDocumentsReturned());
+ assertEquals(512000, reply2.getVisitorStatistics().getBytesReturned());
+ });
+ }
+ }
+
+ class DestroyVisitorMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new DestroyVisitorMessage("myvisitor");
+ serialize("DestroyVisitorMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (DestroyVisitorMessage)deserialize("DestroyVisitorMessage", DocumentProtocol.MESSAGE_DESTROYVISITOR, lang);
+ assertEquals("myvisitor", msg2.getInstanceId());
+ });
+ }
+ }
+
+ class DestroyVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("DestroyVisitorReply", DocumentProtocol.REPLY_DESTROYVISITOR);
+ }
+ }
+
+ class MapVisitorMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new MapVisitorMessage();
+ msg.getData().put("foo", "3");
+ msg.getData().put("bar", "5");
+ serialize("MapVisitorMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (MapVisitorMessage) deserialize("MapVisitorMessage", DocumentProtocol.MESSAGE_MAPVISITOR, lang);
+ assertEquals(2, msg2.getData().size());
+ assertEquals("3", msg2.getData().get("foo"));
+ assertEquals("5", msg2.getData().get("bar"));
+ });
+ }
+ }
+
+ class MapVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("MapVisitorReply", DocumentProtocol.REPLY_MAPVISITOR);
+ }
+ }
+
+ class QueryResultMessageTest implements RunnableTest {
+ @Override
+ public void run() throws Exception {
+ test_result_with_match_features();
+
+ Routable routable = deserialize("QueryResultMessage-1", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ QueryResultMessage msg = (QueryResultMessage)routable;
+ assertEquals(0, msg.getResult().getHitCount());
+
+ routable = deserialize("QueryResultMessage-2", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+ com.yahoo.vdslib.SearchResult.Hit h = msg.getResult().getHit(0);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+ h = msg.getResult().getHit(1);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+
+ routable = deserialize("QueryResultMessage-3", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+ h = msg.getResult().getHit(0);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+ h = msg.getResult().getHit(1);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+
+ routable = deserialize("QueryResultMessage-4", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(3, msg.getResult().getHitCount());
+ h = msg.getResult().getHit(0);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ byte[] b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '2' }, b);
+
+ h = msg.getResult().getHit(1);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '1' }, b);
+
+ h = msg.getResult().getHit(2);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(90.0, h.getRank(), 1E-6);
+ assertEquals("doc18", h.getDocId());
+ b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '3' }, b);
+ }
+
+ void assertEqualsData(byte[] exp, byte[] act) {
+ assertEquals(exp.length, act.length);
+ for (int i = 0; i < exp.length; ++i) {
+ assertEquals(exp[i], act[i]);
+ }
+ }
+
+ void test_result_with_match_features() {
+ Routable routable = deserialize("QueryResultMessage-6", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ var msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+
+ var h = msg.getResult().getHit(0);
+ assertTrue(h instanceof SearchResult.Hit);
+ assertEquals(7.0, h.getRank(), 1E-6);
+ assertEquals("doc2", h.getDocId());
+ assertTrue(h.getMatchFeatures().isPresent());
+ var mf = h.getMatchFeatures().get();
+ assertEquals(12.0, mf.field("foo").asDouble(), 1E-6);
+ assertEqualsData(new byte[] { 'T', 'h', 'e', 'r', 'e' }, mf.field("bar").asData());
+
+ h = msg.getResult().getHit(1);
+ assertTrue(h instanceof SearchResult.Hit);
+ assertEquals(5.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertTrue(h.getMatchFeatures().isPresent());
+ mf = h.getMatchFeatures().get();
+ assertEquals(1.0, mf.field("foo").asDouble(), 1E-6);
+ assertEqualsData(new byte[] { 'H', 'i' }, mf.field("bar").asData());
+ }
+ }
+
+ class QueryResultReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("QueryResultReply", DocumentProtocol.REPLY_QUERYRESULT);
+ }
+ }
+
+ class VisitorInfoMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new VisitorInfoMessage();
+ msg.getFinishedBuckets().add(new BucketId(16, 1));
+ msg.getFinishedBuckets().add(new BucketId(16, 2));
+ msg.getFinishedBuckets().add(new BucketId(16, 4));
+ msg.setErrorMessage("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6");
+
+ serialize("VisitorInfoMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (VisitorInfoMessage)deserialize("VisitorInfoMessage", DocumentProtocol.MESSAGE_VISITORINFO, lang);
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 1)));
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 2)));
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 4)));
+ assertEquals("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6", msg2.getErrorMessage());
+ });
+ }
+ }
+
+ class VisitorInfoReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("VisitorInfoReply", DocumentProtocol.REPLY_VISITORINFO);
+ }
+ }
+
+ class DocumentListMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(new BucketId(17, 1234));
+ var doc = new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:scheme:testdoc:n=1234:1");
+ msg.getDocuments().add(new DocumentListEntry(doc, 1234, true));
+
+ serialize("DocumentListMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (DocumentListMessage) deserialize("DocumentListMessage", DocumentProtocol.MESSAGE_DOCUMENTLIST, lang);
+ assertEquals(new BucketId(17, 1234), msg2.getBucketId());
+ assertEquals(1, msg2.getDocuments().size());
+ var entry = msg2.getDocuments().get(0);
+ assertEquals("id:scheme:testdoc:n=1234:1", entry.getDocument().getId().toString());
+ assertEquals(1234, entry.getTimestamp());
+ assertTrue(entry.isRemoveEntry());
+ });
+ }
+ }
+
+ class DocumentListReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("DocumentListReply", DocumentProtocol.REPLY_DOCUMENTLIST);
+ }
+ }
+
+ class EmptyBucketsMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var bids = new ArrayList<BucketId>();
+ for (int i = 0; i < 13; ++i) {
+ bids.add(new BucketId(16, i));
+ }
+ var ebm = new EmptyBucketsMessage(bids);
+ serialize("EmptyBucketsMessage", ebm);
+ forEachLanguage((lang) -> {
+ var ebm2 = (EmptyBucketsMessage)deserialize("EmptyBucketsMessage", DocumentProtocol.MESSAGE_EMPTYBUCKETS, lang);
+ assertEquals(13, ebm2.getBucketIds().size());
+ for (int i = 0; i < 13; ++i) {
+ assertEquals(new BucketId(16, i), ebm2.getBucketIds().get(i));
+ }
+ });
+ }
+ }
+
+ class EmptyBucketsReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("EmptyBucketsReply", DocumentProtocol.REPLY_EMPTYBUCKETS);
+ }
+ }
+
+ class GetBucketListMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "beartato";
+
+ @Override
+ public void run() {
+ var msg = new GetBucketListMessage(new BucketId(16, 123));
+ msg.setBucketSpace(BUCKET_SPACE);
+ serialize("GetBucketListMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (GetBucketListMessage)deserialize("GetBucketListMessage", DocumentProtocol.MESSAGE_GETBUCKETLIST, lang);
+ assertEquals(new BucketId(16, 123), msg2.getBucketId());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class GetBucketListReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new GetBucketListReply();
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo"));
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar"));
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink"));
+
+ serialize("GetBucketListReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (GetBucketListReply)deserialize("GetBucketListReply", DocumentProtocol.REPLY_GETBUCKETLIST, lang);
+ assertEquals(3, reply2.getBuckets().size());
+ assertEquals(reply2.getBuckets().get(0), new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo"));
+ assertEquals(reply2.getBuckets().get(1), new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar"));
+ assertEquals(reply2.getBuckets().get(2), new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink"));
+ });
+ }
+ }
+
+ class GetBucketStateMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new GetBucketStateMessage(new BucketId(16, 666));
+ serialize("GetBucketStateMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (GetBucketStateMessage)deserialize("GetBucketStateMessage", DocumentProtocol.MESSAGE_GETBUCKETSTATE, lang);
+ assertEquals(16, msg2.getBucketId().getUsedBits());
+ assertEquals(4611686018427388570L, msg2.getBucketId().getId());
+ });
+ }
+ }
+
+ class GetBucketStateReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var foo = new GlobalId(IdString.createIdString("id:ns:testdoc::foo"));
+ var bar = new GlobalId(IdString.createIdString("id:ns:testdoc::bar"));
+ var baz = new DocumentId("id:ns:testdoc::baz");
+
+ var reply = new GetBucketStateReply();
+ var state = new ArrayList<DocumentState>(3);
+ state.add(new DocumentState(foo, 777, false));
+ state.add(new DocumentState(bar, 888, true));
+ state.add(new DocumentState(baz, 999, false));
+ reply.setBucketState(state);
+
+ serialize("GetBucketStateReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (GetBucketStateReply)deserialize("GetBucketStateReply", DocumentProtocol.REPLY_GETBUCKETSTATE, lang);
+ assertEquals(3, reply2.getBucketState().size());
+
+ assertEquals(777, reply2.getBucketState().get(0).getTimestamp());
+ assertEquals(foo, reply2.getBucketState().get(0).getGid());
+ assertFalse(reply2.getBucketState().get(0).hasDocId());
+ assertFalse(reply2.getBucketState().get(0).isRemoveEntry());
+
+ assertEquals(888, reply2.getBucketState().get(1).getTimestamp());
+ assertEquals(bar, reply2.getBucketState().get(1).getGid());
+ assertFalse(reply2.getBucketState().get(1).hasDocId());
+ assertTrue(reply2.getBucketState().get(1).isRemoveEntry());
+
+ assertEquals(999, reply2.getBucketState().get(2).getTimestamp());
+ assertTrue(reply2.getBucketState().get(2).hasDocId());
+ assertEquals(new GlobalId(baz.getGlobalId()), reply2.getBucketState().get(2).getGid());
+ assertEquals(baz, reply2.getBucketState().get(2).getDocId());
+ assertFalse(reply2.getBucketState().get(2).isRemoveEntry());
+ });
+ }
+ }
+
+ class StatBucketMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "andrei";
+
+ @Override
+ public void run() {
+ var msg = new StatBucketMessage(new BucketId(16, 123), "id.user=123");
+ msg.setBucketSpace(BUCKET_SPACE);
+ serialize("StatBucketMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (StatBucketMessage)deserialize("StatBucketMessage", DocumentProtocol.MESSAGE_STATBUCKET, lang);
+ assertEquals(new BucketId(16, 123), msg2.getBucketId());
+ assertEquals("id.user=123", msg2.getDocumentSelection());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class StatBucketReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new StatBucketReply();
+ msg.setResults("These are the votes of the Norwegian jury");
+ serialize("StatBucketReply", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (StatBucketReply)deserialize("StatBucketReply", DocumentProtocol.REPLY_STATBUCKET, lang);
+ assertEquals("These are the votes of the Norwegian jury", msg2.getResults());
+ });
+ }
+ }
+
+ class WrongDistributionReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new WrongDistributionReply("distributor:3 storage:2");
+ serialize("WrongDistributionReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (WrongDistributionReply)deserialize("WrongDistributionReply", DocumentProtocol.REPLY_WRONGDISTRIBUTION, lang);
+ assertEquals("distributor:3 storage:2", reply2.getSystemState());
+ });
+ }
+ }
+
+ class DocumentIgnoredReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new DocumentIgnoredReply();
+ serialize("DocumentIgnoredReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (DocumentIgnoredReply)deserialize("DocumentIgnoredReply", DocumentProtocol.REPLY_DOCUMENTIGNORED, lang);
+ assertNotNull(reply2);
+ });
+ }
+ }
+
+ private void testDocumentReply(String filename, int type) {
+ var reply = new DocumentReply(type);
+ serialize(filename, reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (DocumentReply)deserialize(filename, type, lang);
+ assertNotNull(reply2);
+ });
+ }
+
+ private void testVisitorReply(String filename, int type) {
+ VisitorReply reply = new VisitorReply(type);
+ serialize(filename, reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (VisitorReply)deserialize(filename, type, lang);
+ assertNotNull(reply2);
+ });
+ }
+
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java
index f15b0fe3995..16938ab843a 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java
@@ -1,10 +1,11 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol.test;
+package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.component.Version;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.test.TestFileUtil;
import com.yahoo.messagebus.Routable;
import org.junit.Test;
@@ -20,7 +21,7 @@ import static org.junit.Assert.*;
*/
public abstract class MessagesTestBase {
- protected enum Language {
+ public enum Language {
JAVA,
CPP
}
diff --git a/documentapi/src/tests/messages/CMakeLists.txt b/documentapi/src/tests/messages/CMakeLists.txt
index dec61432e4b..3428c34786c 100644
--- a/documentapi/src/tests/messages/CMakeLists.txt
+++ b/documentapi/src/tests/messages/CMakeLists.txt
@@ -8,6 +8,16 @@ vespa_add_executable(documentapi_messages60_test_app TEST
documentapi
)
vespa_add_test(NAME documentapi_messages60_test_app COMMAND documentapi_messages60_test_app)
+
+vespa_add_executable(documentapi_messages80_test_app TEST
+ SOURCES
+ testbase.cpp
+ messages80test.cpp
+ DEPENDS
+ documentapi
+)
+vespa_add_test(NAME documentapi_messages80_test_app COMMAND documentapi_messages80_test_app)
+
vespa_add_executable(documentapi_error_codes_test_app_app TEST
SOURCES
error_codes_test.cpp
diff --git a/documentapi/src/tests/messages/messages60test.cpp b/documentapi/src/tests/messages/messages60test.cpp
index 99ecb3644a5..281e1123e54 100644
--- a/documentapi/src/tests/messages/messages60test.cpp
+++ b/documentapi/src/tests/messages/messages60test.cpp
@@ -751,7 +751,7 @@ Messages60Test::testVisitorInfoMessage()
bool
Messages60Test::testDestroyVisitorReply()
{
- return tryDocumentReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
+ return tryVisitorReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
}
bool
@@ -922,23 +922,6 @@ Messages60Test::testRemoveLocationReply()
////////////////////////////////////////////////////////////////////////////////
bool
-Messages60Test::tryDocumentReply(const string &filename, uint32_t type)
-{
- DocumentReply tmp(type);
-
- EXPECT_EQUAL((uint32_t)5, serialize(filename, tmp));
-
- for (uint32_t lang = 0; lang < NUM_LANGUAGES; ++lang) {
- mbus::Routable::UP obj = deserialize(filename, type, lang);
- if (EXPECT_TRUE(obj)) {
- DocumentReply *ref = dynamic_cast<DocumentReply*>(obj.get());
- EXPECT_TRUE(ref != NULL);
- }
- }
- return true;
-}
-
-bool
Messages60Test::tryVisitorReply(const string &filename, uint32_t type)
{
VisitorReply tmp(type);
diff --git a/documentapi/src/tests/messages/messages60test.h b/documentapi/src/tests/messages/messages60test.h
index 88bc88097eb..d1060a83962 100644
--- a/documentapi/src/tests/messages/messages60test.h
+++ b/documentapi/src/tests/messages/messages60test.h
@@ -6,9 +6,9 @@
class Messages60Test : public TestBase {
protected:
- const vespalib::Version getVersion() const override { return vespalib::Version(6, 221); }
+ vespalib::Version getVersion() const override { return vespalib::Version(6, 221); }
bool shouldTestCoverage() const override { return true; }
- bool tryDocumentReply(const string &filename, uint32_t type);
+
bool tryVisitorReply(const string &filename, uint32_t type);
static size_t serializedLength(const string & str) { return sizeof(int32_t) + str.size(); }
@@ -23,7 +23,6 @@ public:
bool testDocumentIgnoredReply();
bool testDocumentListMessage();
bool testDocumentListReply();
- bool testDocumentSummaryMessage();
bool testEmptyBucketsMessage();
bool testEmptyBucketsReply();
bool testGetBucketListMessage();
@@ -42,7 +41,6 @@ public:
bool testRemoveDocumentReply();
bool testRemoveLocationMessage();
bool testRemoveLocationReply();
- bool testSearchResultMessage();
bool testStatBucketMessage();
bool testStatBucketReply();
bool testUpdateDocumentMessage();
diff --git a/documentapi/src/tests/messages/messages80test.cpp b/documentapi/src/tests/messages/messages80test.cpp
new file mode 100644
index 00000000000..9b97f332318
--- /dev/null
+++ b/documentapi/src/tests/messages/messages80test.cpp
@@ -0,0 +1,908 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "testbase.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/update/fieldpathupdates.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/util/featureset.h>
+#include <array>
+
+using document::DataType;
+using document::DocumentTypeRepo;
+using vespalib::FeatureValues;
+
+// TODO rewrite to GTest!
+class Messages80Test : public TestBase {
+protected:
+ vespalib::Version getVersion() const override {
+ // Must be as high--or higher--than the v8 protocol version specified in documentprocotol.cpp
+ // (and equal to its corresponding value in the Java implementation).
+ return {8, 305};
+ }
+ bool shouldTestCoverage() const override { return true; }
+
+ bool try_visitor_reply(const string& filename, uint32_t type);
+
+ static constexpr std::array<uint32_t, 2> languages() noexcept {
+ return {TestBase::LANG_CPP, TestBase::LANG_JAVA};
+ }
+
+public:
+ Messages80Test();
+ ~Messages80Test() override = default;
+
+ bool test_create_visitor_message();
+ bool test_create_visitor_reply();
+ bool test_destroy_visitor_message();
+ bool test_destroy_visitor_reply();
+ bool test_document_ignored_reply();
+ bool test_document_list_message();
+ bool test_document_list_reply();
+ bool test_empty_buckets_message();
+ bool test_empty_buckets_reply();
+ bool test_get_bucket_list_message();
+ bool test_get_bucket_list_reply();
+ bool test_get_bucket_state_message();
+ bool test_get_bucket_state_reply();
+ bool test_get_document_message();
+ bool test_get_document_reply();
+ bool test_map_visitor_message();
+ bool test_map_visitor_reply();
+ bool test_put_document_message();
+ bool test_put_document_reply();
+ bool test_query_result_message();
+ bool test_query_result_reply();
+ bool test_remove_document_message();
+ bool test_remove_document_reply();
+ bool test_remove_location_message();
+ bool test_remove_location_reply();
+ bool test_stat_bucket_message();
+ bool test_stat_bucket_reply();
+ bool test_update_document_message();
+ bool test_update_document_reply();
+ bool test_visitor_info_message();
+ bool test_visitor_info_reply();
+ bool test_wrong_distribution_reply();
+
+ void do_test_get_reply_with_doc();
+ void do_test_empty_get_reply();
+};
+
+namespace {
+
+std::vector<char> doc1_mf_data{'H', 'i'};
+std::vector<char> doc2_mf_data{'T', 'h', 'e', 'r', 'e'};
+
+}
+
+Messages80Test::Messages80Test() {
+ putTest(DocumentProtocol::MESSAGE_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_message));
+ putTest(DocumentProtocol::MESSAGE_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_message));
+ putTest(DocumentProtocol::MESSAGE_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_message));
+ putTest(DocumentProtocol::MESSAGE_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_message));
+ putTest(DocumentProtocol::MESSAGE_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_message));
+ putTest(DocumentProtocol::MESSAGE_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_message));
+ putTest(DocumentProtocol::MESSAGE_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_message));
+ putTest(DocumentProtocol::MESSAGE_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_message));
+ putTest(DocumentProtocol::MESSAGE_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_message));
+ putTest(DocumentProtocol::MESSAGE_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_message));
+ putTest(DocumentProtocol::MESSAGE_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_message));
+ putTest(DocumentProtocol::MESSAGE_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_message));
+
+ putTest(DocumentProtocol::REPLY_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_reply));
+ putTest(DocumentProtocol::REPLY_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_reply));
+ putTest(DocumentProtocol::REPLY_DOCUMENTIGNORED, TEST_METHOD(Messages80Test::test_document_ignored_reply));
+ putTest(DocumentProtocol::REPLY_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_reply));
+ putTest(DocumentProtocol::REPLY_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_reply));
+ putTest(DocumentProtocol::REPLY_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_reply));
+ putTest(DocumentProtocol::REPLY_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_reply));
+ putTest(DocumentProtocol::REPLY_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_reply));
+ putTest(DocumentProtocol::REPLY_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_reply));
+ putTest(DocumentProtocol::REPLY_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_reply));
+ putTest(DocumentProtocol::REPLY_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_reply));
+ putTest(DocumentProtocol::REPLY_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_reply));
+ putTest(DocumentProtocol::REPLY_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_reply));
+ putTest(DocumentProtocol::REPLY_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_reply));
+ putTest(DocumentProtocol::REPLY_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_reply));
+ putTest(DocumentProtocol::REPLY_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_reply));
+ putTest(DocumentProtocol::REPLY_WRONGDISTRIBUTION, TEST_METHOD(Messages80Test::test_wrong_distribution_reply));
+}
+
+namespace {
+
+document::Document::SP
+createDoc(const DocumentTypeRepo& repo, const string& type_name, const string& id) {
+ return std::make_shared<document::Document>(repo, *repo.getDocumentType(type_name), document::DocumentId(id));
+}
+
+}
+
+bool Messages80Test::test_get_document_message() {
+ GetDocumentMessage tmp(document::DocumentId("id:ns:testdoc::"), "foo bar");
+ EXPECT_EQUAL(280u, sizeof(GetDocumentMessage)); // FIXME doesn't belong here
+ serialize("GetDocumentMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentMessage", DocumentProtocol::MESSAGE_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentMessage&>(*obj);
+ EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(ref.getFieldSet(), "foo bar");
+ }
+ }
+ return true;
+}
+
+void Messages80Test::do_test_get_reply_with_doc() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::");
+ GetDocumentReply tmp(doc);
+ tmp.setLastModified(1234567);
+
+ EXPECT_EQUAL(128u, sizeof(GetDocumentReply)); // FIXME doesn't belong here!
+ serialize("GetDocumentReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentReply", DocumentProtocol::REPLY_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastModified(), 1234567ULL); // FIXME signed vs. unsigned... -_-
+ ASSERT_TRUE(ref.hasDocument());
+ auto& doc2 = ref.getDocument();
+ EXPECT_EQUAL(doc2.getType().getName(), "testdoc");
+ EXPECT_EQUAL(doc2.getId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(doc2.getLastModified(), 1234567LL); // FIXME signed vs. unsigned... -_-
+ }
+ }
+}
+
+void Messages80Test::do_test_empty_get_reply() {
+ GetDocumentReply tmp;
+ serialize("GetDocumentReply-empty", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentReply-empty", DocumentProtocol::REPLY_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastModified(), 0ULL);
+ EXPECT_FALSE(ref.hasDocument());
+ }
+ }
+}
+
+bool Messages80Test::test_get_document_reply() {
+ TEST_DO(do_test_get_reply_with_doc());
+ TEST_DO(do_test_empty_get_reply());
+ return true;
+}
+
+bool Messages80Test::test_put_document_message() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::");
+ PutDocumentMessage msg(doc);
+
+ msg.setTimestamp(666);
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ // FIXME these don't belong here!
+ EXPECT_EQUAL(64u, sizeof(vespalib::string));
+ EXPECT_EQUAL(sizeof(vespalib::string), sizeof(TestAndSetCondition));
+ EXPECT_EQUAL(112u, sizeof(DocumentMessage));
+ EXPECT_EQUAL(sizeof(TestAndSetCondition) + sizeof(DocumentMessage), sizeof(TestAndSetMessage));
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(PutDocumentMessage));
+
+ serialize("PutDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto routableUp = deserialize("PutDocumentMessage", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(routableUp)) {
+ auto& deserializedMsg = dynamic_cast<PutDocumentMessage &>(*routableUp);
+
+ EXPECT_EQUAL(deserializedMsg.getDocument().getType().getName(), msg.getDocument().getType().getName());
+ EXPECT_EQUAL(deserializedMsg.getDocument().getId().toString(), msg.getDocument().getId().toString());
+ EXPECT_EQUAL(deserializedMsg.getTimestamp(), msg.getTimestamp());
+ EXPECT_GREATER(deserializedMsg.getApproxSize(), 0u);
+ EXPECT_EQUAL(deserializedMsg.getCondition().getSelection(), msg.getCondition().getSelection());
+ EXPECT_FALSE(deserializedMsg.get_create_if_non_existent());
+ }
+ }
+
+ //-------------------------------------------------------------------------
+
+ PutDocumentMessage msg2(createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::"));
+ msg2.set_create_if_non_existent(true);
+ serialize("PutDocumentMessage-create", msg2);
+ for (auto lang : languages()) {
+ auto obj = deserialize("PutDocumentMessage-create", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& decoded = dynamic_cast<PutDocumentMessage&>(*obj);
+ EXPECT_TRUE(decoded.get_create_if_non_existent());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_put_document_reply() {
+ WriteDocumentReply reply(DocumentProtocol::REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(30);
+
+ serialize("PutDocumentReply", reply);
+ EXPECT_EQUAL(sizeof(WriteDocumentReply), 112u); // FIXME doesn't belong here!
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("PutDocumentReply", DocumentProtocol::REPLY_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<WriteDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_update_document_message() {
+ const DocumentTypeRepo& repo = getTypeRepo();
+ const document::DocumentType& docType = *repo.getDocumentType("testdoc");
+
+ auto doc_update = std::make_shared<document::DocumentUpdate>(repo, docType, document::DocumentId("id:ns:testdoc::"));
+ doc_update->addFieldPathUpdate(std::make_unique<document::RemoveFieldPathUpdate>("intfield", "testdoc.intfield > 0"));
+
+ UpdateDocumentMessage msg(std::move(doc_update));
+ msg.setOldTimestamp(666u);
+ msg.setNewTimestamp(777u);
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(UpdateDocumentMessage)); // FIXME doesn't belong here!
+ serialize("UpdateDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("UpdateDocumentMessage", DocumentProtocol::MESSAGE_UPDATEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& decoded = dynamic_cast<UpdateDocumentMessage&>(*obj);
+ EXPECT_EQUAL(decoded.getDocumentUpdate(), msg.getDocumentUpdate());
+ EXPECT_EQUAL(decoded.getOldTimestamp(), msg.getOldTimestamp());
+ EXPECT_EQUAL(decoded.getNewTimestamp(), msg.getNewTimestamp());
+ EXPECT_GREATER(decoded.getApproxSize(), 0u); // Actual value depends on protobuf size
+ EXPECT_EQUAL(decoded.getCondition().getSelection(), msg.getCondition().getSelection());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_update_document_reply() {
+ UpdateDocumentReply reply;
+ reply.setWasFound(true);
+ reply.setHighestModificationTimestamp(30);
+
+ serialize("UpdateDocumentReply", reply);
+ EXPECT_EQUAL(120u, sizeof(UpdateDocumentReply)); // FIXME doesn't belong here!
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("UpdateDocumentReply", DocumentProtocol::REPLY_UPDATEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<UpdateDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ EXPECT_TRUE(ref.wasFound());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_document_message() {
+ RemoveDocumentMessage msg(document::DocumentId("id:ns:testdoc::"));
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 104, sizeof(RemoveDocumentMessage)); // FIXME doesn't belong here!
+ serialize("RemoveDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveDocumentMessage", DocumentProtocol::MESSAGE_REMOVEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveDocumentMessage &>(*obj);
+ EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(ref.getCondition().getSelection(), msg.getCondition().getSelection());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_document_reply() {
+ RemoveDocumentReply reply;
+ std::vector<uint64_t> ts;
+ reply.setWasFound(true);
+ reply.setHighestModificationTimestamp(30);
+ EXPECT_EQUAL(120u, sizeof(RemoveDocumentReply)); // FIXME doesn't belong here!
+
+ serialize("RemoveDocumentReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveDocumentReply", DocumentProtocol::REPLY_REMOVEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ EXPECT_TRUE(ref.wasFound());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_location_message() {
+ document::BucketIdFactory factory;
+ document::select::Parser parser(getTypeRepo(), factory);
+ RemoveLocationMessage msg(factory, parser, "id.group == \"mygroup\"");
+ msg.setBucketSpace("bjarne");
+ serialize("RemoveLocationMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveLocationMessage", DocumentProtocol::MESSAGE_REMOVELOCATION, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveLocationMessage&>(*obj);
+ EXPECT_EQUAL(ref.getDocumentSelection(), "id.group == \"mygroup\"");
+ EXPECT_EQUAL(ref.getBucketSpace(), "bjarne");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_location_reply() {
+ DocumentReply tmp(DocumentProtocol::REPLY_REMOVELOCATION);
+ serialize("RemoveLocationReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveLocationReply", DocumentProtocol::REPLY_REMOVELOCATION, lang);
+ EXPECT_TRUE(obj);
+ }
+ return true;
+}
+
+bool Messages80Test::test_create_visitor_message() {
+ CreateVisitorMessage tmp("SomeLibrary", "myvisitor", "newyork", "london");
+ tmp.setDocumentSelection("true and false or true");
+ tmp.getParameters().set("myvar", "somevalue");
+ tmp.getParameters().set("anothervar", uint64_t(34));
+ tmp.getBuckets().emplace_back(16, 1234);
+ tmp.setVisitRemoves(true);
+ tmp.setVisitInconsistentBuckets(true);
+ tmp.setFieldSet("foo bar");
+ tmp.setMaxBucketsPerVisitor(2);
+ tmp.setMaximumPendingReplyCount(12);
+ tmp.setBucketSpace("bjarne");
+
+ serialize("CreateVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("CreateVisitorMessage", DocumentProtocol::MESSAGE_CREATEVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<CreateVisitorMessage&>(*obj);
+
+ EXPECT_EQUAL(ref.getLibraryName(), "SomeLibrary");
+ EXPECT_EQUAL(ref.getInstanceId(), "myvisitor");
+ EXPECT_EQUAL(ref.getControlDestination(), "newyork");
+ EXPECT_EQUAL(ref.getDataDestination(), "london");
+ EXPECT_EQUAL(ref.getDocumentSelection(), "true and false or true");
+ EXPECT_EQUAL(ref.getFieldSet(), "foo bar");
+ EXPECT_EQUAL(ref.getMaximumPendingReplyCount(), uint32_t(12));
+ EXPECT_TRUE(ref.visitRemoves());
+ EXPECT_TRUE(ref.visitInconsistentBuckets());
+ ASSERT_EQUAL(ref.getBuckets().size(), size_t(1));
+ EXPECT_EQUAL(ref.getBuckets()[0], document::BucketId(16, 1234));
+ EXPECT_EQUAL(ref.getParameters().get("myvar"), "somevalue");
+ EXPECT_EQUAL(ref.getParameters().get("anothervar", uint64_t(1)), uint64_t(34));
+ EXPECT_EQUAL(ref.getMaxBucketsPerVisitor(), uint32_t(2));
+ EXPECT_EQUAL(ref.getBucketSpace(), "bjarne");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_create_visitor_reply() {
+ CreateVisitorReply reply(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply.setLastBucket(document::BucketId(16, 123));
+ vdslib::VisitorStatistics vs;
+ vs.setBucketsVisited(3);
+ vs.setDocumentsVisited(1000);
+ vs.setBytesVisited(1024000);
+ vs.setDocumentsReturned(123);
+ vs.setBytesReturned(512000);
+ reply.setVisitorStatistics(vs);
+
+ serialize("CreateVisitorReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("CreateVisitorReply", DocumentProtocol::REPLY_CREATEVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<CreateVisitorReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastBucket(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBucketsVisited(), (uint32_t)3);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsVisited(), (uint64_t)1000);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBytesVisited(), (uint64_t)1024000);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsReturned(), (uint64_t)123);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBytesReturned(), (uint64_t)512000);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_destroy_visitor_message() {
+ DestroyVisitorMessage tmp("myvisitor");
+ serialize("DestroyVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("DestroyVisitorMessage", DocumentProtocol::MESSAGE_DESTROYVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<DestroyVisitorMessage&>(*obj);
+ EXPECT_EQUAL(ref.getInstanceId(), "myvisitor");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_destroy_visitor_reply() {
+ return try_visitor_reply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
+}
+
+bool Messages80Test::test_map_visitor_message() {
+ MapVisitorMessage tmp;
+ tmp.getData().set("foo", 3);
+ tmp.getData().set("bar", 5);
+
+ serialize("MapVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("MapVisitorMessage", DocumentProtocol::MESSAGE_MAPVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<MapVisitorMessage&>(*obj);
+ EXPECT_EQUAL(ref.getData().size(), 2u);
+ EXPECT_EQUAL(ref.getData().get("foo", 0), 3);
+ EXPECT_EQUAL(ref.getData().get("bar", 0), 5);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_map_visitor_reply() {
+ return try_visitor_reply("MapVisitorReply", DocumentProtocol::REPLY_MAPVISITOR);
+}
+
+bool Messages80Test::test_query_result_message() {
+ QueryResultMessage srm;
+ vdslib::SearchResult& sr(srm.getSearchResult());
+ EXPECT_EQUAL(srm.getSequenceId(), 0u);
+ EXPECT_EQUAL(sr.getHitCount(), 0u);
+ EXPECT_EQUAL(sr.getAggregatorList().getSerializedSize(), 4u);
+ EXPECT_EQUAL(sr.getSerializedSize(), 20u);
+ EXPECT_EQUAL(srm.getApproxSize(), 28u);
+
+ serialize("QueryResultMessage-1", srm);
+
+ // Serialization is only implemented in C++
+ {
+ auto routable = deserialize("QueryResultMessage-1", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ vdslib::SearchResult& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dm.getSequenceId(), size_t(0));
+ EXPECT_EQUAL(dr.getHitCount(), size_t(0));
+ }
+
+ sr.addHit(0, "doc1", 89);
+ sr.addHit(1, "doc17", 109);
+ serialize("QueryResultMessage-2", srm);
+
+ const char* doc_id;
+ vdslib::SearchResult::RankType rank;
+
+ {
+ auto routable = deserialize("QueryResultMessage-2", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ }
+
+ sr.sort();
+ serialize("QueryResultMessage-3", srm);
+
+ {
+ auto routable = deserialize("QueryResultMessage-3", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ }
+
+ QueryResultMessage srm2;
+ vdslib::SearchResult& sr2(srm2.getSearchResult());
+ sr2.addHit(0, "doc1", 89, "sortdata2", 9);
+ sr2.addHit(1, "doc17", 109, "sortdata1", 9);
+ sr2.addHit(2, "doc18", 90, "sortdata3", 9);
+ serialize("QueryResultMessage-4", srm2);
+
+ {
+ auto routable = deserialize("QueryResultMessage-4", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(3));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(2, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+ }
+
+ sr2.sort();
+ const void* buf;
+ size_t sz;
+ sr2.getHit(0, doc_id, rank);
+ sr2.getSortBlob(0, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ sr2.getHit(1, doc_id, rank);
+ sr2.getSortBlob(1, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ sr2.getHit(2, doc_id, rank);
+ sr2.getSortBlob(2, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+
+ serialize("QueryResultMessage-5", srm2);
+ {
+ auto routable = deserialize("QueryResultMessage-5", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(3));
+ dr.getHit(0, doc_id, rank);
+ dr.getSortBlob(0, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ dr.getSortBlob(1, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(2, doc_id, rank);
+ dr.getSortBlob(2, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+ }
+
+ QueryResultMessage qrm3;
+ auto& sr3 = qrm3.getSearchResult();
+ sr3.addHit(0, "doc1", 5);
+ sr3.addHit(1, "doc2", 7);
+ FeatureValues mf;
+ mf.names.emplace_back("foo");
+ mf.names.emplace_back("bar");
+ mf.values.resize(4);
+ mf.values[0].set_double(1.0);
+ mf.values[1].set_data({doc1_mf_data.data(), doc1_mf_data.size()});
+ mf.values[2].set_double(12.0);
+ mf.values[3].set_data({doc2_mf_data.data(), doc2_mf_data.size()});
+ sr3.set_match_features(FeatureValues(mf));
+ sr3.sort();
+
+ serialize("QueryResultMessage-6", qrm3);
+ {
+ auto routable = deserialize("QueryResultMessage-6", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(vdslib::SearchResult::RankType(7), rank);
+ EXPECT_EQUAL(strcmp("doc2", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(vdslib::SearchResult::RankType(5), rank);
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ auto mfv = dr.get_match_feature_values(0);
+ EXPECT_EQUAL(mfv.size(), 2u);
+ EXPECT_EQUAL(mfv[0].as_double(), 12.0);
+ EXPECT_EQUAL(mfv[1].as_data().make_string(), "There");
+ mfv = dr.get_match_feature_values(1);
+ EXPECT_EQUAL(mfv.size(), 2u);
+ EXPECT_EQUAL(mfv[0].as_double(), 1.0);
+ EXPECT_EQUAL(mfv[1].as_data().make_string(), "Hi");
+ const auto& mf_names = dr.get_match_features().names;
+ EXPECT_EQUAL(mf_names.size(), 2u);
+ EXPECT_EQUAL(mf_names[0], "foo");
+ EXPECT_EQUAL(mf_names[1], "bar");
+ }
+ return true;
+}
+
+bool Messages80Test::test_query_result_reply() {
+ return try_visitor_reply("QueryResultReply", DocumentProtocol::REPLY_QUERYRESULT);
+}
+
+bool Messages80Test::test_visitor_info_message() {
+ VisitorInfoMessage tmp;
+ tmp.getFinishedBuckets().emplace_back(16, 1);
+ tmp.getFinishedBuckets().emplace_back(16, 2);
+ tmp.getFinishedBuckets().emplace_back(16, 4);
+ string utf8 = "error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6"; // FIXME utf-8 literal
+ tmp.setErrorMessage(utf8);
+
+ serialize("VisitorInfoMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("VisitorInfoMessage", DocumentProtocol::MESSAGE_VISITORINFO, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<VisitorInfoMessage&>(*obj);
+ ASSERT_EQUAL(ref.getFinishedBuckets().size(), 3u);
+ EXPECT_EQUAL(ref.getFinishedBuckets()[0], document::BucketId(16, 1));
+ EXPECT_EQUAL(ref.getFinishedBuckets()[1], document::BucketId(16, 2));
+ EXPECT_EQUAL(ref.getFinishedBuckets()[2], document::BucketId(16, 4));
+ EXPECT_EQUAL(ref.getErrorMessage(), utf8);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_visitor_info_reply() {
+ return try_visitor_reply("VisitorInfoReply", DocumentProtocol::REPLY_VISITORINFO);
+}
+
+bool Messages80Test::test_document_list_message() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:scheme:testdoc:n=1234:1");
+ DocumentListMessage::Entry entry(1234, std::move(doc), true);
+ DocumentListMessage tmp(document::BucketId(17, 1234));
+ tmp.getDocuments().push_back(std::move(entry));
+
+ serialize("DocumentListMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("DocumentListMessage", DocumentProtocol::MESSAGE_DOCUMENTLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<DocumentListMessage&>(*obj);
+ ASSERT_EQUAL(ref.getDocuments().size(), 1u);
+ EXPECT_EQUAL(ref.getDocuments()[0].getDocument()->getId().toString(), "id:scheme:testdoc:n=1234:1");
+ EXPECT_EQUAL(ref.getDocuments()[0].getTimestamp(), 1234);
+ EXPECT_TRUE(ref.getDocuments()[0].isRemoveEntry());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_document_list_reply() {
+ return try_visitor_reply("DocumentListReply", DocumentProtocol::REPLY_DOCUMENTLIST);
+}
+
+bool Messages80Test::test_empty_buckets_message() {
+ std::vector<document::BucketId> bids;
+ for (size_t i=0; i < 13; ++i) {
+ bids.emplace_back(16, i);
+ }
+ EmptyBucketsMessage msg(bids);
+
+ serialize("EmptyBucketsMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("EmptyBucketsMessage", DocumentProtocol::MESSAGE_EMPTYBUCKETS, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<EmptyBucketsMessage&>(*obj);
+ ASSERT_EQUAL(ref.getBucketIds().size(), 13u);
+ for (size_t i = 0; i < 13; ++i) {
+ EXPECT_EQUAL(ref.getBucketIds()[i], document::BucketId(16, i));
+ }
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_empty_buckets_reply() {
+ return try_visitor_reply("EmptyBucketsReply", DocumentProtocol::REPLY_EMPTYBUCKETS);
+}
+
+bool Messages80Test::test_get_bucket_list_message() {
+ GetBucketListMessage msg(document::BucketId(16, 123));
+ msg.setBucketSpace("beartato");
+
+ serialize("GetBucketListMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketListMessage", DocumentProtocol::MESSAGE_GETBUCKETLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketListMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getBucketSpace(), "beartato");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_list_reply() {
+ GetBucketListReply reply;
+ reply.getBuckets().emplace_back(document::BucketId(16, 123), "foo");
+ reply.getBuckets().emplace_back(document::BucketId(17, 1123), "bar");
+ reply.getBuckets().emplace_back(document::BucketId(18, 11123), "zoink");
+
+ serialize("GetBucketListReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketListReply", DocumentProtocol::REPLY_GETBUCKETLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketListReply&>(*obj);
+ ASSERT_EQUAL(ref.getBuckets().size(), 3u);
+ EXPECT_EQUAL(ref.getBuckets()[0], GetBucketListReply::BucketInfo(document::BucketId(16, 123), "foo"));
+ EXPECT_EQUAL(ref.getBuckets()[1], GetBucketListReply::BucketInfo(document::BucketId(17, 1123), "bar"));
+ EXPECT_EQUAL(ref.getBuckets()[2], GetBucketListReply::BucketInfo(document::BucketId(18, 11123), "zoink"));
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_state_message() {
+ GetBucketStateMessage tmp;
+ tmp.setBucketId(document::BucketId(16, 666));
+
+ serialize("GetBucketStateMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketStateMessage", DocumentProtocol::MESSAGE_GETBUCKETSTATE, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketStateMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId().getUsedBits(), 16u);
+ EXPECT_EQUAL(ref.getBucketId().getId(), 4611686018427388570ULL);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_state_reply() {
+ auto foo = document::DocumentId("id:ns:testdoc::foo").getGlobalId();
+ auto bar = document::DocumentId("id:ns:testdoc::bar").getGlobalId();
+ auto baz = document::DocumentId("id:ns:testdoc::baz");
+
+ GetBucketStateReply reply;
+ reply.getBucketState().emplace_back(foo, 777, false);
+ reply.getBucketState().emplace_back(bar, 888, true);
+ reply.getBucketState().emplace_back(baz, 999, false);
+ serialize("GetBucketStateReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketStateReply", DocumentProtocol::REPLY_GETBUCKETSTATE, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketStateReply&>(*obj);
+ ASSERT_EQUAL(ref.getBucketState().size(), 3u);
+ EXPECT_EQUAL(ref.getBucketState()[0].getTimestamp(), 777u);
+ EXPECT_FALSE(ref.getBucketState()[0].getDocumentId());
+ EXPECT_EQUAL(ref.getBucketState()[0].getGlobalId(), foo);
+ EXPECT_FALSE(ref.getBucketState()[0].isRemoveEntry());
+
+ EXPECT_EQUAL(ref.getBucketState()[1].getTimestamp(), 888u);
+ EXPECT_FALSE(ref.getBucketState()[1].getDocumentId());
+ EXPECT_EQUAL(ref.getBucketState()[1].getGlobalId(), bar);
+ EXPECT_TRUE(ref.getBucketState()[1].isRemoveEntry());
+
+ EXPECT_EQUAL(ref.getBucketState()[2].getTimestamp(), 999u);
+ EXPECT_EQUAL(ref.getBucketState()[2].getGlobalId(), baz.getGlobalId());
+ EXPECT_FALSE(ref.getBucketState()[2].isRemoveEntry());
+ ASSERT_TRUE(ref.getBucketState()[2].getDocumentId());
+ EXPECT_EQUAL(*ref.getBucketState()[2].getDocumentId(), baz);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_stat_bucket_message() {
+ StatBucketMessage msg(document::BucketId(16, 123), "id.user=123");
+ msg.setBucketSpace("andrei");
+
+ serialize("StatBucketMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("StatBucketMessage", DocumentProtocol::MESSAGE_STATBUCKET, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<StatBucketMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getDocumentSelection(), "id.user=123");
+ EXPECT_EQUAL(ref.getBucketSpace(), "andrei");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_stat_bucket_reply() {
+ StatBucketReply msg;
+ msg.setResults("These are the votes of the Norwegian jury");
+
+ serialize("StatBucketReply", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("StatBucketReply", DocumentProtocol::REPLY_STATBUCKET, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<StatBucketReply&>(*obj);
+ EXPECT_EQUAL(ref.getResults(), "These are the votes of the Norwegian jury");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_wrong_distribution_reply() {
+ WrongDistributionReply tmp("distributor:3 storage:2");
+
+ serialize("WrongDistributionReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("WrongDistributionReply", DocumentProtocol::REPLY_WRONGDISTRIBUTION, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<WrongDistributionReply&>(*obj);
+ EXPECT_EQUAL(ref.getSystemState(), "distributor:3 storage:2");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_document_ignored_reply() {
+ DocumentIgnoredReply tmp;
+ serialize("DocumentIgnoredReply", tmp);
+ for (auto lang : languages()) {
+ auto obj = deserialize("DocumentIgnoredReply", DocumentProtocol::REPLY_DOCUMENTIGNORED, lang);
+ EXPECT_TRUE(obj);
+ }
+ return true;
+}
+
+bool Messages80Test::try_visitor_reply(const string& filename, uint32_t type) {
+ VisitorReply tmp(type);
+ serialize(filename, tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize(filename, type, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto* ptr = dynamic_cast<VisitorReply*>(obj.get());
+ EXPECT_TRUE(ptr);
+ }
+ }
+ return true;
+}
+
+// TODO rewrite to Gtest
+TEST_APPHOOK(Messages80Test);
diff --git a/documentapi/src/tests/messages/testbase.cpp b/documentapi/src/tests/messages/testbase.cpp
index 4ea770e7309..e5647ceaef8 100644
--- a/documentapi/src/tests/messages/testbase.cpp
+++ b/documentapi/src/tests/messages/testbase.cpp
@@ -25,6 +25,8 @@ TestBase::TestBase() :
{
}
+TestBase::~TestBase() = default;
+
int
TestBase::Main()
{
@@ -34,16 +36,15 @@ TestBase::Main()
LOG(info, "Running tests for version %s.", getVersion().toString().c_str());
// Run registered tests.
- for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin();
- it != _tests.end(); ++it)
- {
- LOG(info, "Running test for routable type %d.", it->first);
- EXPECT_TRUE( (this->*(it->second))() );
+ for (const auto& test : _tests) {
+ LOG(info, "Running test for routable type %d.", test.first);
+ EXPECT_TRUE( (this->*(test.second))() );
TEST_FLUSH();
}
// Test routable type coverage.
- std::vector<uint32_t> expected, actual;
+ std::vector<uint32_t> expected;
+ std::vector<uint32_t> actual;
EXPECT_TRUE(testCoverage(expected, actual));
expected.push_back(0);
EXPECT_TRUE(!testCoverage(expected, actual));
@@ -58,10 +59,8 @@ TestBase::Main()
_protocol.getRoutableTypes(getVersion(), expected);
actual.clear();
- for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin();
- it != _tests.end(); ++it)
- {
- actual.push_back(it->first);
+ for (const auto& test : _tests) {
+ actual.push_back(test.first);
}
if (shouldTestCoverage()) {
EXPECT_TRUE(testCoverage(expected, actual, true));
@@ -100,13 +99,11 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector<
bool ret = true;
std::vector<uint32_t> lst(actual);
- for (std::vector<uint32_t>::const_iterator it = expected.begin();
- it != expected.end(); ++it)
- {
- std::vector<uint32_t>::iterator occ = std::find(lst.begin(), lst.end(), *it);
+ for (uint32_t wanted : expected) {
+ auto occ = std::find(lst.begin(), lst.end(), wanted);
if (occ == lst.end()) {
if (report) {
- LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", *it);
+ LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", wanted);
}
ret = false;
} else {
@@ -115,10 +112,8 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector<
}
if (!lst.empty()) {
if (report) {
- for (std::vector<uint32_t>::iterator it = lst.begin();
- it != lst.end(); ++it)
- {
- LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", *it);
+ for (uint32_t missing : lst) {
+ LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", missing);
}
}
ret = false;
@@ -151,7 +146,7 @@ TestBase::serialize(const string &filename, const mbus::Routable &routable, Tamp
return 0;
}
mbus::Routable::UP obj = _protocol.decode(version, blob);
- if (!EXPECT_TRUE(obj.get() != NULL)) {
+ if (!EXPECT_TRUE(obj.get() != nullptr)) {
LOG(error, "Protocol failed to decode serialized data.");
return 0;
}
@@ -172,7 +167,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang)
mbus::Blob blob = readFile(path);
if (!EXPECT_TRUE(blob.size() != 0)) {
LOG(error, "Could not open file '%s' for reading.", path.c_str());
- return mbus::Routable::UP();
+ return {};
}
mbus::Routable::UP ret = _protocol.decode(version, blob);
@@ -180,7 +175,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang)
LOG(error, "Unable to decode class %d", classId);
} else if (!EXPECT_TRUE(classId == ret->getType())) {
LOG(error, "Expected class %d, got %d.", classId, ret->getType());
- return mbus::Routable::UP();
+ return {};
}
return ret;
}
diff --git a/documentapi/src/tests/messages/testbase.h b/documentapi/src/tests/messages/testbase.h
index 313f2d1f293..ad371bcc3bc 100644
--- a/documentapi/src/tests/messages/testbase.h
+++ b/documentapi/src/tests/messages/testbase.h
@@ -36,8 +36,8 @@ protected:
};
TestBase();
- virtual ~TestBase() { /* empty */ }
- virtual const vespalib::Version getVersion() const = 0;
+ ~TestBase() override;
+ virtual vespalib::Version getVersion() const = 0;
virtual bool shouldTestCoverage() const = 0;
TestBase &putTest(uint32_t type, TEST_METHOD_PT test);
int Main() override;
diff --git a/documentapi/src/vespa/documentapi/CMakeLists.txt b/documentapi/src/vespa/documentapi/CMakeLists.txt
index a3a2815cc4f..1d0b3784a9d 100644
--- a/documentapi/src/vespa/documentapi/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/CMakeLists.txt
@@ -7,3 +7,5 @@ vespa_add_library(documentapi
INSTALL lib64
DEPENDS
)
+
+vespa_add_target_package_dependency(documentapi Protobuf)
diff --git a/documentapi/src/vespa/documentapi/messagebus/.gitignore b/documentapi/src/vespa/documentapi/messagebus/.gitignore
index d58390943e2..488f2e6355d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/.gitignore
+++ b/documentapi/src/vespa/documentapi/messagebus/.gitignore
@@ -2,3 +2,5 @@
Makefile
config-*.cpp
config-*.h
+*.pb.cc
+*.pb.h
diff --git a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
index c3198ba7b2b..d59fd56037d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
@@ -1,12 +1,35 @@
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+
+# .proto files are in a higher-level directory as they are shared across languages
+set(documentapi_messagebus_PROTOBUF_REL_PATH "../../../protobuf")
+PROTOBUF_GENERATE_CPP(documentapi_messagebus_PROTOBUF_SRCS documentapi_messagebus_PROTOBUF_HDRS
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_common.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_feed.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_inspect.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_visiting.proto")
+
+vespa_add_source_target(protobufgen_documentapi_messagebus DEPENDS
+ ${documentapi_messagebus_PROTOBUF_SRCS}
+ ${documentapi_messagebus_PROTOBUF_HDRS})
+
+vespa_suppress_warnings_for_protobuf_sources(SOURCES ${documentapi_messagebus_PROTOBUF_SRCS})
+
+# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
+# the header file grows over a certain size.
+set_source_files_properties(routable_factories_8.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
+
vespa_add_library(documentapi_documentapimessagebus OBJECT
SOURCES
documentprotocol.cpp
replymerger.cpp
+ routable_factories_8.cpp
routablefactories60.cpp
routablerepository.cpp
routingpolicyfactories.cpp
routingpolicyrepository.cpp
+ ${documentapi_messagebus_PROTOBUF_SRCS}
DEPENDS
documentapi_documentapipolicies
)
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index f16f63029ee..f53594c6d32 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -1,16 +1,17 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "replymerger.h"
+#include "routable_factories_8.h"
#include "routablefactories60.h"
-#include "routingpolicyfactories.h"
#include "routablerepository.h"
+#include "routingpolicyfactories.h"
#include "routingpolicyrepository.h"
-#include "replymerger.h"
#include <vespa/document/util/stringutil.h>
#include <vespa/documentapi/documentapi.h>
-#include <vespa/vespalib/util/exceptions.h>
#include <vespa/messagebus/error.h>
-#include <sstream>
+#include <vespa/vespalib/util/exceptions.h>
#include <cassert>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".documentprotocol");
@@ -40,47 +41,98 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo,
putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>());
putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>());
+ add_legacy_v6_factories();
+ add_v8_factories();
+}
+
+DocumentProtocol::~DocumentProtocol() = default;
+
+void
+DocumentProtocol::add_legacy_v6_factories()
+{
// Prepare version specifications to use when adding routable factories.
vespalib::VersionSpecification version6(6, 221);
-
std::vector<vespalib::VersionSpecification> from6 = { version6 };
// Add 6.x serialization
- putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
- putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
- putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
- putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
- putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
+ putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
+ putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
+ putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
+ putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, std::make_shared<RoutableFactories60::WrongDistributionReplyFactory>(), from6);
}
-DocumentProtocol::~DocumentProtocol() = default;
+void
+DocumentProtocol::add_v8_factories()
+{
+ vespalib::VersionSpecification version8(8, 304);
+ std::vector<vespalib::VersionSpecification> from8 = { version8 };
+
+ using RF8 = messagebus::RoutableFactories80;
+ auto put_v8_factory = [&](auto msg_id, auto factory) {
+ putRoutableFactory(msg_id, std::move(factory), from8);
+ };
+
+ put_v8_factory(MESSAGE_CREATEVISITOR, RF8::create_visitor_message_factory());
+ put_v8_factory(MESSAGE_DESTROYVISITOR, RF8::destroy_visitor_message_factory());
+ put_v8_factory(MESSAGE_DOCUMENTLIST, RF8::document_list_message_factory(_repo));
+ put_v8_factory(MESSAGE_EMPTYBUCKETS, RF8::empty_buckets_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETLIST, RF8::get_bucket_list_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETSTATE, RF8::get_bucket_state_message_factory());
+ put_v8_factory(MESSAGE_GETDOCUMENT, RF8::get_document_message_factory());
+ put_v8_factory(MESSAGE_MAPVISITOR, RF8::map_visitor_message_factory());
+ put_v8_factory(MESSAGE_PUTDOCUMENT, RF8::put_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_QUERYRESULT, RF8::query_result_message_factory());
+ put_v8_factory(MESSAGE_REMOVEDOCUMENT, RF8::remove_document_message_factory());
+ put_v8_factory(MESSAGE_REMOVELOCATION, RF8::remove_location_message_factory(_repo));
+ put_v8_factory(MESSAGE_STATBUCKET, RF8::stat_bucket_message_factory());
+ put_v8_factory(MESSAGE_UPDATEDOCUMENT, RF8::update_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_VISITORINFO, RF8::visitor_info_message_factory());
+ put_v8_factory(REPLY_CREATEVISITOR, RF8::create_visitor_reply_factory());
+ put_v8_factory(REPLY_DESTROYVISITOR, RF8::destroy_visitor_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTIGNORED, RF8::document_ignored_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTLIST, RF8::document_list_reply_factory());
+ put_v8_factory(REPLY_EMPTYBUCKETS, RF8::empty_buckets_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETLIST, RF8::get_bucket_list_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETSTATE, RF8::get_bucket_state_reply_factory());
+ put_v8_factory(REPLY_GETDOCUMENT, RF8::get_document_reply_factory(_repo));
+ put_v8_factory(REPLY_MAPVISITOR, RF8::map_visitor_reply_factory());
+ put_v8_factory(REPLY_PUTDOCUMENT, RF8::put_document_reply_factory());
+ put_v8_factory(REPLY_QUERYRESULT, RF8::query_result_reply_factory());
+ put_v8_factory(REPLY_REMOVEDOCUMENT, RF8::remove_document_reply_factory());
+ put_v8_factory(REPLY_REMOVELOCATION, RF8::remove_location_reply_factory());
+ put_v8_factory(REPLY_STATBUCKET, RF8::stat_bucket_reply_factory());
+ put_v8_factory(REPLY_UPDATEDOCUMENT, RF8::update_document_reply_factory());
+ put_v8_factory(REPLY_VISITORINFO, RF8::visitor_info_reply_factory());
+ put_v8_factory(REPLY_WRONGDISTRIBUTION, RF8::wrong_distribution_reply_factory());
+}
mbus::IRoutingPolicy::UP
DocumentProtocol::createPolicy(const mbus::string &name, const mbus::string &param) const
@@ -99,13 +151,9 @@ mbus::Blob
DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable &routable) const
{
mbus::Blob blob(_routableRepository->encode(version, routable));
- // When valgrind reports errors of uninitialized data being written to
- // the network, it is useful to be able to see the serialized data to
- // try to identify what bits are uninitialized.
if (LOG_WOULD_LOG(spam)) {
std::ostringstream message;
- document::StringUtil::printAsHex(
- message, blob.data(), blob.size());
+ document::StringUtil::printAsHex(message, blob.data(), blob.size());
LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s",
routable.getProtocol().c_str(), routable.getType(),
version.toString().c_str(), message.str().c_str());
@@ -120,7 +168,7 @@ DocumentProtocol::decode(const vespalib::Version &version, mbus::BlobRef data) c
return _routableRepository->decode(version, data);
} catch (vespalib::Exception &e) {
LOG(warning, "%s", e.getMessage().c_str());
- return mbus::Routable::UP();
+ return {};
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
index c771e86031d..b9658750ada 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
@@ -294,6 +294,10 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string &param) const override;
mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override;
mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override;
+
+private:
+ void add_legacy_v6_factories();
+ void add_v8_factories();
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
index bd2f245ad15..12c6a50fa24 100644
--- a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
+++ b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
@@ -22,18 +22,13 @@ class IRoutableFactory {
protected:
IRoutableFactory() = default;
public:
- /**
- * Convenience typedefs.
- */
using UP = std::unique_ptr<IRoutableFactory>;
using SP = std::shared_ptr<IRoutableFactory>;
IRoutableFactory(const IRoutableFactory &) = delete;
IRoutableFactory & operator = (const IRoutableFactory &) = delete;
- /**
- * Virtual destructor required for inheritance.
- */
- virtual ~IRoutableFactory() { }
+
+ virtual ~IRoutableFactory() = default;
/**
* This method encodes the content of the given routable into a byte buffer that can later be decoded
@@ -45,11 +40,11 @@ public:
* @param out The buffer to write into.
* @return True if the routable could be encoded.
*/
- virtual bool encode(const mbus::Routable &obj,
- vespalib::GrowableByteBuffer &out) const = 0;
+ [[nodiscard]] virtual bool encode(const mbus::Routable &obj,
+ vespalib::GrowableByteBuffer &out) const = 0;
/**
- * This method decodes the given byte bufer to a routable.
+ * This method decodes the given byte buffer to a routable.
*
* This method is NOT exception safe. Return null to signal failure.
*
@@ -57,7 +52,7 @@ public:
* @param loadTypes The set of configured load types.
* @return The decoded routable.
*/
- virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
+ [[nodiscard]] virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
index 3d713c95da8..c47c2421fcb 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
@@ -18,7 +18,7 @@ protected:
*
* @return A document reply that corresponds to this message.
*/
- virtual DocumentReply::UP doCreateReply() const = 0;
+ [[nodiscard]] virtual DocumentReply::UP doCreateReply() const = 0;
public:
/**
@@ -37,16 +37,16 @@ public:
*
* @return The created reply.
*/
- std::unique_ptr<mbus::Reply> createReply() const;
+ [[nodiscard]] std::unique_ptr<mbus::Reply> createReply() const;
/**
* Returns the priority of this message.
*
* @return The priority.
*/
- Priority::Value getPriority() const { return _priority; };
+ [[nodiscard]] Priority::Value getPriority() const { return _priority; };
- uint8_t priority() const override { return (uint8_t)_priority; }
+ [[nodiscard]] uint8_t priority() const override { return (uint8_t)_priority; }
/**
* Sets the priority tag for this message.
@@ -55,7 +55,7 @@ public:
*/
void setPriority(Priority::Value p) { _priority = p; };
- uint32_t getApproxSize() const override;
+ [[nodiscard]] uint32_t getApproxSize() const override;
void setApproxSize(uint32_t approxSize) {
_approxSize = approxSize;
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
index 755e523c065..2c5833beb20 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
@@ -14,13 +14,12 @@ EmptyBucketsMessage::EmptyBucketsMessage(const std::vector<document::BucketId> &
{
}
-EmptyBucketsMessage::~EmptyBucketsMessage() {
-}
+EmptyBucketsMessage::~EmptyBucketsMessage() = default;
void
-EmptyBucketsMessage::setBucketIds(const std::vector<document::BucketId> &bucketIds)
+EmptyBucketsMessage::setBucketIds(std::vector<document::BucketId> bucketIds)
{
- _bucketIds = bucketIds;
+ _bucketIds = std::move(bucketIds);
}
void
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
index 7cecd1e1a2b..7078efc778c 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
@@ -27,7 +27,7 @@ public:
std::vector<document::BucketId> &getBucketIds() { return _bucketIds; }
const std::vector<document::BucketId> &getBucketIds() const { return _bucketIds; }
- void setBucketIds(const std::vector<document::BucketId> &bucketIds);
+ void setBucketIds(std::vector<document::BucketId> bucketIds);
void resize(uint32_t size);
uint32_t getType() const override;
string toString() const override { return "emptybucketsmessage"; }
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
index dbab28e3172..1ebc867156e 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
@@ -30,8 +30,8 @@ public:
*
* @param document The document to put.
*/
- PutDocumentMessage(DocumentSP document);
- ~PutDocumentMessage();
+ explicit PutDocumentMessage(DocumentSP document);
+ ~PutDocumentMessage() override;
/**
* Returns the document to put.
@@ -39,7 +39,7 @@ public:
* @return The document.
*/
const DocumentSP & getDocumentSP() const { return _document; }
- DocumentSP stealDocument() { return std::move(_document); }
+ [[nodiscard]] DocumentSP stealDocument() { return std::move(_document); }
const document::Document & getDocument() const { return *_document; }
/**
@@ -68,7 +68,7 @@ public:
string toString() const override { return "putdocumentmessage"; }
void set_create_if_non_existent(bool value) noexcept { _create_if_non_existent = value; }
- bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
+ [[nodiscard]] bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
index 87c3456d62c..4608350f8a6 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
@@ -10,7 +10,7 @@ namespace document { class BucketIdFactory; }
namespace documentapi {
/**
- * Message (VDS only) to remove an entire location for users using user or group schemes for their documents.
+ * Message to remove an entire location for users using user or group schemes for their documents.
* A location in this context is either a user id or a group name.
*/
class RemoveLocationMessage : public DocumentMessage {
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
index cd0cb5e1d3a..da3219d999f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
@@ -74,12 +74,16 @@ public:
const vdslib::Parameters& getParameters() const { return _params; }
vdslib::Parameters& getParameters() { return _params; }
void setParameters(const vdslib::Parameters& params) { _params = params; }
+ void setParameters(vdslib::Parameters&& params) noexcept { _params = std::move(params); }
uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
std::vector<document::BucketId>& getBuckets() { return _buckets; }
+ void setBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _buckets = std::move(buckets);
+ }
const document::BucketId getBucketId() const { return *_buckets.begin(); }
@@ -196,6 +200,9 @@ public:
std::vector<document::BucketId>& getFinishedBuckets() { return _finishedBuckets; }
const std::vector<document::BucketId>& getFinishedBuckets() const { return _finishedBuckets; }
+ void setFinishedBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _finishedBuckets = std::move(buckets);
+ }
const string& getErrorMessage() const { return _errorMessage; }
void setErrorMessage(const string& errorMessage) { _errorMessage = errorMessage; };
@@ -224,6 +231,7 @@ public:
vdslib::Parameters& getData() { return _data; };
const vdslib::Parameters& getData() const { return _data; };
+ void setData(vdslib::Parameters&& data) noexcept { _data = std::move(data); }
uint32_t getApproxSize() const override;
uint32_t getType() const override;
@@ -246,9 +254,9 @@ public:
Entry(const Entry& other);
Entry(const document::DocumentTypeRepo &repo, document::ByteBuffer& buf);
- int64_t getTimestamp() { return _timestamp; }
- const DocumentSP & getDocument() { return _document; }
- bool isRemoveEntry() { return _removeEntry; }
+ int64_t getTimestamp() const noexcept { return _timestamp; }
+ const DocumentSP & getDocument() const noexcept { return _document; }
+ bool isRemoveEntry() const noexcept { return _removeEntry; }
void serialize(vespalib::GrowableByteBuffer& buf) const;
private:
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
new file mode 100644
index 00000000000..399801526ba
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
@@ -0,0 +1,883 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "routable_factories_8.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/util/serializableexceptions.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/documentapi/messagebus/docapi_common.pb.h>
+#include <vespa/documentapi/messagebus/docapi_feed.pb.h>
+#include <vespa/documentapi/messagebus/docapi_visiting.pb.h>
+#include <vespa/documentapi/messagebus/docapi_inspect.pb.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+namespace documentapi::messagebus {
+
+namespace {
+
+// Protobuf codec helpers for common types
+
+void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
+ dest.set_raw_id(src.getRawId());
+}
+
+document::BucketId get_bucket_id(const protobuf::BucketId& src) {
+ return document::BucketId(src.raw_id());
+}
+
+void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) {
+ auto doc_id = src.toString();
+ dest.set_id(doc_id.data(), doc_id.size());
+}
+
+document::DocumentId get_document_id(const protobuf::DocumentId& src) {
+ return document::DocumentId(src.id());
+}
+
+// TODO DocumentAPI should be extended to use actual document::FieldSet enums instead of always passing strings.
+void set_raw_field_set(protobuf::FieldSet& dest, vespalib::stringref src) {
+ dest.set_spec(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_field_set(const protobuf::FieldSet& src) noexcept {
+ return src.spec();
+}
+
+void set_raw_selection(protobuf::DocumentSelection& dest, vespalib::stringref src) {
+ dest.set_selection(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_selection(const protobuf::DocumentSelection& src) noexcept {
+ return src.selection();
+}
+
+void set_bucket_space(protobuf::BucketSpace& dest, vespalib::stringref space_name) {
+ dest.set_name(space_name.data(), space_name.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_bucket_space(const protobuf::BucketSpace& src) noexcept {
+ return src.name();
+}
+
+void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
+ char tmp[document::GlobalId::LENGTH];
+ memcpy(tmp, src.get(), document::GlobalId::LENGTH);
+ dest.set_raw_gid(tmp, document::GlobalId::LENGTH);
+}
+
+document::GlobalId get_global_id(const protobuf::GlobalId& src) {
+ if (src.raw_gid().size() != document::GlobalId::LENGTH) {
+ throw document::DeserializeException("Unexpected serialized protobuf GlobalId size");
+ }
+ return document::GlobalId(src.raw_gid().data()); // By copy
+}
+
+documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
+ return documentapi::TestAndSetCondition(src.selection());
+}
+
+void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
+ dest.set_selection(src.getSelection().data(), src.getSelection().size());
+}
+
+std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src_doc.payload().empty()) {
+ vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size());
+ return std::make_shared<document::Document>(type_repo, doc_buf);
+ }
+ return {};
+}
+
+std::shared_ptr<document::Document> get_document_or_throw(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto doc = get_document(src_doc, type_repo);
+ if (!doc) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document object", VESPA_STRLOC);
+ }
+ return doc;
+}
+
+void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
+ vespalib::nbostream stream;
+ src_doc.serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
+}
+
+void set_update(protobuf::DocumentUpdate& dest, const document::DocumentUpdate& src) {
+ vespalib::nbostream stream;
+ src.serializeHEAD(stream);
+ dest.set_payload(stream.peek(), stream.size());
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src.payload().empty()) {
+ return document::DocumentUpdate::createHEAD(
+ type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
+ }
+ return {};
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update_or_throw(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto upd = get_update(src, type_repo);
+ if (!upd) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document update object", VESPA_STRLOC);
+ }
+ return upd;
+}
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+requires std::is_invocable_r_v<void, EncodeFn, const DocApiType&, ProtobufType&> &&
+ std::is_invocable_r_v<std::unique_ptr<DocApiType>, DecodeFn, const ProtobufType&>
+class ProtobufRoutableFactory final : public IRoutableFactory {
+ EncodeFn _encode_fn;
+ DecodeFn _decode_fn;
+public:
+ template <typename EncFn, typename DecFn>
+ ProtobufRoutableFactory(EncFn&& enc_fn, DecFn&& dec_fn) noexcept
+ : _encode_fn(std::forward<EncFn>(enc_fn)),
+ _decode_fn(std::forward<DecFn>(dec_fn))
+ {}
+ ~ProtobufRoutableFactory() override = default;
+
+ bool encode(const mbus::Routable& obj, vespalib::GrowableByteBuffer& out) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+
+ _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj);
+
+ const auto sz = proto_obj->ByteSizeLong();
+ assert(sz <= INT32_MAX);
+ auto* buf = reinterpret_cast<uint8_t*>(out.allocate(sz));
+ return proto_obj->SerializeWithCachedSizesToArray(buf);
+ }
+
+ mbus::Routable::UP decode(document::ByteBuffer& in) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+ const auto buf_size = in.getRemaining();
+ assert(buf_size <= INT_MAX);
+ bool ok = proto_obj->ParseFromArray(in.getBufferAtPos(), buf_size);
+ if (!ok) {
+ return {}; // Malformed protobuf payload
+ }
+ auto msg = _decode_fn(*proto_obj);
+ if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) {
+ msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size
+ }
+ return msg;
+ }
+};
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+auto make_codec(EncodeFn&& enc_fn, DecodeFn&& dec_fn) {
+ return std::make_shared<ProtobufRoutableFactory<DocApiType, ProtobufType, EncodeFn, DecodeFn>>(
+ std::forward<EncodeFn>(enc_fn), std::forward<DecodeFn>(dec_fn));
+}
+
+} // anon ns
+
+// ---------------------------------------------
+// Get request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_message_factory() {
+ return make_codec<GetDocumentMessage, protobuf::GetDocumentRequest>(
+ [](const GetDocumentMessage& src, protobuf::GetDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ },
+ [](const protobuf::GetDocumentRequest& src) {
+ return std::make_unique<GetDocumentMessage>(get_document_id(src.document_id()), get_raw_field_set(src.field_set()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<GetDocumentReply, protobuf::GetDocumentResponse>(
+ [](const GetDocumentReply& src, protobuf::GetDocumentResponse& dest) {
+ if (src.hasDocument()) {
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_last_modified(src.getLastModified());
+ },
+ [type_repo = std::move(repo)](const protobuf::GetDocumentResponse& src) {
+ auto msg = std::make_unique<GetDocumentReply>();
+ if (src.has_document()) {
+ auto doc = get_document(src.document(), *type_repo);
+ doc->setLastModified(static_cast<int64_t>(src.last_modified()));
+ msg->setDocument(std::move(doc));
+ }
+ msg->setLastModified(src.last_modified());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Put request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<PutDocumentMessage, protobuf::PutDocumentRequest>(
+ [](const PutDocumentMessage& src, protobuf::PutDocumentRequest& dest) {
+ dest.set_force_assign_timestamp(src.getTimestamp());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ if (src.getDocumentSP()) { // This should always be present in practice
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_create_if_missing(src.get_create_if_non_existent());
+ },
+ [type_repo = std::move(repo)](const protobuf::PutDocumentRequest& src) {
+ auto msg = std::make_unique<PutDocumentMessage>();
+ msg->setDocument(get_document_or_throw(src.document(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setTimestamp(src.force_assign_timestamp());
+ msg->set_create_if_non_existent(src.create_if_missing());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_reply_factory() {
+ return make_codec<WriteDocumentReply, protobuf::PutDocumentResponse>(
+ [](const WriteDocumentReply& src, protobuf::PutDocumentResponse& dest) {
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::PutDocumentResponse& src) {
+ auto msg = std::make_unique<WriteDocumentReply>(DocumentProtocol::REPLY_PUTDOCUMENT);
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Update request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<UpdateDocumentMessage, protobuf::UpdateDocumentRequest>(
+ [](const UpdateDocumentMessage& src, protobuf::UpdateDocumentRequest& dest) {
+ set_update(*dest.mutable_update(), src.getDocumentUpdate());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ dest.set_expected_old_timestamp(src.getOldTimestamp());
+ dest.set_force_assign_timestamp(src.getNewTimestamp());
+ },
+ [type_repo = std::move(repo)](const protobuf::UpdateDocumentRequest& src) {
+ auto msg = std::make_unique<UpdateDocumentMessage>();
+ msg->setDocumentUpdate(get_update_or_throw(src.update(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setOldTimestamp(src.expected_old_timestamp());
+ msg->setNewTimestamp(src.force_assign_timestamp());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_reply_factory() {
+ return make_codec<UpdateDocumentReply, protobuf::UpdateDocumentResponse>(
+ [](const UpdateDocumentReply& src, protobuf::UpdateDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::UpdateDocumentResponse& src) {
+ auto msg = std::make_unique<UpdateDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Remove request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_message_factory() {
+ return make_codec<RemoveDocumentMessage, protobuf::RemoveDocumentRequest>(
+ [](const RemoveDocumentMessage& src, protobuf::RemoveDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ },
+ [](const protobuf::RemoveDocumentRequest& src) {
+ auto msg = std::make_unique<RemoveDocumentMessage>();
+ msg->setDocumentId(get_document_id(src.document_id()));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_reply_factory() {
+ return make_codec<RemoveDocumentReply, protobuf::RemoveDocumentResponse>(
+ [](const RemoveDocumentReply& src, protobuf::RemoveDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::RemoveDocumentResponse& src) {
+ auto msg = std::make_unique<RemoveDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// RemoveLocation request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<RemoveLocationMessage, protobuf::RemoveLocationRequest>(
+ [](const RemoveLocationMessage& src, protobuf::RemoveLocationRequest& dest) {
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [type_repo = std::move(repo)](const protobuf::RemoveLocationRequest& src) {
+ document::BucketIdFactory factory;
+ document::select::Parser parser(*type_repo, factory);
+ auto msg = std::make_unique<RemoveLocationMessage>(factory, parser, get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_location_reply_factory() {
+ return make_codec<DocumentReply, protobuf::RemoveLocationResponse>(
+ []([[maybe_unused]] const DocumentReply& src, [[maybe_unused]] protobuf::RemoveLocationResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::RemoveLocationResponse& src) {
+ // The lack of 1-1 type mapping is pretty awkward :I
+ return std::make_unique<DocumentReply>(DocumentProtocol::REPLY_REMOVELOCATION);
+ }
+ );
+}
+
+// ---------------------------------------------
+// CreateVisitor request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_bucket_id_vector(::google::protobuf::RepeatedPtrField<protobuf::BucketId>& dest,
+ const std::vector<document::BucketId>& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& bucket_id : src) {
+ set_bucket_id(*dest.Add(), bucket_id);
+ }
+}
+
+std::vector<document::BucketId> get_bucket_id_vector(const ::google::protobuf::RepeatedPtrField<protobuf::BucketId>& src) {
+ std::vector<document::BucketId> ids;
+ ids.reserve(src.size());
+ for (const auto& proto_bucket : src) {
+ ids.emplace_back(proto_bucket.raw_id());
+ }
+ return ids;
+}
+
+void set_visitor_params(::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& dest,
+ const vdslib::Parameters& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& kv : src) {
+ auto* proto_kv = dest.Add();
+ proto_kv->set_key(kv.first.data(), kv.first.size());
+ proto_kv->set_value(kv.second.data(), kv.second.size());
+ }
+}
+
+vdslib::Parameters get_visitor_params(const ::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& src) {
+ vdslib::Parameters params;
+ for (const auto& proto_kv : src) {
+ params.set(proto_kv.key(), proto_kv.value());
+ }
+ return params;
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_message_factory() {
+ return make_codec<CreateVisitorMessage, protobuf::CreateVisitorRequest>(
+ [](const CreateVisitorMessage& src, protobuf::CreateVisitorRequest& dest) {
+ dest.set_visitor_library_name(src.getLibraryName().data(), src.getLibraryName().size());
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ dest.set_control_destination(src.getControlDestination().data(), src.getControlDestination().size());
+ dest.set_data_destination(src.getDataDestination().data(), src.getDataDestination().size());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ dest.set_max_pending_reply_count(src.getMaximumPendingReplyCount());
+
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ set_bucket_id_vector(*dest.mutable_buckets(), src.getBuckets());
+
+ dest.set_from_timestamp(src.getFromTimestamp());
+ dest.set_to_timestamp(src.getToTimestamp());
+ dest.set_visit_tombstones(src.visitRemoves());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ dest.set_visit_inconsistent_buckets(src.visitInconsistentBuckets());
+ dest.set_max_buckets_per_visitor(src.getMaxBucketsPerVisitor());
+
+ set_visitor_params(*dest.mutable_parameters(), src.getParameters());
+ },
+ [](const protobuf::CreateVisitorRequest& src) {
+ auto msg = std::make_unique<CreateVisitorMessage>();
+ msg->setLibraryName(src.visitor_library_name());
+ msg->setInstanceId(src.instance_id());
+ msg->setControlDestination(src.control_destination());
+ msg->setDataDestination(src.data_destination());
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setMaximumPendingReplyCount(src.max_pending_reply_count());
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ msg->setBuckets(get_bucket_id_vector(src.buckets()));
+ msg->setFromTimestamp(src.from_timestamp());
+ msg->setToTimestamp(src.to_timestamp());
+ msg->setVisitRemoves(src.visit_tombstones());
+ msg->setFieldSet(get_raw_field_set(src.field_set()));
+ msg->setVisitInconsistentBuckets(src.visit_inconsistent_buckets());
+ msg->setMaxBucketsPerVisitor(src.max_buckets_per_visitor());
+ msg->setVisitorDispatcherVersion(50); // Hard-coded; same as for v6 serialization
+ msg->setParameters(get_visitor_params(src.parameters()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_reply_factory() {
+ return make_codec<CreateVisitorReply, protobuf::CreateVisitorResponse>(
+ [](const CreateVisitorReply& src, protobuf::CreateVisitorResponse& dest) {
+ set_bucket_id(*dest.mutable_last_bucket(), src.getLastBucket());
+ const auto& vs = src.getVisitorStatistics();
+ auto* stats = dest.mutable_statistics();
+ stats->set_buckets_visited(vs.getBucketsVisited());
+ stats->set_documents_visited(vs.getDocumentsVisited());
+ stats->set_bytes_visited(vs.getBytesVisited());
+ stats->set_documents_returned(vs.getDocumentsReturned());
+ stats->set_bytes_returned(vs.getBytesReturned());
+ },
+ [](const protobuf::CreateVisitorResponse& src) {
+ auto reply = std::make_unique<CreateVisitorReply>(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply->setLastBucket(get_bucket_id(src.last_bucket()));
+ const auto& vs = src.statistics();
+ vdslib::VisitorStatistics stats;
+ stats.setBucketsVisited(vs.buckets_visited());
+ stats.setDocumentsVisited(vs.documents_visited());
+ stats.setBytesVisited(vs.bytes_visited());
+ stats.setDocumentsReturned(vs.documents_returned());
+ stats.setBytesReturned(vs.bytes_returned());
+ reply->setVisitorStatistics(stats);
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DestroyVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_message_factory() {
+ return make_codec<DestroyVisitorMessage, protobuf::DestroyVisitorRequest>(
+ [](const DestroyVisitorMessage& src, protobuf::DestroyVisitorRequest& dest) {
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ },
+ [](const protobuf::DestroyVisitorRequest& src) {
+ auto msg = std::make_unique<DestroyVisitorMessage>();
+ msg->setInstanceId(src.instance_id());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DestroyVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DestroyVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DestroyVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DESTROYVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// MapVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_message_factory() {
+ return make_codec<MapVisitorMessage, protobuf::MapVisitorRequest>(
+ [](const MapVisitorMessage& src, protobuf::MapVisitorRequest& dest) {
+ set_visitor_params(*dest.mutable_data(), src.getData());
+ },
+ [](const protobuf::MapVisitorRequest& src) {
+ auto msg = std::make_unique<MapVisitorMessage>();
+ msg->setData(get_visitor_params(src.data()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::MapVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::MapVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::MapVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_MAPVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// QueryResult request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_search_result(protobuf::SearchResult& dest, const vdslib::SearchResult& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+void set_document_summary(protobuf::DocumentSummary& dest, const vdslib::DocumentSummary& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+document::ByteBuffer wrap_as_buffer(std::string_view buf) {
+ assert(buf.size() <= UINT32_MAX);
+ return {buf.data(), static_cast<uint32_t>(buf.size())};
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_message_factory() {
+ return make_codec<QueryResultMessage, protobuf::QueryResultRequest>(
+ [](const QueryResultMessage& src, protobuf::QueryResultRequest& dest) {
+ set_search_result(*dest.mutable_search_result(), src.getSearchResult());
+ set_document_summary(*dest.mutable_document_summary(), src.getDocumentSummary());
+ },
+ [](const protobuf::QueryResultRequest& src) {
+ auto msg = std::make_unique<QueryResultMessage>();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!src.has_search_result() || !src.has_document_summary()) {
+ throw document::DeserializeException("Query result does not have all required fields set", VESPA_STRLOC);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.search_result().payload()); // Must be lvalue
+ msg->getSearchResult().deserialize(buf_view);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.document_summary().payload()); // Also lvalue
+ msg->getDocumentSummary().deserialize(buf_view);
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_reply_factory() {
+ return make_codec<VisitorReply, protobuf::QueryResultResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::QueryResultResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::QueryResultResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_QUERYRESULT);
+ }
+ );
+}
+
+// ---------------------------------------------
+// VisitorInfo request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_message_factory() {
+ return make_codec<VisitorInfoMessage, protobuf::VisitorInfoRequest>(
+ [](const VisitorInfoMessage& src, protobuf::VisitorInfoRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_finished_buckets(), src.getFinishedBuckets());
+ dest.set_error_message(src.getErrorMessage());
+ },
+ [](const protobuf::VisitorInfoRequest& src) {
+ auto msg = std::make_unique<VisitorInfoMessage>();
+ msg->setFinishedBuckets(get_bucket_id_vector(src.finished_buckets()));
+ msg->setErrorMessage(src.error_message());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_reply_factory() {
+ return make_codec<VisitorReply, protobuf::VisitorInfoResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::VisitorInfoResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::VisitorInfoResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_VISITORINFO);
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentList request and response
+// TODO deprecate
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<DocumentListMessage, protobuf::DocumentListRequest>(
+ [](const DocumentListMessage& src, protobuf::DocumentListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ for (const auto& doc : src.getDocuments()) {
+ auto* proto_entry = dest.add_entries();
+ proto_entry->set_timestamp(doc.getTimestamp());
+ proto_entry->set_is_tombstone(doc.isRemoveEntry());
+ set_document(*proto_entry->mutable_document(), *doc.getDocument());
+ }
+ },
+ [type_repo = std::move(repo)](const protobuf::DocumentListRequest& src) {
+ auto msg = std::make_unique<DocumentListMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ for (const auto& proto_entry : src.entries()) {
+ auto doc = get_document_or_throw(proto_entry.document(), *type_repo);
+ msg->getDocuments().emplace_back(proto_entry.timestamp(), std::move(doc), proto_entry.is_tombstone());
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_list_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DocumentListResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DocumentListResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentListResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTLIST);
+ }
+ );
+}
+
+// ---------------------------------------------
+// EmptyBuckets request and response
+// TODO this should be deprecated
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_message_factory() {
+ return make_codec<EmptyBucketsMessage, protobuf::EmptyBucketsRequest>(
+ [](const EmptyBucketsMessage& src, protobuf::EmptyBucketsRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_bucket_ids(), src.getBucketIds());
+ },
+ [](const protobuf::EmptyBucketsRequest& src) {
+ auto msg = std::make_unique<EmptyBucketsMessage>();
+ msg->setBucketIds(get_bucket_id_vector(src.bucket_ids()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_reply_factory() {
+ return make_codec<VisitorReply, protobuf::EmptyBucketsResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::EmptyBucketsResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::EmptyBucketsResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_EMPTYBUCKETS); // ugh
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketList request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_message_factory() {
+ return make_codec<GetBucketListMessage, protobuf::GetBucketListRequest>(
+ [](const GetBucketListMessage& src, protobuf::GetBucketListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::GetBucketListRequest& src) {
+ auto msg = std::make_unique<GetBucketListMessage>(get_bucket_id(src.bucket_id()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_reply_factory() {
+ return make_codec<GetBucketListReply, protobuf::GetBucketListResponse>(
+ [](const GetBucketListReply& src, protobuf::GetBucketListResponse& dest) {
+ auto* proto_info = dest.mutable_bucket_info();
+ assert(src.getBuckets().size() <= INT_MAX);
+ proto_info->Reserve(static_cast<int>(src.getBuckets().size()));
+ for (const auto& info : src.getBuckets()) {
+ auto* entry = proto_info->Add();
+ set_bucket_id(*entry->mutable_bucket_id(), info._bucket);
+ entry->set_info(info._bucketInformation.data(), info._bucketInformation.size());
+ }
+ },
+ [](const protobuf::GetBucketListResponse& src) {
+ auto reply = std::make_unique<GetBucketListReply>();
+ reply->getBuckets().reserve(src.bucket_info_size());
+ for (const auto& proto_info : src.bucket_info()) {
+ GetBucketListReply::BucketInfo info;
+ info._bucket = get_bucket_id(proto_info.bucket_id());
+ info._bucketInformation = proto_info.info();
+ reply->getBuckets().emplace_back(std::move(info));
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketState request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_message_factory() {
+ return make_codec<GetBucketStateMessage, protobuf::GetBucketStateRequest>(
+ [](const GetBucketStateMessage& src, protobuf::GetBucketStateRequest& dest) {
+ // FIXME misses bucket space, but does not seem to be in use?
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ },
+ [](const protobuf::GetBucketStateRequest& src) {
+ return std::make_unique<GetBucketStateMessage>(get_bucket_id(src.bucket_id()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_reply_factory() {
+ return make_codec<GetBucketStateReply, protobuf::GetBucketStateResponse>(
+ [](const GetBucketStateReply& src, protobuf::GetBucketStateResponse& dest) {
+ assert(src.getBucketState().size() <= INT_MAX);
+ auto* proto_states = dest.mutable_states();
+ proto_states->Reserve(static_cast<int>(src.getBucketState().size()));
+ for (const auto& state : src.getBucketState()) {
+ auto* ps = proto_states->Add();
+ if (state.getDocumentId()) {
+ set_document_id(*ps->mutable_document_id(), *state.getDocumentId());
+ } else {
+ set_global_id(*ps->mutable_global_id(), state.getGlobalId());
+ }
+ ps->set_timestamp(state.getTimestamp());
+ ps->set_is_tombstone(state.isRemoveEntry());
+ }
+ },
+ [](const protobuf::GetBucketStateResponse& src) {
+ auto reply = std::make_unique<GetBucketStateReply>();
+ reply->getBucketState().reserve(src.states_size());
+ for (const auto& proto_state : src.states()) {
+ if (proto_state.has_document_id()) {
+ reply->getBucketState().emplace_back(get_document_id(proto_state.document_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ } else {
+ reply->getBucketState().emplace_back(get_global_id(proto_state.global_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ }
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// StatBucket request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_message_factory() {
+ return make_codec<StatBucketMessage, protobuf::StatBucketRequest>(
+ [](const StatBucketMessage& src, protobuf::StatBucketRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::StatBucketRequest& src) {
+ auto msg = std::make_unique<StatBucketMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_reply_factory() {
+ return make_codec<StatBucketReply, protobuf::StatBucketResponse>(
+ [](const StatBucketReply& src, protobuf::StatBucketResponse& dest) {
+ dest.set_results(src.getResults());
+ },
+ [](const protobuf::StatBucketResponse& src) {
+ auto reply = std::make_unique<StatBucketReply>();
+ reply->setResults(src.results());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// WrongDistribution response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::wrong_distribution_reply_factory() {
+ return make_codec<WrongDistributionReply, protobuf::WrongDistributionResponse>(
+ [](const WrongDistributionReply& src, protobuf::WrongDistributionResponse& dest) {
+ dest.mutable_cluster_state()->set_state_string(src.getSystemState());
+ },
+ [](const protobuf::WrongDistributionResponse& src) {
+ auto reply = std::make_unique<WrongDistributionReply>();
+ reply->setSystemState(src.cluster_state().state_string());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentIgnored response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_ignored_reply_factory() {
+ return make_codec<DocumentIgnoredReply, protobuf::DocumentIgnoredResponse>(
+ []([[maybe_unused]] const DocumentIgnoredReply& src, [[maybe_unused]] protobuf::DocumentIgnoredResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentIgnoredResponse& src) {
+ return std::make_unique<DocumentIgnoredReply>();
+ }
+ );
+}
+
+} // documentapi::messagebus
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
new file mode 100644
index 00000000000..76da2f7dc9f
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
@@ -0,0 +1,72 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "iroutablefactory.h"
+
+namespace document { class DocumentTypeRepo; }
+
+namespace documentapi::messagebus {
+
+class RoutableFactories80 {
+public:
+ RoutableFactories80() = delete;
+
+ // CRUD messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_reply_factory();
+
+ // Visitor-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_reply_factory();
+
+ // Inspection-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_reply_factory();
+
+ // Polymorphic reply messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> wrong_distribution_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_ignored_reply_factory();
+};
+
+}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
index 54774f142ba..3e1ae07f7ca 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
@@ -14,11 +14,13 @@ RoutableRepository::VersionMap::VersionMap() :
_factoryVersions()
{ }
+RoutableRepository::VersionMap::~VersionMap() = default;
+
bool
RoutableRepository::VersionMap::putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory)
{
bool ret = _factoryVersions.find(version) != _factoryVersions.end();
- _factoryVersions[version] = factory;
+ _factoryVersions[version] = std::move(factory);
return ret;
}
@@ -28,14 +30,14 @@ RoutableRepository::VersionMap::getFactory(const vespalib::Version &version) con
{
const vespalib::VersionSpecification versionSpec{version.getMajor(), version.getMinor(), version.getMicro()};
- std::vector< std::pair<vespalib::VersionSpecification, IRoutableFactory::SP> > candidates;
- for (auto & entry : _factoryVersions) {
+ std::vector<std::pair<vespalib::VersionSpecification, IRoutableFactory::SP>> candidates;
+ for (const auto & entry : _factoryVersions) {
if (entry.first.compareTo(versionSpec) <= 0) {
- candidates.push_back(std::make_pair(entry.first, entry.second));
+ candidates.emplace_back(entry.first, entry.second);
}
}
if (candidates.empty()) {
- return IRoutableFactory::SP();
+ return {};
}
return std::max_element(candidates.begin(), candidates.end(),
@@ -54,7 +56,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
{
if (data.size() == 0) {
LOG(error, "Received empty byte array for deserialization.");
- return mbus::Routable::UP();
+ return {};
}
document::ByteBuffer in(data.data(), data.size());
@@ -64,7 +66,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
if (!factory) {
LOG(error, "No routable factory found for routable type %d (version %s).",
type, version.toString().c_str());
- return mbus::Routable::UP();
+ return {};
}
mbus::Routable::UP ret = factory->decode(in);
if (!ret) {
@@ -74,7 +76,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
std::ostringstream ost;
document::StringUtil::printAsHex(ost, data.data(), data.size());
LOG(error, "%s", ost.str().c_str());
- return mbus::Routable::UP();
+ return {};
}
return ret;
}
@@ -107,7 +109,7 @@ RoutableRepository::putFactory(const vespalib::VersionSpecification &version,
uint32_t type, IRoutableFactory::SP factory)
{
std::lock_guard guard(_lock);
- if (_factoryTypes[type].putFactory(version, factory)) {
+ if (_factoryTypes[type].putFactory(version, std::move(factory))) {
_cache.clear();
}
}
@@ -117,17 +119,17 @@ RoutableRepository::getFactory(const vespalib::Version &version, uint32_t type)
{
std::lock_guard guard(_lock);
CacheKey cacheKey(version, type);
- FactoryCache::const_iterator cit = _cache.find(cacheKey);
+ auto cit = _cache.find(cacheKey);
if (cit != _cache.end()) {
return cit->second;
}
- TypeMap::const_iterator vit = _factoryTypes.find(type);
+ auto vit = _factoryTypes.find(type);
if (vit == _factoryTypes.end()) {
- return IRoutableFactory::SP();
+ return {};
}
- IRoutableFactory::SP factory = vit->second.getFactory(version);
+ auto factory = vit->second.getFactory(version);
if (!factory) {
- return IRoutableFactory::SP();
+ return {};
}
_cache[cacheKey] = factory;
return factory;
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
index 5060d1b3817..62b5103d8f5 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
@@ -27,8 +27,9 @@ private:
public:
VersionMap();
- bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
- IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
+ ~VersionMap();
+ [[nodiscard]] bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
+ [[nodiscard]] IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
};
using CacheKey = std::pair<vespalib::Version, uint32_t>;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 0373609e806..3ab62542ace 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,7 +2,6 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.ListenFailedException;
@@ -301,15 +300,34 @@ public class RPCNetwork implements Network, MethodHandler {
return false;
}
+ private static Version deriveSupportedProtocolVersion() {
+ // This is a very leaky abstraction, but since MessageBus only exchanges versions
+ // (and not a set of supported protocols), we have to do this workaround.
+ // Disallow-version MUST be lower than that used as a protocol lower bound in
+ // DocumentProtocol.java and the exact same as that used in C++ for the same purposes.
+ // ... Or else!
+ // TODO remove this glorious hack once protobuf protocol is enabled by default
+ var maybeEnvVal = System.getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF");
+ if ("true".equals(maybeEnvVal) || "yes".equals(maybeEnvVal)) {
+ return new Version(8, 304); // _Allows_ new protobuf protocol
+ }
+ return new Version(8, 303); // _Disallows_ new protobuf protocol
+ }
+
+ private static final Version REPORTED_VERSION = deriveSupportedProtocolVersion();
+
/**
- * Returns the version of this network. This gets called when the "mbus.getVersion" method is invoked on this
- * network, and is separated into its own function so that unit tests can override it to simulate other versions
- * than current.
+ * Returns the (protocol) version of this network. This gets called when the "mbus.getVersion" method is invoked
+ * on this network, and is separated into its own function so that unit tests can override it to simulate other
+ * versions than current.
+ *
+ * Note that this version reflects the highest supported <em>protocol</em> version, and is not necessarily
+ * 1-1 with the actual Vespa release version of the underlying binary.
*
* @return the version to claim to be
*/
protected Version getVersion() {
- return Vtag.currentVersion;
+ return REPORTED_VERSION;
}
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index cacd18430a7..f626e2c325b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -18,6 +18,8 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <cstdlib>
+#include <string_view>
#include <thread>
#include <vespa/log/log.h>
@@ -25,6 +27,7 @@ LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
using namespace std::chrono_literals;
+using namespace std::string_view_literals;
namespace mbus {
@@ -148,10 +151,26 @@ RPCNetwork::flushTargetPool()
_targetPool->flushTargets(true);
}
+namespace {
+
+[[nodiscard]] vespalib::Version derive_supported_protocol_version() {
+ // TODO remove this hilariously leaky abstraction once protobuf protocol is the default :D
+ // Disallow-version MUST be lower than that used as a protocol lower bound in documentprotocol.cpp
+ // and the exact same as that used in Java for the same purposes. Or else!
+ const char* maybe_env_val = getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF");
+ if (maybe_env_val && (("true"sv == maybe_env_val) || ("yes"sv == maybe_env_val))) {
+ return {8, 304}; // _Allows_ new protobuf protocol
+ }
+ return {8, 303}; // _Disallows_ new protobuf protocol
+}
+
+}
+
const vespalib::Version &
RPCNetwork::getVersion() const
{
- return vespalib::Vtag::currentVersion;
+ static vespalib::Version reported_version = derive_supported_protocol_version();
+ return reported_version;
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 05ccaecb2c5..40590d4545f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -98,11 +98,15 @@ private:
protected:
/**
- * Returns the version of this network. This gets called when the
+ * Returns the (protocol) version of this network. This gets called when the
* "mbus.getVersion" method is invoked on this network, and is separated
* into its own function so that unit tests can override it to simulate
* other versions than current.
*
+ * Note that this version reflects the highest supported protocol version, and
+ * is not necessarily 1-1 with the actual Vespa release version of the
+ * underlying binary.
+ *
* @return The version to claim to be.
*/
virtual const vespalib::Version &getVersion() const;
diff --git a/vdslib/src/vespa/vdslib/container/parameters.cpp b/vdslib/src/vespa/vdslib/container/parameters.cpp
index 236b4970396..b5fbe96566d 100644
--- a/vdslib/src/vespa/vdslib/container/parameters.cpp
+++ b/vdslib/src/vespa/vdslib/container/parameters.cpp
@@ -19,6 +19,12 @@ Parameters::Parameters(document::ByteBuffer& buffer)
deserialize(buffer);
}
+Parameters::Parameters(const Parameters&) = default;
+Parameters& Parameters::operator=(const Parameters&) = default;
+
+Parameters::Parameters(Parameters&&) noexcept = default;
+Parameters& Parameters::operator=(Parameters&&) noexcept = default;
+
Parameters::~Parameters() = default;
size_t Parameters::getSerializedSize() const
diff --git a/vdslib/src/vespa/vdslib/container/parameters.h b/vdslib/src/vespa/vdslib/container/parameters.h
index d28e2cd9890..60ae3028719 100644
--- a/vdslib/src/vespa/vdslib/container/parameters.h
+++ b/vdslib/src/vespa/vdslib/container/parameters.h
@@ -46,6 +46,11 @@ public:
explicit Parameters(document::ByteBuffer& buffer);
~Parameters() override;
+ Parameters(const Parameters&);
+ Parameters& operator=(const Parameters&);
+ Parameters(Parameters&&) noexcept;
+ Parameters& operator=(Parameters&&) noexcept;
+
bool operator==(const Parameters &other) const;
size_t getSerializedSize() const;