summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-02-01 15:25:45 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-02-16 13:42:49 +0000
commite0195ce27f47717ad5ba59ea59ab027de31d703f (patch)
tree44a13aa38fcbf95b23df91e8051c1d2b8bb2f688
parent42b1512d4913778dde06ebe0b1a08257ead3155a (diff)
Add new Protobuf-based MessageBus DocumentAPI protocol
This adds an entirely new implementation of the internal MessageBus DocumentAPI protocol, which shall be functionally 1-to-1 compatible with the existing legacy protocol. New protobuf schemas have been added to the top-level documentapi module, which are separated into different domains of responsibility: * CRUD messages * Visiting messages * Data inspection messages As well as a schema for shared, common message types. Both C++ and Java protocol implementations separate serialization and deserialization into a codec abstraction per message type, which hides the boilerplate required for Protobuf buffer management. The Java version is a tad more verbose due to generics type-erasure. This protocol does _not_ currently support lazy (de-)serialization in Java, as the existing mechanisms for doing so are inherently tied to the legacy protocol version. Performance tests will decide if we need to introduce such functionality to the new protocol version. To avoid having the new protocol go live in production, this commit changes the semantics of how MessageBus version reporting works (at least for the near future); instead of reporting the current Vespa _release_ version, it reports the highest supported _protocol_ version. This lets us conditionally enable the new protocol by reporting a MessageBus version greater than or equal to the protocol version _iff_ the protocol should be active. The new protocol is disabled by default. Other changes: * Protocol tests have been moved up one package directory level to be aligned with the actual package of the classes they test. This allows for using package-protected constructors in the serialization tests. * `DocumentDeserializer` now exposes the underlying document type repo/manager. This is done to detangle `Document`/`DocumentUpdate` deserialization from the underlying wire buffer management. * `RemoveLocationMessage` at long last contains a bucket space, which was forgotten when we initially added this concept to the other messages, and where the pain of adding it in later was too big (not so anymore!). Unit tests for both C++ and Java have been hoisted from the legacy test suite, cleaned up and extended with additional cases. The C++ tests use the old unit test kit and should receive a good follow-up washing and GTest-rewrite. **Important**: due to how MessageBus protocol versioning works, the final protocol version is _not_ yet decided, as setting it requires syncing against our build systems. A follow-up commit will assign the final version as well as include all required binary test files.
-rw-r--r--container-dev/pom.xml4
-rw-r--r--container-documentapi/pom.xml7
-rw-r--r--document/abi-spec.json4
-rw-r--r--document/src/main/java/com/yahoo/document/Document.java2
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java3
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java2
-rw-r--r--document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java5
-rw-r--r--documentapi-dependencies/pom.xml4
-rw-r--r--documentapi/abi-spec.json6
-rw-r--r--documentapi/pom.xml4
-rw-r--r--documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java5
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java48
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java14
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java931
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java5
-rw-r--r--documentapi/src/protobuf/docapi_common.proto50
-rw-r--r--documentapi/src/protobuf/docapi_feed.proto71
-rw-r--r--documentapi/src/protobuf/docapi_inspect.proto48
-rw-r--r--documentapi/src/protobuf/docapi_visiting.proto115
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java)31
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java729
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java)5
-rw-r--r--documentapi/src/tests/messages/CMakeLists.txt10
-rw-r--r--documentapi/src/tests/messages/messages60test.cpp19
-rw-r--r--documentapi/src/tests/messages/messages60test.h6
-rw-r--r--documentapi/src/tests/messages/messages80test.cpp908
-rw-r--r--documentapi/src/tests/messages/testbase.cpp39
-rw-r--r--documentapi/src/tests/messages/testbase.h4
-rw-r--r--documentapi/src/vespa/documentapi/CMakeLists.txt2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/.gitignore2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt23
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp134
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.h4
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h17
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h10
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp7
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h8
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/visitor.h14
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp883
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h72
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp30
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.h5
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java28
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp21
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h6
-rw-r--r--vdslib/src/vespa/vdslib/container/parameters.cpp6
-rw-r--r--vdslib/src/vespa/vdslib/container/parameters.h5
50 files changed, 4181 insertions, 181 deletions
diff --git a/container-dev/pom.xml b/container-dev/pom.xml
index 8c45a124e26..c3ceb0e8d07 100644
--- a/container-dev/pom.xml
+++ b/container-dev/pom.xml
@@ -104,6 +104,10 @@
<version>${project.version}</version>
<exclusions>
<exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.yahoo.vespa</groupId>
<artifactId>http-utils</artifactId>
</exclusion>
diff --git a/container-documentapi/pom.xml b/container-documentapi/pom.xml
index fbccc1a5184..7e6bd749b4a 100644
--- a/container-documentapi/pom.xml
+++ b/container-documentapi/pom.xml
@@ -37,6 +37,13 @@
<scope>provided</scope>
</dependency>
+ <!-- documentapi needs protobuf runtime, and it's not provided from the container -->
+ <!-- TODO: Remove this when we have a better solution for protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/document/abi-spec.json b/document/abi-spec.json
index 899c107a242..ca06e2547d7 100644
--- a/document/abi-spec.json
+++ b/document/abi-spec.json
@@ -2724,7 +2724,8 @@
"abstract"
],
"methods" : [
- "public abstract com.yahoo.io.GrowableByteBuffer getBuf()"
+ "public abstract com.yahoo.io.GrowableByteBuffer getBuf()",
+ "public abstract com.yahoo.document.DocumentTypeManager getTypeRepo()"
],
"fields" : [ ]
},
@@ -3015,6 +3016,7 @@
],
"methods" : [
"public final com.yahoo.document.DocumentTypeManager getDocumentTypeManager()",
+ "public com.yahoo.document.DocumentTypeManager getTypeRepo()",
"public void read(com.yahoo.document.Document)",
"public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.Document)",
"public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.datatypes.FieldValue)",
diff --git a/document/src/main/java/com/yahoo/document/Document.java b/document/src/main/java/com/yahoo/document/Document.java
index 4bb93426994..294750f40f3 100644
--- a/document/src/main/java/com/yahoo/document/Document.java
+++ b/document/src/main/java/com/yahoo/document/Document.java
@@ -123,6 +123,7 @@ public class Document extends StructuredFieldValue {
}
public int getSerializedSize() throws SerializationException {
+ // TODO shouldn't this be createHead()?
DocumentSerializer data = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f));
data.write(this);
return data.getBuf().position();
@@ -135,6 +136,7 @@ public class Document extends StructuredFieldValue {
public final int getApproxSize() { return 4096; }
public void serialize(OutputStream out) throws SerializationException {
+ // TODO shouldn't this be createHead()?
DocumentSerializer writer = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f));
writer.write(this);
GrowableByteBuffer data = writer.getBuf();
diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
index 59849d88c28..f6e3a75c7b2 100644
--- a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
+++ b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.serialization;
+import com.yahoo.document.DocumentTypeManager;
import com.yahoo.io.GrowableByteBuffer;
/**
@@ -17,5 +18,7 @@ public interface DocumentDeserializer extends DocumentReader, DocumentUpdateRead
*/
GrowableByteBuffer getBuf();
+ DocumentTypeManager getTypeRepo();
+
}
diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
index 9d52bc4aead..faaad95d5e1 100644
--- a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
+++ b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java
@@ -15,6 +15,6 @@ public interface DocumentSerializer extends DocumentWriter, SpanNodeWriter, Anno
/**
* Returns the underlying buffer used for serialization.
*/
- public GrowableByteBuffer getBuf();
+ GrowableByteBuffer getBuf();
}
diff --git a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
index 5f24a2d8f60..b508f0d2c7c 100644
--- a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
+++ b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java
@@ -91,6 +91,11 @@ public class VespaDocumentDeserializer6 extends BufferSerializer implements Docu
final public DocumentTypeManager getDocumentTypeManager() { return manager; }
+ @Override
+ public DocumentTypeManager getTypeRepo() {
+ return manager;
+ }
+
public void read(Document document) {
read(null, document);
}
diff --git a/documentapi-dependencies/pom.xml b/documentapi-dependencies/pom.xml
index 7fd73017d8b..99b4616ffe7 100644
--- a/documentapi-dependencies/pom.xml
+++ b/documentapi-dependencies/pom.xml
@@ -16,6 +16,10 @@
<dependencies>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>annotations</artifactId>
<version>${project.version}</version>
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 0252da8a4d1..d00c89ae737 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -2078,6 +2078,7 @@
"public void <init>(com.yahoo.document.DocumentId, long, boolean)",
"public void <init>(com.yahoo.document.GlobalId, long, boolean)",
"public void <init>(com.yahoo.vespa.objects.Deserializer)",
+ "public boolean hasDocId()",
"public com.yahoo.document.DocumentId getDocId()",
"public com.yahoo.document.GlobalId getGid()",
"public long getTimestamp()",
@@ -2509,11 +2510,13 @@
"public"
],
"methods" : [
+ "public void <init>(java.lang.String, java.lang.String)",
"public void <init>(java.lang.String)",
"public java.lang.String getDocumentSelection()",
"public com.yahoo.documentapi.messagebus.protocol.DocumentReply createReply()",
"public int getType()",
- "public com.yahoo.document.BucketId getBucketId()"
+ "public com.yahoo.document.BucketId getBucketId()",
+ "public java.lang.String getBucketSpace()"
],
"fields" : [ ]
},
@@ -3072,6 +3075,7 @@
],
"methods" : [
"public void <init>()",
+ "public void <init>(java.lang.String)",
"public java.lang.String getResults()",
"public void setResults(java.lang.String)"
],
diff --git a/documentapi/pom.xml b/documentapi/pom.xml
index 4b026d7f359..9188b803ca7 100644
--- a/documentapi/pom.xml
+++ b/documentapi/pom.xml
@@ -47,6 +47,10 @@
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
<groupId>com.helger.maven</groupId>
<artifactId>ph-javacc-maven-plugin</artifactId>
<executions>
diff --git a/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
new file mode 100644
index 00000000000..4331a3d461e
--- /dev/null
+++ b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java
@@ -0,0 +1,5 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package ai.vespa.documentapi.protobuf;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index ec49a0c570f..061d9e9afb9 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -256,9 +256,9 @@ public class DocumentProtocol implements Protocol {
private DocumentProtocol(DocumentTypeManager docMan, String configId,
DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
- if (docMan != null)
+ if (docMan != null) {
this.docMan = docMan;
- else {
+ } else {
this.docMan = new DocumentTypeManager();
DocumentTypeManagerConfigurer.configure(this.docMan, configId);
}
@@ -275,6 +275,11 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
+ registerLegacyV6Factories();
+ registerV8Factories();
+ }
+
+ private void registerLegacyV6Factories() {
// Prepare version specifications to use when adding routable factories.
VersionSpecification version6 = new VersionSpecification(6, 221);
@@ -311,11 +316,48 @@ public class DocumentProtocol implements Protocol {
putRoutableFactory(REPLY_REMOVELOCATION, new RoutableFactories60.RemoveLocationReplyFactory(), from6);
putRoutableFactory(REPLY_STATBUCKET, new RoutableFactories60.StatBucketReplyFactory(), from6);
putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6);
putRoutableFactory(REPLY_VISITORINFO, new RoutableFactories60.VisitorInfoReplyFactory(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, new RoutableFactories60.WrongDistributionReplyFactory(), from6);
}
+ private void registerV8Factories() {
+ var version8 = new VersionSpecification(8, 304); // Must be same as in C++ impl
+ var from8 = Collections.singletonList(version8);
+
+ putRoutableFactory(MESSAGE_CREATEVISITOR, RoutableFactories80.createCreateVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, RoutableFactories80.createDocumentListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, RoutableFactories80.createGetBucketListMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, RoutableFactories80.createGetDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_MAPVISITOR, RoutableFactories80.createMapVisitorMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, RoutableFactories80.createPutDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_QUERYRESULT, RoutableFactories80.createQueryResultMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, RoutableFactories80.createRemoveLocationMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_STATBUCKET, RoutableFactories80.createStatBucketMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentMessageFactory(), from8);
+ putRoutableFactory(MESSAGE_VISITORINFO, RoutableFactories80.createVisitorInfoMessageFactory(), from8);
+ putRoutableFactory(REPLY_CREATEVISITOR, RoutableFactories80.createCreateVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, RoutableFactories80.createDocumentIgnoredReplyFactory(), from8);
+ putRoutableFactory(REPLY_DOCUMENTLIST, RoutableFactories80.createDocumentListReplyFactory(), from8);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETLIST, RoutableFactories80.createGetBucketListReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateReplyFactory(), from8);
+ putRoutableFactory(REPLY_GETDOCUMENT, RoutableFactories80.createGetDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_MAPVISITOR, RoutableFactories80.createMapVisitorReplyFactory(), from8);
+ putRoutableFactory(REPLY_PUTDOCUMENT, RoutableFactories80.createPutDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_QUERYRESULT, RoutableFactories80.createQueryResultReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_REMOVELOCATION, RoutableFactories80.createRemoveLocationReplyFactory(), from8);
+ putRoutableFactory(REPLY_STATBUCKET, RoutableFactories80.createStatBucketReplyFactory(), from8);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentReplyFactory(), from8);
+ putRoutableFactory(REPLY_VISITORINFO, RoutableFactories80.createVisitorInfoReplyFactory(), from8);
+ putRoutableFactory(REPLY_WRONGDISTRIBUTION, RoutableFactories80.createWrongDistributionReplyFactory(), from8);
+ }
+
/**
* Adds a new routable factory to this protocol. This method is thread-safe, and may be invoked on a protocol object
* that is already in use by a message bus instance. Notice that the name you supply for a factory is the
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
index 727d1e4cd89..10ca1930317 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java
@@ -39,6 +39,8 @@ public class DocumentState implements Comparable<DocumentState> {
removeEntry = buf.getByte(null)>0;
}
+ public boolean hasDocId() { return docId != null; }
+
public DocumentId getDocId() {
return docId;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
index 957e65c54e1..25862eb39f3 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java
@@ -6,15 +6,17 @@ import com.yahoo.document.select.BucketSelector;
import java.util.Set;
/**
- * Message (VDS only) to remove an entire location for users using n= or g= schemes.
+ * Message to remove an entire location for users using n= or g= schemes.
* We use a document selection so the user can specify a subset of those documents to be deleted
* if they wish.
*/
public class RemoveLocationMessage extends DocumentMessage {
String documentSelection;
BucketId bucketId;
+ private final String bucketSpace;
- public RemoveLocationMessage(String documentSelection) {
+ public RemoveLocationMessage(String documentSelection, String bucketSpace) {
+ this.bucketSpace = bucketSpace;
try {
this.documentSelection = documentSelection;
BucketSelector bucketSel = new BucketSelector(new BucketIdFactory());
@@ -32,6 +34,10 @@ public class RemoveLocationMessage extends DocumentMessage {
}
}
+ public RemoveLocationMessage(String documentSelection) {
+ this(documentSelection, FixedBucketSpaces.defaultSpace());
+ }
+
public String getDocumentSelection() {
return documentSelection;
}
@@ -49,4 +55,8 @@ public class RemoveLocationMessage extends DocumentMessage {
public BucketId getBucketId() {
return bucketId;
}
+
+ public String getBucketSpace() {
+ return bucketSpace;
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
new file mode 100644
index 00000000000..e03a7a05a4b
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java
@@ -0,0 +1,931 @@
+package com.yahoo.documentapi.messagebus.protocol;
+
+import ai.vespa.documentapi.protobuf.DocapiCommon;
+import ai.vespa.documentapi.protobuf.DocapiFeed;
+import ai.vespa.documentapi.protobuf.DocapiInspect;
+import ai.vespa.documentapi.protobuf.DocapiVisiting;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.vdslib.DocumentSummary;
+import com.yahoo.vdslib.SearchResult;
+import com.yahoo.vdslib.VisitorStatistics;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Implementation of MessageBus message request/response serialization built around Protocol Buffers.
+ */
+abstract class RoutableFactories80 {
+
+ private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory {
+
+ private final Class<DocApiT> apiClass;
+ private final Function<DocApiT, ProtoT> encoderFn;
+ private final Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodec(Class<DocApiT> apiClass,
+ Function<DocApiT, ProtoT> encoderFn,
+ Function<DocumentDeserializer, DocApiT> decoderFn) {
+ this.apiClass = apiClass;
+ this.encoderFn = encoderFn;
+ this.decoderFn = decoderFn;
+ }
+
+ @Override
+ public boolean encode(Routable obj, DocumentSerializer out) {
+ try {
+ var protoMsg = encoderFn.apply(apiClass.cast(obj));
+ var protoStream = CodedOutputStream.newInstance(out.getBuf().getByteBuffer()); // Not AutoCloseable...
+ try {
+ protoMsg.writeTo(protoStream);
+ } finally {
+ protoStream.flush();
+ }
+ } catch (IOException | UnsupportedOperationException e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Routable decode(DocumentDeserializer in) {
+ return decoderFn.apply(in);
+ }
+ }
+
+ private static class ProtobufCodecBuilder<DocApiT extends Routable, ProtoT extends AbstractMessage> {
+
+ private final Class<DocApiT> apiClass;
+ private final Class<ProtoT> protoClass;
+ private Function<DocApiT, ProtoT> encoderFn;
+ private Function<DocumentDeserializer, DocApiT> decoderFn;
+
+ ProtobufCodecBuilder(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ this.apiClass = apiClass;
+ this.protoClass = protoClass;
+ }
+
+ static <DocApiT extends Routable, ProtoT extends AbstractMessage> ProtobufCodecBuilder<DocApiT, ProtoT>
+ of(Class<DocApiT> apiClass, Class<ProtoT> protoClass) {
+ return new ProtobufCodecBuilder<>(apiClass, protoClass);
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> encoder(Function<DocApiT, ProtoT> fn) {
+ if (encoderFn != null) {
+ throw new IllegalArgumentException("Encoder already set");
+ }
+ encoderFn = fn;
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj));
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) {
+ if (decoderFn != null) {
+ throw new IllegalArgumentException("Decoder already set");
+ }
+ decoderFn = (buf) -> {
+ try {
+ var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer());
+ return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo());
+ } catch (IOException e) {
+ return null;
+ }
+ };
+ return this;
+ }
+
+ ProtobufCodec<DocApiT, ProtoT> build() {
+ Objects.requireNonNull(encoderFn, "Encoder has not been set");
+ Objects.requireNonNull(decoderFn, "Decoder has not been set");
+ return new ProtobufCodec<>(apiClass, encoderFn, decoderFn);
+ }
+ }
+
+ // Protobuf codec helpers for common types
+
+ private static DocapiCommon.GlobalId toProtoGlobalId(GlobalId gid) {
+ return DocapiCommon.GlobalId.newBuilder().setRawGid(ByteString.copyFrom(gid.getRawId())).build();
+ }
+
+ private static GlobalId fromProtoGlobalId(DocapiCommon.GlobalId gid) {
+ return new GlobalId(gid.getRawGid().toByteArray());
+ }
+
+ private static DocapiCommon.BucketId toProtoBucketId(BucketId id) {
+ return DocapiCommon.BucketId.newBuilder().setRawId(id.getRawId()).build();
+ }
+
+ private static BucketId fromProtoBucketId(DocapiCommon.BucketId id) {
+ return new BucketId(id.getRawId());
+ }
+
+ private static DocapiCommon.DocumentId toProtoDocId(DocumentId id) {
+ return DocapiCommon.DocumentId.newBuilder().setId(id.toString()).build();
+ }
+
+ private static DocumentId fromProtoDocId(DocapiCommon.DocumentId id) {
+ return new DocumentId(id.getId());
+ }
+
+ private static DocapiCommon.FieldSet toProtoFieldSet(String rawFieldSpec) {
+ return DocapiCommon.FieldSet.newBuilder().setSpec(rawFieldSpec).build();
+ }
+
+ private static String fromProtoFieldSet(DocapiCommon.FieldSet fieldSet) {
+ return fieldSet.getSpec();
+ }
+
+ private static ByteBuffer serializeDoc(Document doc) {
+ var buf = new GrowableByteBuffer();
+ doc.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiCommon.Document toProtoDocument(Document doc) {
+ // TODO a lot of copying here... Consider adding Document serialization to OutputStream
+ // so that we can serialize directly into a ByteString.Output instance.
+ return toProtoDocument(serializeDoc(doc));
+ }
+
+ private static DocapiCommon.Document toProtoDocument(ByteBuffer rawDocData) {
+ return DocapiCommon.Document.newBuilder()
+ .setPayload(ByteString.copyFrom(rawDocData))
+ .build();
+ }
+
+ private static Document fromProtoDocument(DocapiCommon.Document protoDoc, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoDoc.getPayload().asReadOnlyByteBuffer()));
+ return Document.createDocument(deserializer);
+ }
+
+ private static Document deserializeDoc(ByteBuffer rawDocData, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(rawDocData));
+ return Document.createDocument(deserializer);
+ }
+
+ private static DocapiFeed.TestAndSetCondition toProtoTasCondition(TestAndSetCondition tasCond) {
+ return DocapiFeed.TestAndSetCondition.newBuilder()
+ .setSelection(tasCond.getSelection())
+ .build();
+ }
+
+ private static TestAndSetCondition fromProtoTasCondition(DocapiFeed.TestAndSetCondition protoTasCond) {
+ // Note: the empty (default) string implies "no condition present"
+ return new TestAndSetCondition(protoTasCond.getSelection());
+ }
+
+ private static ByteBuffer serializeUpdate(DocumentUpdate update) {
+ var buf = new GrowableByteBuffer();
+ update.serialize(buf);
+ buf.flip();
+ return buf.getByteBuffer();
+ }
+
+ private static DocapiFeed.DocumentUpdate toProtoUpdate(DocumentUpdate update) {
+ // TODO also consider DocumentUpdate serialization directly to OutputStream to avoid unneeded copying
+ return DocapiFeed.DocumentUpdate.newBuilder()
+ .setPayload(ByteString.copyFrom(serializeUpdate(update)))
+ .build();
+ }
+
+ private static DocumentUpdate fromProtoUpdate(DocapiFeed.DocumentUpdate protoUpdate, DocumentTypeManager repo) {
+ var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoUpdate.getPayload().asReadOnlyByteBuffer()));
+ return new DocumentUpdate(deserializer);
+ }
+
+ private static DocapiCommon.DocumentSelection toProtoDocumentSelection(String rawSelection) {
+ return DocapiCommon.DocumentSelection.newBuilder()
+ .setSelection(rawSelection)
+ .build();
+ }
+
+ private static String fromProtoDocumentSelection(DocapiCommon.DocumentSelection protoSelection) {
+ return protoSelection.getSelection();
+ }
+
+ private static DocapiCommon.BucketSpace toProtoBucketSpace(String spaceName) {
+ return DocapiCommon.BucketSpace.newBuilder()
+ .setName(spaceName)
+ .build();
+ }
+
+ private static String fromProtoBucketSpace(DocapiCommon.BucketSpace protoSpace) {
+ return protoSpace.getName();
+ }
+
+ private static DocapiCommon.ClusterState toProtoClusterState(String stateStr) {
+ return DocapiCommon.ClusterState.newBuilder().setStateString(stateStr).build();
+ }
+
+ private static String fromProtoClusterState(DocapiCommon.ClusterState state) {
+ return state.getStateString();
+ }
+
+ // Message codec implementations
+
+ // ---------------------------------------------
+ // Get request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentMessage.class, DocapiFeed.GetDocumentRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.GetDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .build())
+ .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) ->
+ new GetDocumentMessage(
+ fromProtoDocId(protoMsg.getDocumentId()),
+ fromProtoFieldSet(protoMsg.getFieldSet())))
+ .build();
+ }
+
+ static RoutableFactory createGetDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetDocumentReply.class, DocapiFeed.GetDocumentResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiFeed.GetDocumentResponse.newBuilder()
+ .setLastModified(apiReply.getLastModified());
+ var maybeDoc = apiReply.getDocument();
+ if (maybeDoc != null) {
+ builder.setDocument(toProtoDocument(serializeDoc(maybeDoc)));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> {
+ GetDocumentReply reply;
+ if (protoReply.hasDocument()) {
+ var doc = fromProtoDocument(protoReply.getDocument(), repo);
+ doc.setLastModified(protoReply.getLastModified());
+ reply = new GetDocumentReply(doc);
+ } else {
+ reply = new GetDocumentReply(null);
+ }
+ reply.setLastModified(protoReply.getLastModified());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Put request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createPutDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(PutDocumentMessage.class, DocapiFeed.PutDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.PutDocumentRequest.newBuilder()
+ .setForceAssignTimestamp(apiMsg.getTimestamp())
+ .setCreateIfMissing(apiMsg.getCreateIfNonExistent())
+ .setDocument(toProtoDocument(apiMsg.getDocumentPut().getDocument()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var doc = fromProtoDocument(protoMsg.getDocument(), repo);
+ var msg = new PutDocumentMessage(new DocumentPut(doc));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ msg.setTimestamp(protoMsg.getForceAssignTimestamp());
+ msg.setCreateIfNonExistent(protoMsg.getCreateIfMissing());
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createPutDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WriteDocumentReply.class, DocapiFeed.PutDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.PutDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Update request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createUpdateDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentMessage.class, DocapiFeed.UpdateDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.UpdateDocumentRequest.newBuilder()
+ .setUpdate(toProtoUpdate(apiMsg.getDocumentUpdate()))
+ .setExpectedOldTimestamp(apiMsg.getOldTimestamp())
+ .setForceAssignTimestamp(apiMsg.getNewTimestamp());
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo));
+ msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp());
+ msg.setNewTimestamp(protoMsg.getForceAssignTimestamp());
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createUpdateDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(UpdateDocumentReply.class, DocapiFeed.UpdateDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.UpdateDocumentResponse.newBuilder()
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .setWasFound(apiReply.wasFound())
+ .build())
+ .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ reply.setWasFound(protoReply.getWasFound());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // Remove request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveDocumentMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentMessage.class, DocapiFeed.RemoveDocumentRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiFeed.RemoveDocumentRequest.newBuilder()
+ .setDocumentId(toProtoDocId(apiMsg.getDocumentId()));
+ if (apiMsg.getCondition().isPresent()) {
+ builder.setCondition(toProtoTasCondition(apiMsg.getCondition()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId()));
+ if (protoMsg.hasCondition()) {
+ msg.setCondition(fromProtoTasCondition(protoMsg.getCondition()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createRemoveDocumentReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveDocumentReply.class, DocapiFeed.RemoveDocumentResponse.class)
+ .encoder((apiReply) ->
+ DocapiFeed.RemoveDocumentResponse.newBuilder()
+ .setWasFound(apiReply.wasFound())
+ .setModificationTimestamp(apiReply.getHighestModificationTimestamp())
+ .build())
+ .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new RemoveDocumentReply();
+ reply.setWasFound(protoReply.getWasFound());
+ reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp());
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // RemoveLocation request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createRemoveLocationMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(RemoveLocationMessage.class, DocapiFeed.RemoveLocationRequest.class)
+ .encoder((apiMsg) ->
+ DocapiFeed.RemoveLocationRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) ->
+ new RemoveLocationMessage(
+ fromProtoDocumentSelection(protoMsg.getSelection()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createRemoveLocationReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class)
+ .encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build())
+ .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // CreateVisitor request and response
+ // ---------------------------------------------
+
+ private static DocapiVisiting.VisitorParameter toProtoVisitorParameter(String key, byte[] value) {
+ return DocapiVisiting.VisitorParameter.newBuilder()
+ .setKey(key)
+ .setValue(ByteString.copyFrom(value))
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorMessage.class, DocapiVisiting.CreateVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.CreateVisitorRequest.newBuilder()
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setVisitorLibraryName(apiMsg.getLibraryName())
+ .setInstanceId(apiMsg.getInstanceId())
+ .setControlDestination(apiMsg.getControlDestination())
+ .setDataDestination(apiMsg.getDataDestination())
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet()))
+ .setMaxPendingReplyCount(apiMsg.getMaxPendingReplyCount())
+ .setFromTimestamp(apiMsg.getFromTimestamp())
+ .setToTimestamp(apiMsg.getToTimestamp())
+ .setVisitTombstones(apiMsg.getVisitRemoves())
+ .setVisitInconsistentBuckets(apiMsg.getVisitInconsistentBuckets())
+ .setMaxBucketsPerVisitor(apiMsg.getMaxBucketsPerVisitor());
+ for (var id : apiMsg.getBuckets()) {
+ builder.addBuckets(toProtoBucketId(id));
+ }
+ for (var entry : apiMsg.getParameters().entrySet()) {
+ builder.addParameters(toProtoVisitorParameter(entry.getKey(), entry.getValue()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new CreateVisitorMessage();
+ msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace()));
+ msg.setLibraryName(protoMsg.getVisitorLibraryName());
+ msg.setInstanceId(protoMsg.getInstanceId());
+ msg.setControlDestination(protoMsg.getControlDestination());
+ msg.setDataDestination(protoMsg.getDataDestination());
+ msg.setDocumentSelection(fromProtoDocumentSelection(protoMsg.getSelection()));
+ msg.setFieldSet(fromProtoFieldSet(protoMsg.getFieldSet()));
+ msg.setMaxPendingReplyCount(protoMsg.getMaxPendingReplyCount());
+ msg.setFromTimestamp(protoMsg.getFromTimestamp());
+ msg.setToTimestamp(protoMsg.getToTimestamp());
+ msg.setVisitRemoves(protoMsg.getVisitTombstones());
+ msg.setVisitInconsistentBuckets(protoMsg.getVisitInconsistentBuckets());
+ msg.setMaxBucketsPerVisitor(protoMsg.getMaxBucketsPerVisitor());
+ for (var protoId : protoMsg.getBucketsList()) {
+ msg.getBuckets().add(fromProtoBucketId(protoId));
+ }
+ for (var protoParam : protoMsg.getParametersList()) {
+ msg.getParameters().put(protoParam.getKey(), protoParam.getValue().toByteArray());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createCreateVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(CreateVisitorReply.class, DocapiVisiting.CreateVisitorResponse.class)
+ .encoder((apiReply) -> {
+ var stats = apiReply.getVisitorStatistics();
+ return DocapiVisiting.CreateVisitorResponse.newBuilder()
+ .setLastBucket(toProtoBucketId(apiReply.getLastBucket()))
+ .setStatistics(DocapiVisiting.VisitorStatistics.newBuilder()
+ .setBucketsVisited(stats.getBucketsVisited())
+ .setDocumentsVisited(stats.getDocumentsVisited())
+ .setBytesVisited(stats.getBytesVisited())
+ .setDocumentsReturned(stats.getDocumentsReturned())
+ .setBytesReturned(stats.getBytesReturned())
+ .build())
+ .build();
+ })
+ .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket()));
+ var protoVs = protoReply.getStatistics();
+ var vs = new VisitorStatistics();
+ vs.setBucketsVisited(protoVs.getBucketsVisited());
+ vs.setDocumentsVisited(protoVs.getDocumentsVisited());
+ vs.setBytesVisited(protoVs.getBytesVisited());
+ vs.setDocumentsReturned(protoVs.getDocumentsReturned());
+ vs.setBytesReturned(protoVs.getBytesReturned());
+ reply.setVisitorStatistics(vs);
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DestroyVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createDestroyVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DestroyVisitorMessage.class, DocapiVisiting.DestroyVisitorRequest.class)
+ .encoder((apiMsg) ->
+ DocapiVisiting.DestroyVisitorRequest.newBuilder()
+ .setInstanceId(apiMsg.getInstanceId())
+ .build())
+ .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(),
+ (protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId()))
+ .build();
+ }
+
+ static RoutableFactory createDestroyVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // MapVisitor request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createMapVisitorMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(MapVisitorMessage.class, DocapiVisiting.MapVisitorRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.MapVisitorRequest.newBuilder();
+ for (var entry : apiMsg.getData().entrySet()) {
+ // FIXME MapVisitorMessage uses Parameters (i.e. string -> bytes) in C++, but string -> string in Java...
+ // ... but due to this, UTF-8 is effectively enforced anyway. Not that anything actually uses this :I
+ builder.addData(toProtoVisitorParameter(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8)));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new MapVisitorMessage();
+ for (var param : protoMsg.getDataList()) {
+ msg.getData().put(param.getKey(), param.getValue().toStringUtf8());
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createMapVisitorReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build())
+ .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // QueryResult request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createQueryResultMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(QueryResultMessage.class, DocapiVisiting.QueryResultRequest.class)
+ .encoder((apiMsg) -> {
+ // Serialization of QueryResultMessages is not implemented in Java (receive only)
+ throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported");
+ })
+ .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new QueryResultMessage();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!protoMsg.hasSearchResult() || !protoMsg.hasDocumentSummary()) {
+ throw new IllegalArgumentException("Query result does not have all required fields set");
+ }
+ // We have to use toByteArray() instead of asReadOnlyByteBuffer(), as the deserialization routines
+ // try to fetch the raw arrays, which are considered mutable (causing a ReadOnlyBufferException).
+ msg.setSearchResult(new SearchResult(new BufferSerializer(
+ protoMsg.getSearchResult().getPayload().toByteArray())));
+ msg.setSummary(new DocumentSummary(new BufferSerializer(
+ protoMsg.getDocumentSummary().getPayload().toByteArray())));
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createQueryResultReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build())
+ .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // VisitorInfo request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createVisitorInfoMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorInfoMessage.class, DocapiVisiting.VisitorInfoRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.VisitorInfoRequest.newBuilder()
+ .setErrorMessage(apiMsg.getErrorMessage());
+ for (var id : apiMsg.getFinishedBuckets()) {
+ builder.addFinishedBuckets(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new VisitorInfoMessage();
+ msg.setErrorMessage(protoMsg.getErrorMessage());
+ for (var protoId : protoMsg.getFinishedBucketsList()) {
+ msg.getFinishedBuckets().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createVisitorInfoReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build())
+ .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentList request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentListMessage.class, DocapiVisiting.DocumentListRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.DocumentListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()));
+ for (var doc : apiMsg.getDocuments()) {
+ builder.addEntries(DocapiVisiting.DocumentListRequest.Entry.newBuilder()
+ .setTimestamp(doc.getTimestamp())
+ .setIsTombstone(doc.isRemoveEntry())
+ .setDocument(toProtoDocument(doc.getDocument())));
+ }
+ return builder.build();
+ })
+ .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId()));
+ for (var entry : protoMsg.getEntriesList()) {
+ msg.getDocuments().add(new DocumentListEntry(
+ fromProtoDocument(entry.getDocument(), repo),
+ entry.getTimestamp(),
+ entry.getIsTombstone()));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createDocumentListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build())
+ .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // EmptyBuckets request and response
+ // TODO this should be deprecated
+ // ---------------------------------------------
+
+ static RoutableFactory createEmptyBucketsMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(EmptyBucketsMessage.class, DocapiVisiting.EmptyBucketsRequest.class)
+ .encoder((apiMsg) -> {
+ var builder = DocapiVisiting.EmptyBucketsRequest.newBuilder();
+ for (var id : apiMsg.getBucketIds()) {
+ builder.addBucketIds(toProtoBucketId(id));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> {
+ var msg = new EmptyBucketsMessage();
+ for (var protoId : protoMsg.getBucketIdsList()) {
+ msg.getBucketIds().add(fromProtoBucketId(protoId));
+ }
+ return msg;
+ })
+ .build();
+ }
+
+ static RoutableFactory createEmptyBucketsReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class)
+ .encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build())
+ .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(),
+ (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketList request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketListMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListMessage.class, DocapiInspect.GetBucketListRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketListRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .build())
+ .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketListMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketListReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketListReply.class, DocapiInspect.GetBucketListResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketListResponse.newBuilder();
+ for (var info : apiReply.getBuckets()) {
+ builder.addBucketInfo(DocapiInspect.BucketInformation.newBuilder()
+ .setBucketId(toProtoBucketId(info.getBucketId()))
+ .setInfo(info.getBucketInformation()));
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketListReply();
+ for (var info : protoReply.getBucketInfoList()) {
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(
+ fromProtoBucketId(info.getBucketId()),
+ info.getInfo()));
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // GetBucketState request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createGetBucketStateMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateMessage.class, DocapiInspect.GetBucketStateRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.GetBucketStateRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .build())
+ .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) ->
+ new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId())))
+ .build();
+ }
+
+ static RoutableFactory createGetBucketStateReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(GetBucketStateReply.class, DocapiInspect.GetBucketStateResponse.class)
+ .encoder((apiReply) -> {
+ var builder = DocapiInspect.GetBucketStateResponse.newBuilder();
+ for (var state : apiReply.getBucketState()) {
+ var stateBuilder = DocapiInspect.DocumentState.newBuilder()
+ .setTimestamp(state.getTimestamp())
+ .setIsTombstone(state.isRemoveEntry());
+ if (state.hasDocId()) {
+ stateBuilder.setDocumentId(toProtoDocId(state.getDocId()));
+ } else {
+ stateBuilder.setGlobalId(toProtoGlobalId(state.getGid()));
+ }
+ builder.addStates(stateBuilder);
+ }
+ return builder.build();
+ })
+ .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> {
+ var reply = new GetBucketStateReply();
+ for (var state : protoReply.getStatesList()) {
+ if (state.hasDocumentId()) {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoDocId(state.getDocumentId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ } else {
+ reply.getBucketState().add(new DocumentState(
+ fromProtoGlobalId(state.getGlobalId()),
+ state.getTimestamp(),
+ state.getIsTombstone()));
+ }
+ }
+ return reply;
+ })
+ .build();
+ }
+
+ // ---------------------------------------------
+ // StatBucket request and response
+ // ---------------------------------------------
+
+ static RoutableFactory createStatBucketMessageFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketMessage.class, DocapiInspect.StatBucketRequest.class)
+ .encoder((apiMsg) ->
+ DocapiInspect.StatBucketRequest.newBuilder()
+ .setBucketId(toProtoBucketId(apiMsg.getBucketId()))
+ .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace()))
+ .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection()))
+ .build())
+ .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) ->
+ new StatBucketMessage(
+ fromProtoBucketId(protoMsg.getBucketId()),
+ fromProtoBucketSpace(protoMsg.getBucketSpace()),
+ fromProtoDocumentSelection(protoMsg.getSelection())))
+ .build();
+ }
+
+ static RoutableFactory createStatBucketReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(StatBucketReply.class, DocapiInspect.StatBucketResponse.class)
+ .encoder((apiReply) ->
+ DocapiInspect.StatBucketResponse.newBuilder()
+ .setResults(apiReply.getResults())
+ .build())
+ .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) ->
+ new StatBucketReply(protoReply.getResults()))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // WrongDistribution response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createWrongDistributionReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(WrongDistributionReply.class, DocapiCommon.WrongDistributionResponse.class)
+ .encoder((apiReply) ->
+ DocapiCommon.WrongDistributionResponse.newBuilder()
+ .setClusterState(toProtoClusterState(apiReply.getSystemState()))
+ .build())
+ .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) ->
+ new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState())))
+ .build();
+ }
+
+ // ---------------------------------------------
+ // DocumentIgnored response (no request type)
+ // ---------------------------------------------
+
+ static RoutableFactory createDocumentIgnoredReplyFactory() {
+ return ProtobufCodecBuilder
+ .of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class)
+ .encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build())
+ .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(),
+ (protoReply) -> new DocumentIgnoredReply())
+ .build();
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
index fa53a3f8568..73ce4dbbc68 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java
@@ -9,6 +9,11 @@ public class StatBucketReply extends DocumentReply {
super(DocumentProtocol.REPLY_STATBUCKET);
}
+ public StatBucketReply(String results) {
+ super(DocumentProtocol.REPLY_STATBUCKET);
+ this.results = results;
+ }
+
public String getResults() {
return results;
}
diff --git a/documentapi/src/protobuf/docapi_common.proto b/documentapi/src/protobuf/docapi_common.proto
new file mode 100644
index 00000000000..5f4cfeb299c
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_common.proto
@@ -0,0 +1,50 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+message BucketSpace {
+ string name = 1;
+}
+
+message BucketId {
+ fixed64 raw_id = 1;
+}
+
+message Document {
+ bytes payload = 1;
+}
+
+message DocumentId {
+ string id = 1;
+}
+
+message FieldSet {
+ string spec = 1;
+}
+
+message GlobalId {
+ // Shall always be 12 bytes (96 bits)
+ bytes raw_gid = 1;
+}
+
+message DocumentSelection {
+ string selection = 1;
+}
+
+message ClusterState {
+ string state_string = 1;
+}
+
+// Polymorphic response type shared by other responses
+message WrongDistributionResponse {
+ ClusterState cluster_state = 1;
+}
+
+// Polymorphic response type shared by other responses
+message DocumentIgnoredResponse {
+ // empty
+}
diff --git a/documentapi/src/protobuf/docapi_feed.proto b/documentapi/src/protobuf/docapi_feed.proto
new file mode 100644
index 00000000000..8d15fd9a536
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_feed.proto
@@ -0,0 +1,71 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message TestAndSetCondition {
+ string selection = 1;
+}
+
+message DocumentUpdate {
+ bytes payload = 1;
+}
+
+message GetDocumentRequest {
+ DocumentId document_id = 1;
+ FieldSet field_set = 2;
+}
+
+message GetDocumentResponse {
+ Document document = 1;
+ uint64 last_modified = 2;
+}
+
+message PutDocumentRequest {
+ // Note: document contains embedded document ID
+ Document document = 1;
+ TestAndSetCondition condition = 2;
+ bool create_if_missing = 3;
+ uint64 force_assign_timestamp = 4;
+}
+
+message PutDocumentResponse {
+ uint64 modification_timestamp = 1;
+}
+
+message UpdateDocumentRequest {
+ // Note: update contains embedded document ID
+ DocumentUpdate update = 1;
+ TestAndSetCondition condition = 2;
+ uint64 expected_old_timestamp = 3;
+ uint64 force_assign_timestamp = 4;
+}
+
+message UpdateDocumentResponse {
+ bool was_found = 1;
+ uint64 modification_timestamp = 2;
+}
+
+message RemoveDocumentRequest {
+ DocumentId document_id = 1;
+ TestAndSetCondition condition = 2;
+}
+
+message RemoveDocumentResponse {
+ bool was_found = 1;
+ uint64 modification_timestamp = 2;
+}
+
+message RemoveLocationRequest {
+ DocumentSelection selection = 1;
+ BucketSpace bucket_space = 2;
+}
+
+message RemoveLocationResponse {
+ // empty
+}
diff --git a/documentapi/src/protobuf/docapi_inspect.proto b/documentapi/src/protobuf/docapi_inspect.proto
new file mode 100644
index 00000000000..efdc8062e0a
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_inspect.proto
@@ -0,0 +1,48 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message GetBucketListRequest {
+ BucketId bucket_id = 1;
+ BucketSpace bucket_space = 2;
+}
+
+message BucketInformation {
+ BucketId bucket_id = 1;
+ string info = 2;
+}
+
+message GetBucketListResponse {
+ repeated BucketInformation bucket_info = 1;
+}
+
+message GetBucketStateRequest {
+ BucketId bucket_id = 1;
+}
+
+message DocumentState {
+ DocumentId document_id = 1;
+ GlobalId global_id = 2;
+ uint64 timestamp = 3;
+ bool is_tombstone = 4;
+}
+
+message GetBucketStateResponse {
+ repeated DocumentState states = 1;
+}
+
+message StatBucketRequest {
+ BucketId bucket_id = 1;
+ DocumentSelection selection = 2;
+ BucketSpace bucket_space = 3;
+}
+
+message StatBucketResponse {
+ string results = 1;
+}
diff --git a/documentapi/src/protobuf/docapi_visiting.proto b/documentapi/src/protobuf/docapi_visiting.proto
new file mode 100644
index 00000000000..ecf71ddab55
--- /dev/null
+++ b/documentapi/src/protobuf/docapi_visiting.proto
@@ -0,0 +1,115 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+package documentapi.protobuf;
+
+option cc_enable_arenas = true;
+option java_package = "ai.vespa.documentapi.protobuf";
+
+import "docapi_common.proto";
+
+message VisitorParameter {
+ string key = 1;
+ bytes value = 2;
+}
+
+message CreateVisitorRequest {
+ string visitor_library_name = 1;
+ string instance_id = 2;
+ string control_destination = 3;
+ string data_destination = 4;
+ DocumentSelection selection = 5;
+ uint32 max_pending_reply_count = 6;
+ BucketSpace bucket_space = 7;
+ repeated BucketId buckets = 8;
+ uint64 from_timestamp = 9;
+ uint64 to_timestamp = 10;
+ bool visit_tombstones = 11;
+ FieldSet field_set = 12;
+ bool visit_inconsistent_buckets = 13;
+ uint32 max_buckets_per_visitor = 14;
+ repeated VisitorParameter parameters = 15;
+}
+
+message VisitorStatistics {
+ uint32 buckets_visited = 1;
+ uint64 documents_visited = 2;
+ uint64 bytes_visited = 3;
+ uint64 documents_returned = 4;
+ uint64 bytes_returned = 5;
+}
+
+message CreateVisitorResponse {
+ BucketId last_bucket = 1;
+ VisitorStatistics statistics = 2;
+}
+
+message DestroyVisitorRequest {
+ string instance_id = 1;
+}
+
+message DestroyVisitorResponse {
+ // empty
+}
+
+message VisitorInfoRequest {
+ repeated BucketId finished_buckets = 1;
+ string error_message = 2;
+}
+
+message VisitorInfoResponse {
+ // empty
+}
+
+message MapVisitorRequest {
+ repeated VisitorParameter data = 1;
+}
+
+message MapVisitorResponse {
+ // empty
+}
+
+message SearchResult {
+ bytes payload = 1;
+}
+
+message DocumentSummary {
+ bytes payload = 1;
+}
+
+// We consider streaming search query-related messages to be part of the visiting family
+message QueryResultRequest {
+ SearchResult search_result = 1;
+ DocumentSummary document_summary = 2;
+}
+
+message QueryResultResponse {
+ // empty
+}
+
+// TODO deprecate, only used by "recovery visitor" (?!)
+message DocumentListRequest {
+ message Entry {
+ Document document = 1;
+ uint64 timestamp = 2;
+ bool is_tombstone = 3;
+ }
+
+ BucketId bucket_id = 1;
+ repeated Entry entries = 2;
+}
+
+// TODO deprecate
+message DocumentListResponse {
+ // TODO
+}
+
+// TODO deprecate, not sent by backend
+message EmptyBucketsRequest {
+ repeated BucketId bucket_ids = 1;
+}
+
+// TODO deprecate, not sent by backend
+message EmptyBucketsResponse {
+ // empty
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java
index 81904837632..42f200a0b6b 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java
@@ -1,5 +1,5 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol.test;
+package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.component.Version;
import com.yahoo.document.BucketId;
@@ -12,35 +12,6 @@ import com.yahoo.document.GlobalId;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate;
import com.yahoo.document.idstring.IdString;
-import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
-import com.yahoo.documentapi.messagebus.protocol.DestroyVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply;
-import com.yahoo.documentapi.messagebus.protocol.DocumentListMessage;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.DocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.DocumentState;
-import com.yahoo.documentapi.messagebus.protocol.EmptyBucketsMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketListMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketListReply;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketStateMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetBucketStateReply;
-import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.GetDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.MapVisitorMessage;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.QueryResultMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.RemoveLocationMessage;
-import com.yahoo.documentapi.messagebus.protocol.StatBucketMessage;
-import com.yahoo.documentapi.messagebus.protocol.StatBucketReply;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage;
-import com.yahoo.documentapi.messagebus.protocol.VisitorReply;
-import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply;
-import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.messagebus.Routable;
import com.yahoo.text.Utf8;
import com.yahoo.vdslib.SearchResult;
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java
new file mode 100644
index 00000000000..f2c039af6c0
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java
@@ -0,0 +1,729 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.component.Version;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.GlobalId;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate;
+import com.yahoo.document.idstring.IdString;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.text.Utf8;
+import com.yahoo.vdslib.SearchResult;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class Messages80TestCase extends MessagesTestBase {
+
+ @Override
+ protected void registerTests(Map<Integer, RunnableTest> out) {
+ out.put(DocumentProtocol.MESSAGE_CREATEVISITOR, new CreateVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_DESTROYVISITOR, new DestroyVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_DOCUMENTLIST, new DocumentListMessageTest());
+ out.put(DocumentProtocol.MESSAGE_EMPTYBUCKETS, new EmptyBucketsMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETBUCKETLIST, new GetBucketListMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETBUCKETSTATE, new GetBucketStateMessageTest());
+ out.put(DocumentProtocol.MESSAGE_GETDOCUMENT, new GetDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_MAPVISITOR, new MapVisitorMessageTest());
+ out.put(DocumentProtocol.MESSAGE_PUTDOCUMENT, new PutDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_QUERYRESULT, new QueryResultMessageTest());
+ out.put(DocumentProtocol.MESSAGE_REMOVEDOCUMENT, new RemoveDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_REMOVELOCATION, new RemoveLocationMessageTest());
+ out.put(DocumentProtocol.MESSAGE_STATBUCKET, new StatBucketMessageTest());
+ out.put(DocumentProtocol.MESSAGE_UPDATEDOCUMENT, new UpdateDocumentMessageTest());
+ out.put(DocumentProtocol.MESSAGE_VISITORINFO, new VisitorInfoMessageTest());
+ out.put(DocumentProtocol.REPLY_CREATEVISITOR, new CreateVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_DESTROYVISITOR, new DestroyVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_DOCUMENTIGNORED, new DocumentIgnoredReplyTest());
+ out.put(DocumentProtocol.REPLY_DOCUMENTLIST, new DocumentListReplyTest());
+ out.put(DocumentProtocol.REPLY_EMPTYBUCKETS, new EmptyBucketsReplyTest());
+ out.put(DocumentProtocol.REPLY_GETBUCKETLIST, new GetBucketListReplyTest());
+ out.put(DocumentProtocol.REPLY_GETBUCKETSTATE, new GetBucketStateReplyTest());
+ out.put(DocumentProtocol.REPLY_GETDOCUMENT, new GetDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_MAPVISITOR, new MapVisitorReplyTest());
+ out.put(DocumentProtocol.REPLY_PUTDOCUMENT, new PutDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_QUERYRESULT, new QueryResultReplyTest());
+ out.put(DocumentProtocol.REPLY_REMOVEDOCUMENT, new RemoveDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_REMOVELOCATION, new RemoveLocationReplyTest());
+ out.put(DocumentProtocol.REPLY_STATBUCKET, new StatBucketReplyTest());
+ out.put(DocumentProtocol.REPLY_UPDATEDOCUMENT, new UpdateDocumentReplyTest());
+ out.put(DocumentProtocol.REPLY_VISITORINFO, new VisitorInfoReplyTest());
+ out.put(DocumentProtocol.REPLY_WRONGDISTRIBUTION, new WrongDistributionReplyTest());
+ }
+
+ @Override
+ protected Version version() {
+ return new Version(8, 305);
+ }
+
+ @Override
+ protected boolean shouldTestCoverage() {
+ return true;
+ }
+
+ private static void forEachLanguage(Consumer<Language> fun) {
+ for (var lang : MessagesTestBase.LANGUAGES) {
+ fun.accept(lang);
+ }
+ }
+
+ class GetDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new GetDocumentMessage(new DocumentId("id:ns:testdoc::"), "foo bar");
+ serialize("GetDocumentMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (GetDocumentMessage)deserialize("GetDocumentMessage", DocumentProtocol.MESSAGE_GETDOCUMENT, lang);
+ assertEquals("id:ns:testdoc::", msg2.getDocumentId().toString());
+ assertEquals("foo bar", msg2.getFieldSet());
+ });
+ }
+ }
+
+ class GetDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testDocumentReturnedCase();
+ testEmptyCase();
+ }
+
+ void testDocumentReturnedCase() {
+ var reply = new GetDocumentReply(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::"));
+ reply.setLastModified(1234567L);
+ serialize("GetDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (GetDocumentReply)deserialize("GetDocumentReply", DocumentProtocol.REPLY_GETDOCUMENT, lang);
+ assertEquals(1234567L, reply2.getLastModified());
+ var doc = reply2.getDocument();
+ assertNotNull(doc);
+ assertEquals("testdoc", doc.getDataType().getName());
+ assertEquals("id:ns:testdoc::", doc.getId().toString());
+ assertNotNull(doc.getLastModified());
+ assertEquals(1234567L, doc.getLastModified().longValue());
+ });
+ }
+
+ void testEmptyCase() {
+ var reply = new GetDocumentReply(null);
+ serialize("GetDocumentReply-empty", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (GetDocumentReply)deserialize("GetDocumentReply-empty", DocumentProtocol.REPLY_GETDOCUMENT, lang);
+ assertEquals(0L, reply2.getLastModified());
+ assertNull(reply2.getDocument());
+ });
+ }
+ }
+
+ private static final String CONDITION_STRING = "There's just one condition";
+
+ class PutDocumentMessageTest implements RunnableTest {
+
+ void verifyCreateIfNonExistentFlag() {
+ var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::")));
+ msg.setCreateIfNonExistent(true);
+ serialize("PutDocumentMessage-create", msg);
+ forEachLanguage((lang) -> {
+ var decoded = (PutDocumentMessage)deserialize("PutDocumentMessage-create", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang);
+ assertTrue(decoded.getCreateIfNonExistent());
+ assertEquals(decoded.getDocumentPut(), decoded.getDocumentPut());
+ });
+ }
+
+ @Override
+ public void run() {
+ var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::")));
+ msg.setTimestamp(666);
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+ serialize("PutDocumentMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (PutDocumentMessage)deserialize("PutDocumentMessage", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang);
+ var deserializedDoc = deserializedMsg.getDocumentPut().getDocument();
+ assertNotNull(deserializedDoc);
+ assertEquals(msg.getDocumentPut().getDocument().getDataType().getName(), deserializedDoc.getDataType().getName());
+ assertEquals(msg.getDocumentPut().getDocument().getId().toString(), deserializedDoc.getId().toString());
+ assertEquals(msg.getTimestamp(), deserializedMsg.getTimestamp());
+ assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection());
+ assertFalse(deserializedMsg.getCreateIfNonExistent());
+ });
+ verifyCreateIfNonExistentFlag();
+ }
+ }
+
+ class PutDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(30);
+ serialize("PutDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (WriteDocumentReply)deserialize("PutDocumentReply", DocumentProtocol.REPLY_PUTDOCUMENT, lang);
+ assertEquals(30, obj.getHighestModificationTimestamp());
+ });
+ }
+ }
+
+ class UpdateDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var docType = protocol.getDocumentTypeManager().getDocumentType("testdoc");
+ var update = new DocumentUpdate(docType, new DocumentId("id:ns:testdoc::"));
+ update.addFieldPathUpdate(new RemoveFieldPathUpdate(docType, "intfield", "testdoc.intfield > 0"));
+
+ var msg = new UpdateDocumentMessage(update);
+ msg.setNewTimestamp(777);
+ msg.setOldTimestamp(666);
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+
+ serialize("UpdateDocumentMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (UpdateDocumentMessage)deserialize("UpdateDocumentMessage", DocumentProtocol.MESSAGE_UPDATEDOCUMENT, lang);
+ assertEquals(msg.getDocumentUpdate(), deserializedMsg.getDocumentUpdate());
+ assertEquals(msg.getNewTimestamp(), deserializedMsg.getNewTimestamp());
+ assertEquals(msg.getOldTimestamp(), deserializedMsg.getOldTimestamp());
+ assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection());
+ });
+ }
+ }
+
+ class UpdateDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new UpdateDocumentReply();
+ reply.setHighestModificationTimestamp(30);
+ reply.setWasFound(true);
+ serialize("UpdateDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (UpdateDocumentReply)deserialize("UpdateDocumentReply", DocumentProtocol.REPLY_UPDATEDOCUMENT, lang);
+ assertNotNull(obj);
+ assertEquals(30, reply.getHighestModificationTimestamp());
+ assertTrue(obj.wasFound());
+ });
+ }
+ }
+
+ class RemoveDocumentMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new RemoveDocumentMessage(new DocumentId("id:ns:testdoc::"));
+ msg.setCondition(new TestAndSetCondition(CONDITION_STRING));
+ serialize("RemoveDocumentMessage", msg);
+ forEachLanguage((lang) -> {
+ var deserializedMsg = (RemoveDocumentMessage)deserialize("RemoveDocumentMessage", DocumentProtocol.MESSAGE_REMOVEDOCUMENT, lang);
+ assertEquals(msg.getDocumentId().toString(), deserializedMsg.getDocumentId().toString());
+ assertEquals(msg.getCondition(), deserializedMsg.getCondition());
+ });
+ }
+ }
+
+ class RemoveDocumentReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new RemoveDocumentReply();
+ reply.setHighestModificationTimestamp(30);
+ reply.setWasFound(true);
+ serialize("RemoveDocumentReply", reply);
+ forEachLanguage((lang) -> {
+ var obj = (RemoveDocumentReply)deserialize("RemoveDocumentReply", DocumentProtocol.REPLY_REMOVEDOCUMENT, lang);
+ assertNotNull(obj);
+ assertEquals(30, obj.getHighestModificationTimestamp());
+ assertTrue(obj.wasFound());
+ });
+ }
+ }
+
+ class RemoveLocationMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new RemoveLocationMessage("id.group == \"mygroup\"", "bjarne");
+ serialize("RemoveLocationMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (RemoveLocationMessage)deserialize("RemoveLocationMessage", DocumentProtocol.MESSAGE_REMOVELOCATION, lang);
+ assertEquals("id.group == \"mygroup\"", msg2.getDocumentSelection());
+ assertEquals("bjarne", msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class RemoveLocationReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testDocumentReply("RemoveLocationReply", DocumentProtocol.REPLY_REMOVELOCATION);
+ }
+ }
+
+ class CreateVisitorMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "bjarne";
+
+ @Override
+ public void run() {
+ var msg = new CreateVisitorMessage("SomeLibrary", "myvisitor", "newyork", "london");
+ msg.setDocumentSelection("true and false or true");
+ msg.getParameters().put("myvar", Utf8.toBytes("somevalue"));
+ msg.getParameters().put("anothervar", Utf8.toBytes("34"));
+ msg.getBuckets().add(new BucketId(16, 1234));
+ msg.setVisitRemoves(true);
+ msg.setVisitInconsistentBuckets(true);
+ msg.setFieldSet("foo bar");
+ msg.setMaxBucketsPerVisitor(2);
+ msg.setBucketSpace(BUCKET_SPACE);
+ msg.setMaxPendingReplyCount(12);
+
+ serialize("CreateVisitorMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (CreateVisitorMessage)deserialize("CreateVisitorMessage", DocumentProtocol.MESSAGE_CREATEVISITOR, lang);
+ assertEquals("SomeLibrary", msg2.getLibraryName());
+ assertEquals("myvisitor", msg2.getInstanceId());
+ assertEquals("newyork", msg2.getControlDestination());
+ assertEquals("london", msg2.getDataDestination());
+ assertEquals("true and false or true", msg2.getDocumentSelection());
+ assertEquals(12, msg2.getMaxPendingReplyCount());
+ assertTrue(msg2.getVisitRemoves());
+ assertEquals("foo bar", msg2.getFieldSet());
+ assertTrue(msg2.getVisitInconsistentBuckets());
+ assertEquals(1, msg2.getBuckets().size());
+ assertEquals(new BucketId(16, 1234), msg2.getBuckets().iterator().next());
+ assertEquals(2, msg2.getParameters().size());
+ assertEquals("somevalue", Utf8.toString(msg2.getParameters().get("myvar")));
+ assertEquals("34", Utf8.toString(msg2.getParameters().get("anothervar")));
+ assertEquals(2, msg2.getMaxBucketsPerVisitor());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class CreateVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR);
+ reply.setLastBucket(new BucketId(16, 123));
+ reply.getVisitorStatistics().setBucketsVisited(3);
+ reply.getVisitorStatistics().setDocumentsVisited(1000);
+ reply.getVisitorStatistics().setBytesVisited(1024000);
+ reply.getVisitorStatistics().setDocumentsReturned(123);
+ reply.getVisitorStatistics().setBytesReturned(512000);
+
+ serialize("CreateVisitorReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (CreateVisitorReply)deserialize("CreateVisitorReply", DocumentProtocol.REPLY_CREATEVISITOR, lang);
+ assertNotNull(reply2);
+ assertEquals(new BucketId(16, 123), reply2.getLastBucket());
+ assertEquals(3, reply2.getVisitorStatistics().getBucketsVisited());
+ assertEquals(1000, reply2.getVisitorStatistics().getDocumentsVisited());
+ assertEquals(1024000, reply2.getVisitorStatistics().getBytesVisited());
+ assertEquals(123, reply2.getVisitorStatistics().getDocumentsReturned());
+ assertEquals(512000, reply2.getVisitorStatistics().getBytesReturned());
+ });
+ }
+ }
+
+ class DestroyVisitorMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new DestroyVisitorMessage("myvisitor");
+ serialize("DestroyVisitorMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (DestroyVisitorMessage)deserialize("DestroyVisitorMessage", DocumentProtocol.MESSAGE_DESTROYVISITOR, lang);
+ assertEquals("myvisitor", msg2.getInstanceId());
+ });
+ }
+ }
+
+ class DestroyVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("DestroyVisitorReply", DocumentProtocol.REPLY_DESTROYVISITOR);
+ }
+ }
+
+ class MapVisitorMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new MapVisitorMessage();
+ msg.getData().put("foo", "3");
+ msg.getData().put("bar", "5");
+ serialize("MapVisitorMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (MapVisitorMessage) deserialize("MapVisitorMessage", DocumentProtocol.MESSAGE_MAPVISITOR, lang);
+ assertEquals(2, msg2.getData().size());
+ assertEquals("3", msg2.getData().get("foo"));
+ assertEquals("5", msg2.getData().get("bar"));
+ });
+ }
+ }
+
+ class MapVisitorReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("MapVisitorReply", DocumentProtocol.REPLY_MAPVISITOR);
+ }
+ }
+
+ class QueryResultMessageTest implements RunnableTest {
+ @Override
+ public void run() throws Exception {
+ test_result_with_match_features();
+
+ Routable routable = deserialize("QueryResultMessage-1", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ QueryResultMessage msg = (QueryResultMessage)routable;
+ assertEquals(0, msg.getResult().getHitCount());
+
+ routable = deserialize("QueryResultMessage-2", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+ com.yahoo.vdslib.SearchResult.Hit h = msg.getResult().getHit(0);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+ h = msg.getResult().getHit(1);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+
+ routable = deserialize("QueryResultMessage-3", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+ h = msg.getResult().getHit(0);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+ h = msg.getResult().getHit(1);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertFalse(h.getMatchFeatures().isPresent());
+
+ routable = deserialize("QueryResultMessage-4", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ msg = (QueryResultMessage)routable;
+ assertEquals(3, msg.getResult().getHitCount());
+ h = msg.getResult().getHit(0);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(89.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ byte[] b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '2' }, b);
+
+ h = msg.getResult().getHit(1);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(109.0, h.getRank(), 1E-6);
+ assertEquals("doc17", h.getDocId());
+ b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '1' }, b);
+
+ h = msg.getResult().getHit(2);
+ assertTrue(h instanceof SearchResult.HitWithSortBlob);
+ assertEquals(90.0, h.getRank(), 1E-6);
+ assertEquals("doc18", h.getDocId());
+ b = ((SearchResult.HitWithSortBlob)h).getSortBlob();
+ assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '3' }, b);
+ }
+
+ void assertEqualsData(byte[] exp, byte[] act) {
+ assertEquals(exp.length, act.length);
+ for (int i = 0; i < exp.length; ++i) {
+ assertEquals(exp[i], act[i]);
+ }
+ }
+
+ void test_result_with_match_features() {
+ Routable routable = deserialize("QueryResultMessage-6", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP);
+ assertTrue(routable instanceof QueryResultMessage);
+
+ var msg = (QueryResultMessage)routable;
+ assertEquals(2, msg.getResult().getHitCount());
+
+ var h = msg.getResult().getHit(0);
+ assertTrue(h instanceof SearchResult.Hit);
+ assertEquals(7.0, h.getRank(), 1E-6);
+ assertEquals("doc2", h.getDocId());
+ assertTrue(h.getMatchFeatures().isPresent());
+ var mf = h.getMatchFeatures().get();
+ assertEquals(12.0, mf.field("foo").asDouble(), 1E-6);
+ assertEqualsData(new byte[] { 'T', 'h', 'e', 'r', 'e' }, mf.field("bar").asData());
+
+ h = msg.getResult().getHit(1);
+ assertTrue(h instanceof SearchResult.Hit);
+ assertEquals(5.0, h.getRank(), 1E-6);
+ assertEquals("doc1", h.getDocId());
+ assertTrue(h.getMatchFeatures().isPresent());
+ mf = h.getMatchFeatures().get();
+ assertEquals(1.0, mf.field("foo").asDouble(), 1E-6);
+ assertEqualsData(new byte[] { 'H', 'i' }, mf.field("bar").asData());
+ }
+ }
+
+ class QueryResultReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("QueryResultReply", DocumentProtocol.REPLY_QUERYRESULT);
+ }
+ }
+
+ class VisitorInfoMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new VisitorInfoMessage();
+ msg.getFinishedBuckets().add(new BucketId(16, 1));
+ msg.getFinishedBuckets().add(new BucketId(16, 2));
+ msg.getFinishedBuckets().add(new BucketId(16, 4));
+ msg.setErrorMessage("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6");
+
+ serialize("VisitorInfoMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (VisitorInfoMessage)deserialize("VisitorInfoMessage", DocumentProtocol.MESSAGE_VISITORINFO, lang);
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 1)));
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 2)));
+ assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 4)));
+ assertEquals("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6", msg2.getErrorMessage());
+ });
+ }
+ }
+
+ class VisitorInfoReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("VisitorInfoReply", DocumentProtocol.REPLY_VISITORINFO);
+ }
+ }
+
+ class DocumentListMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new DocumentListMessage();
+ msg.setBucketId(new BucketId(17, 1234));
+ var doc = new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:scheme:testdoc:n=1234:1");
+ msg.getDocuments().add(new DocumentListEntry(doc, 1234, true));
+
+ serialize("DocumentListMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (DocumentListMessage) deserialize("DocumentListMessage", DocumentProtocol.MESSAGE_DOCUMENTLIST, lang);
+ assertEquals(new BucketId(17, 1234), msg2.getBucketId());
+ assertEquals(1, msg2.getDocuments().size());
+ var entry = msg2.getDocuments().get(0);
+ assertEquals("id:scheme:testdoc:n=1234:1", entry.getDocument().getId().toString());
+ assertEquals(1234, entry.getTimestamp());
+ assertTrue(entry.isRemoveEntry());
+ });
+ }
+ }
+
+ class DocumentListReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("DocumentListReply", DocumentProtocol.REPLY_DOCUMENTLIST);
+ }
+ }
+
+ class EmptyBucketsMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var bids = new ArrayList<BucketId>();
+ for (int i = 0; i < 13; ++i) {
+ bids.add(new BucketId(16, i));
+ }
+ var ebm = new EmptyBucketsMessage(bids);
+ serialize("EmptyBucketsMessage", ebm);
+ forEachLanguage((lang) -> {
+ var ebm2 = (EmptyBucketsMessage)deserialize("EmptyBucketsMessage", DocumentProtocol.MESSAGE_EMPTYBUCKETS, lang);
+ assertEquals(13, ebm2.getBucketIds().size());
+ for (int i = 0; i < 13; ++i) {
+ assertEquals(new BucketId(16, i), ebm2.getBucketIds().get(i));
+ }
+ });
+ }
+ }
+
+ class EmptyBucketsReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ testVisitorReply("EmptyBucketsReply", DocumentProtocol.REPLY_EMPTYBUCKETS);
+ }
+ }
+
+ class GetBucketListMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "beartato";
+
+ @Override
+ public void run() {
+ var msg = new GetBucketListMessage(new BucketId(16, 123));
+ msg.setBucketSpace(BUCKET_SPACE);
+ serialize("GetBucketListMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (GetBucketListMessage)deserialize("GetBucketListMessage", DocumentProtocol.MESSAGE_GETBUCKETLIST, lang);
+ assertEquals(new BucketId(16, 123), msg2.getBucketId());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class GetBucketListReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new GetBucketListReply();
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo"));
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar"));
+ reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink"));
+
+ serialize("GetBucketListReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (GetBucketListReply)deserialize("GetBucketListReply", DocumentProtocol.REPLY_GETBUCKETLIST, lang);
+ assertEquals(3, reply2.getBuckets().size());
+ assertEquals(reply2.getBuckets().get(0), new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo"));
+ assertEquals(reply2.getBuckets().get(1), new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar"));
+ assertEquals(reply2.getBuckets().get(2), new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink"));
+ });
+ }
+ }
+
+ class GetBucketStateMessageTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new GetBucketStateMessage(new BucketId(16, 666));
+ serialize("GetBucketStateMessage", msg);
+
+ forEachLanguage((lang) -> {
+ var msg2 = (GetBucketStateMessage)deserialize("GetBucketStateMessage", DocumentProtocol.MESSAGE_GETBUCKETSTATE, lang);
+ assertEquals(16, msg2.getBucketId().getUsedBits());
+ assertEquals(4611686018427388570L, msg2.getBucketId().getId());
+ });
+ }
+ }
+
+ class GetBucketStateReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var foo = new GlobalId(IdString.createIdString("id:ns:testdoc::foo"));
+ var bar = new GlobalId(IdString.createIdString("id:ns:testdoc::bar"));
+ var baz = new DocumentId("id:ns:testdoc::baz");
+
+ var reply = new GetBucketStateReply();
+ var state = new ArrayList<DocumentState>(3);
+ state.add(new DocumentState(foo, 777, false));
+ state.add(new DocumentState(bar, 888, true));
+ state.add(new DocumentState(baz, 999, false));
+ reply.setBucketState(state);
+
+ serialize("GetBucketStateReply", reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (GetBucketStateReply)deserialize("GetBucketStateReply", DocumentProtocol.REPLY_GETBUCKETSTATE, lang);
+ assertEquals(3, reply2.getBucketState().size());
+
+ assertEquals(777, reply2.getBucketState().get(0).getTimestamp());
+ assertEquals(foo, reply2.getBucketState().get(0).getGid());
+ assertFalse(reply2.getBucketState().get(0).hasDocId());
+ assertFalse(reply2.getBucketState().get(0).isRemoveEntry());
+
+ assertEquals(888, reply2.getBucketState().get(1).getTimestamp());
+ assertEquals(bar, reply2.getBucketState().get(1).getGid());
+ assertFalse(reply2.getBucketState().get(1).hasDocId());
+ assertTrue(reply2.getBucketState().get(1).isRemoveEntry());
+
+ assertEquals(999, reply2.getBucketState().get(2).getTimestamp());
+ assertTrue(reply2.getBucketState().get(2).hasDocId());
+ assertEquals(new GlobalId(baz.getGlobalId()), reply2.getBucketState().get(2).getGid());
+ assertEquals(baz, reply2.getBucketState().get(2).getDocId());
+ assertFalse(reply2.getBucketState().get(2).isRemoveEntry());
+ });
+ }
+ }
+
+ class StatBucketMessageTest implements RunnableTest {
+ private static final String BUCKET_SPACE = "andrei";
+
+ @Override
+ public void run() {
+ var msg = new StatBucketMessage(new BucketId(16, 123), "id.user=123");
+ msg.setBucketSpace(BUCKET_SPACE);
+ serialize("StatBucketMessage", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (StatBucketMessage)deserialize("StatBucketMessage", DocumentProtocol.MESSAGE_STATBUCKET, lang);
+ assertEquals(new BucketId(16, 123), msg2.getBucketId());
+ assertEquals("id.user=123", msg2.getDocumentSelection());
+ assertEquals(BUCKET_SPACE, msg2.getBucketSpace());
+ });
+ }
+ }
+
+ class StatBucketReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var msg = new StatBucketReply();
+ msg.setResults("These are the votes of the Norwegian jury");
+ serialize("StatBucketReply", msg);
+ forEachLanguage((lang) -> {
+ var msg2 = (StatBucketReply)deserialize("StatBucketReply", DocumentProtocol.REPLY_STATBUCKET, lang);
+ assertEquals("These are the votes of the Norwegian jury", msg2.getResults());
+ });
+ }
+ }
+
+ class WrongDistributionReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new WrongDistributionReply("distributor:3 storage:2");
+ serialize("WrongDistributionReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (WrongDistributionReply)deserialize("WrongDistributionReply", DocumentProtocol.REPLY_WRONGDISTRIBUTION, lang);
+ assertEquals("distributor:3 storage:2", reply2.getSystemState());
+ });
+ }
+ }
+
+ class DocumentIgnoredReplyTest implements RunnableTest {
+ @Override
+ public void run() {
+ var reply = new DocumentIgnoredReply();
+ serialize("DocumentIgnoredReply", reply);
+ forEachLanguage((lang) -> {
+ var reply2 = (DocumentIgnoredReply)deserialize("DocumentIgnoredReply", DocumentProtocol.REPLY_DOCUMENTIGNORED, lang);
+ assertNotNull(reply2);
+ });
+ }
+ }
+
+ private void testDocumentReply(String filename, int type) {
+ var reply = new DocumentReply(type);
+ serialize(filename, reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (DocumentReply)deserialize(filename, type, lang);
+ assertNotNull(reply2);
+ });
+ }
+
+ private void testVisitorReply(String filename, int type) {
+ VisitorReply reply = new VisitorReply(type);
+ serialize(filename, reply);
+
+ forEachLanguage((lang) -> {
+ var reply2 = (VisitorReply)deserialize(filename, type, lang);
+ assertNotNull(reply2);
+ });
+ }
+
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java
index f15b0fe3995..16938ab843a 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java
@@ -1,10 +1,11 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol.test;
+package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.component.Version;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.test.TestFileUtil;
import com.yahoo.messagebus.Routable;
import org.junit.Test;
@@ -20,7 +21,7 @@ import static org.junit.Assert.*;
*/
public abstract class MessagesTestBase {
- protected enum Language {
+ public enum Language {
JAVA,
CPP
}
diff --git a/documentapi/src/tests/messages/CMakeLists.txt b/documentapi/src/tests/messages/CMakeLists.txt
index dec61432e4b..3428c34786c 100644
--- a/documentapi/src/tests/messages/CMakeLists.txt
+++ b/documentapi/src/tests/messages/CMakeLists.txt
@@ -8,6 +8,16 @@ vespa_add_executable(documentapi_messages60_test_app TEST
documentapi
)
vespa_add_test(NAME documentapi_messages60_test_app COMMAND documentapi_messages60_test_app)
+
+vespa_add_executable(documentapi_messages80_test_app TEST
+ SOURCES
+ testbase.cpp
+ messages80test.cpp
+ DEPENDS
+ documentapi
+)
+vespa_add_test(NAME documentapi_messages80_test_app COMMAND documentapi_messages80_test_app)
+
vespa_add_executable(documentapi_error_codes_test_app_app TEST
SOURCES
error_codes_test.cpp
diff --git a/documentapi/src/tests/messages/messages60test.cpp b/documentapi/src/tests/messages/messages60test.cpp
index 99ecb3644a5..281e1123e54 100644
--- a/documentapi/src/tests/messages/messages60test.cpp
+++ b/documentapi/src/tests/messages/messages60test.cpp
@@ -751,7 +751,7 @@ Messages60Test::testVisitorInfoMessage()
bool
Messages60Test::testDestroyVisitorReply()
{
- return tryDocumentReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
+ return tryVisitorReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
}
bool
@@ -922,23 +922,6 @@ Messages60Test::testRemoveLocationReply()
////////////////////////////////////////////////////////////////////////////////
bool
-Messages60Test::tryDocumentReply(const string &filename, uint32_t type)
-{
- DocumentReply tmp(type);
-
- EXPECT_EQUAL((uint32_t)5, serialize(filename, tmp));
-
- for (uint32_t lang = 0; lang < NUM_LANGUAGES; ++lang) {
- mbus::Routable::UP obj = deserialize(filename, type, lang);
- if (EXPECT_TRUE(obj)) {
- DocumentReply *ref = dynamic_cast<DocumentReply*>(obj.get());
- EXPECT_TRUE(ref != NULL);
- }
- }
- return true;
-}
-
-bool
Messages60Test::tryVisitorReply(const string &filename, uint32_t type)
{
VisitorReply tmp(type);
diff --git a/documentapi/src/tests/messages/messages60test.h b/documentapi/src/tests/messages/messages60test.h
index 88bc88097eb..d1060a83962 100644
--- a/documentapi/src/tests/messages/messages60test.h
+++ b/documentapi/src/tests/messages/messages60test.h
@@ -6,9 +6,9 @@
class Messages60Test : public TestBase {
protected:
- const vespalib::Version getVersion() const override { return vespalib::Version(6, 221); }
+ vespalib::Version getVersion() const override { return vespalib::Version(6, 221); }
bool shouldTestCoverage() const override { return true; }
- bool tryDocumentReply(const string &filename, uint32_t type);
+
bool tryVisitorReply(const string &filename, uint32_t type);
static size_t serializedLength(const string & str) { return sizeof(int32_t) + str.size(); }
@@ -23,7 +23,6 @@ public:
bool testDocumentIgnoredReply();
bool testDocumentListMessage();
bool testDocumentListReply();
- bool testDocumentSummaryMessage();
bool testEmptyBucketsMessage();
bool testEmptyBucketsReply();
bool testGetBucketListMessage();
@@ -42,7 +41,6 @@ public:
bool testRemoveDocumentReply();
bool testRemoveLocationMessage();
bool testRemoveLocationReply();
- bool testSearchResultMessage();
bool testStatBucketMessage();
bool testStatBucketReply();
bool testUpdateDocumentMessage();
diff --git a/documentapi/src/tests/messages/messages80test.cpp b/documentapi/src/tests/messages/messages80test.cpp
new file mode 100644
index 00000000000..9b97f332318
--- /dev/null
+++ b/documentapi/src/tests/messages/messages80test.cpp
@@ -0,0 +1,908 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "testbase.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/update/fieldpathupdates.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/util/featureset.h>
+#include <array>
+
+using document::DataType;
+using document::DocumentTypeRepo;
+using vespalib::FeatureValues;
+
+// TODO rewrite to GTest!
+class Messages80Test : public TestBase {
+protected:
+ vespalib::Version getVersion() const override {
+ // Must be as high--or higher--than the v8 protocol version specified in documentprocotol.cpp
+ // (and equal to its corresponding value in the Java implementation).
+ return {8, 305};
+ }
+ bool shouldTestCoverage() const override { return true; }
+
+ bool try_visitor_reply(const string& filename, uint32_t type);
+
+ static constexpr std::array<uint32_t, 2> languages() noexcept {
+ return {TestBase::LANG_CPP, TestBase::LANG_JAVA};
+ }
+
+public:
+ Messages80Test();
+ ~Messages80Test() override = default;
+
+ bool test_create_visitor_message();
+ bool test_create_visitor_reply();
+ bool test_destroy_visitor_message();
+ bool test_destroy_visitor_reply();
+ bool test_document_ignored_reply();
+ bool test_document_list_message();
+ bool test_document_list_reply();
+ bool test_empty_buckets_message();
+ bool test_empty_buckets_reply();
+ bool test_get_bucket_list_message();
+ bool test_get_bucket_list_reply();
+ bool test_get_bucket_state_message();
+ bool test_get_bucket_state_reply();
+ bool test_get_document_message();
+ bool test_get_document_reply();
+ bool test_map_visitor_message();
+ bool test_map_visitor_reply();
+ bool test_put_document_message();
+ bool test_put_document_reply();
+ bool test_query_result_message();
+ bool test_query_result_reply();
+ bool test_remove_document_message();
+ bool test_remove_document_reply();
+ bool test_remove_location_message();
+ bool test_remove_location_reply();
+ bool test_stat_bucket_message();
+ bool test_stat_bucket_reply();
+ bool test_update_document_message();
+ bool test_update_document_reply();
+ bool test_visitor_info_message();
+ bool test_visitor_info_reply();
+ bool test_wrong_distribution_reply();
+
+ void do_test_get_reply_with_doc();
+ void do_test_empty_get_reply();
+};
+
+namespace {
+
+std::vector<char> doc1_mf_data{'H', 'i'};
+std::vector<char> doc2_mf_data{'T', 'h', 'e', 'r', 'e'};
+
+}
+
+Messages80Test::Messages80Test() {
+ putTest(DocumentProtocol::MESSAGE_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_message));
+ putTest(DocumentProtocol::MESSAGE_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_message));
+ putTest(DocumentProtocol::MESSAGE_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_message));
+ putTest(DocumentProtocol::MESSAGE_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_message));
+ putTest(DocumentProtocol::MESSAGE_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_message));
+ putTest(DocumentProtocol::MESSAGE_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_message));
+ putTest(DocumentProtocol::MESSAGE_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_message));
+ putTest(DocumentProtocol::MESSAGE_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_message));
+ putTest(DocumentProtocol::MESSAGE_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_message));
+ putTest(DocumentProtocol::MESSAGE_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_message));
+ putTest(DocumentProtocol::MESSAGE_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_message));
+ putTest(DocumentProtocol::MESSAGE_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_message));
+ putTest(DocumentProtocol::MESSAGE_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_message));
+
+ putTest(DocumentProtocol::REPLY_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_reply));
+ putTest(DocumentProtocol::REPLY_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_reply));
+ putTest(DocumentProtocol::REPLY_DOCUMENTIGNORED, TEST_METHOD(Messages80Test::test_document_ignored_reply));
+ putTest(DocumentProtocol::REPLY_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_reply));
+ putTest(DocumentProtocol::REPLY_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_reply));
+ putTest(DocumentProtocol::REPLY_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_reply));
+ putTest(DocumentProtocol::REPLY_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_reply));
+ putTest(DocumentProtocol::REPLY_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_reply));
+ putTest(DocumentProtocol::REPLY_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_reply));
+ putTest(DocumentProtocol::REPLY_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_reply));
+ putTest(DocumentProtocol::REPLY_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_reply));
+ putTest(DocumentProtocol::REPLY_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_reply));
+ putTest(DocumentProtocol::REPLY_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_reply));
+ putTest(DocumentProtocol::REPLY_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_reply));
+ putTest(DocumentProtocol::REPLY_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_reply));
+ putTest(DocumentProtocol::REPLY_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_reply));
+ putTest(DocumentProtocol::REPLY_WRONGDISTRIBUTION, TEST_METHOD(Messages80Test::test_wrong_distribution_reply));
+}
+
+namespace {
+
+document::Document::SP
+createDoc(const DocumentTypeRepo& repo, const string& type_name, const string& id) {
+ return std::make_shared<document::Document>(repo, *repo.getDocumentType(type_name), document::DocumentId(id));
+}
+
+}
+
+bool Messages80Test::test_get_document_message() {
+ GetDocumentMessage tmp(document::DocumentId("id:ns:testdoc::"), "foo bar");
+ EXPECT_EQUAL(280u, sizeof(GetDocumentMessage)); // FIXME doesn't belong here
+ serialize("GetDocumentMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentMessage", DocumentProtocol::MESSAGE_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentMessage&>(*obj);
+ EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(ref.getFieldSet(), "foo bar");
+ }
+ }
+ return true;
+}
+
+void Messages80Test::do_test_get_reply_with_doc() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::");
+ GetDocumentReply tmp(doc);
+ tmp.setLastModified(1234567);
+
+ EXPECT_EQUAL(128u, sizeof(GetDocumentReply)); // FIXME doesn't belong here!
+ serialize("GetDocumentReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentReply", DocumentProtocol::REPLY_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastModified(), 1234567ULL); // FIXME signed vs. unsigned... -_-
+ ASSERT_TRUE(ref.hasDocument());
+ auto& doc2 = ref.getDocument();
+ EXPECT_EQUAL(doc2.getType().getName(), "testdoc");
+ EXPECT_EQUAL(doc2.getId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(doc2.getLastModified(), 1234567LL); // FIXME signed vs. unsigned... -_-
+ }
+ }
+}
+
+void Messages80Test::do_test_empty_get_reply() {
+ GetDocumentReply tmp;
+ serialize("GetDocumentReply-empty", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetDocumentReply-empty", DocumentProtocol::REPLY_GETDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastModified(), 0ULL);
+ EXPECT_FALSE(ref.hasDocument());
+ }
+ }
+}
+
+bool Messages80Test::test_get_document_reply() {
+ TEST_DO(do_test_get_reply_with_doc());
+ TEST_DO(do_test_empty_get_reply());
+ return true;
+}
+
+bool Messages80Test::test_put_document_message() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::");
+ PutDocumentMessage msg(doc);
+
+ msg.setTimestamp(666);
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ // FIXME these don't belong here!
+ EXPECT_EQUAL(64u, sizeof(vespalib::string));
+ EXPECT_EQUAL(sizeof(vespalib::string), sizeof(TestAndSetCondition));
+ EXPECT_EQUAL(112u, sizeof(DocumentMessage));
+ EXPECT_EQUAL(sizeof(TestAndSetCondition) + sizeof(DocumentMessage), sizeof(TestAndSetMessage));
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(PutDocumentMessage));
+
+ serialize("PutDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto routableUp = deserialize("PutDocumentMessage", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(routableUp)) {
+ auto& deserializedMsg = dynamic_cast<PutDocumentMessage &>(*routableUp);
+
+ EXPECT_EQUAL(deserializedMsg.getDocument().getType().getName(), msg.getDocument().getType().getName());
+ EXPECT_EQUAL(deserializedMsg.getDocument().getId().toString(), msg.getDocument().getId().toString());
+ EXPECT_EQUAL(deserializedMsg.getTimestamp(), msg.getTimestamp());
+ EXPECT_GREATER(deserializedMsg.getApproxSize(), 0u);
+ EXPECT_EQUAL(deserializedMsg.getCondition().getSelection(), msg.getCondition().getSelection());
+ EXPECT_FALSE(deserializedMsg.get_create_if_non_existent());
+ }
+ }
+
+ //-------------------------------------------------------------------------
+
+ PutDocumentMessage msg2(createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::"));
+ msg2.set_create_if_non_existent(true);
+ serialize("PutDocumentMessage-create", msg2);
+ for (auto lang : languages()) {
+ auto obj = deserialize("PutDocumentMessage-create", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& decoded = dynamic_cast<PutDocumentMessage&>(*obj);
+ EXPECT_TRUE(decoded.get_create_if_non_existent());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_put_document_reply() {
+ WriteDocumentReply reply(DocumentProtocol::REPLY_PUTDOCUMENT);
+ reply.setHighestModificationTimestamp(30);
+
+ serialize("PutDocumentReply", reply);
+ EXPECT_EQUAL(sizeof(WriteDocumentReply), 112u); // FIXME doesn't belong here!
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("PutDocumentReply", DocumentProtocol::REPLY_PUTDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<WriteDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_update_document_message() {
+ const DocumentTypeRepo& repo = getTypeRepo();
+ const document::DocumentType& docType = *repo.getDocumentType("testdoc");
+
+ auto doc_update = std::make_shared<document::DocumentUpdate>(repo, docType, document::DocumentId("id:ns:testdoc::"));
+ doc_update->addFieldPathUpdate(std::make_unique<document::RemoveFieldPathUpdate>("intfield", "testdoc.intfield > 0"));
+
+ UpdateDocumentMessage msg(std::move(doc_update));
+ msg.setOldTimestamp(666u);
+ msg.setNewTimestamp(777u);
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(UpdateDocumentMessage)); // FIXME doesn't belong here!
+ serialize("UpdateDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("UpdateDocumentMessage", DocumentProtocol::MESSAGE_UPDATEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& decoded = dynamic_cast<UpdateDocumentMessage&>(*obj);
+ EXPECT_EQUAL(decoded.getDocumentUpdate(), msg.getDocumentUpdate());
+ EXPECT_EQUAL(decoded.getOldTimestamp(), msg.getOldTimestamp());
+ EXPECT_EQUAL(decoded.getNewTimestamp(), msg.getNewTimestamp());
+ EXPECT_GREATER(decoded.getApproxSize(), 0u); // Actual value depends on protobuf size
+ EXPECT_EQUAL(decoded.getCondition().getSelection(), msg.getCondition().getSelection());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_update_document_reply() {
+ UpdateDocumentReply reply;
+ reply.setWasFound(true);
+ reply.setHighestModificationTimestamp(30);
+
+ serialize("UpdateDocumentReply", reply);
+ EXPECT_EQUAL(120u, sizeof(UpdateDocumentReply)); // FIXME doesn't belong here!
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("UpdateDocumentReply", DocumentProtocol::REPLY_UPDATEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<UpdateDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ EXPECT_TRUE(ref.wasFound());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_document_message() {
+ RemoveDocumentMessage msg(document::DocumentId("id:ns:testdoc::"));
+ msg.setCondition(TestAndSetCondition("There's just one condition"));
+
+ EXPECT_EQUAL(sizeof(TestAndSetMessage) + 104, sizeof(RemoveDocumentMessage)); // FIXME doesn't belong here!
+ serialize("RemoveDocumentMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveDocumentMessage", DocumentProtocol::MESSAGE_REMOVEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveDocumentMessage &>(*obj);
+ EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::");
+ EXPECT_EQUAL(ref.getCondition().getSelection(), msg.getCondition().getSelection());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_document_reply() {
+ RemoveDocumentReply reply;
+ std::vector<uint64_t> ts;
+ reply.setWasFound(true);
+ reply.setHighestModificationTimestamp(30);
+ EXPECT_EQUAL(120u, sizeof(RemoveDocumentReply)); // FIXME doesn't belong here!
+
+ serialize("RemoveDocumentReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveDocumentReply", DocumentProtocol::REPLY_REMOVEDOCUMENT, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveDocumentReply&>(*obj);
+ EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u);
+ EXPECT_TRUE(ref.wasFound());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_location_message() {
+ document::BucketIdFactory factory;
+ document::select::Parser parser(getTypeRepo(), factory);
+ RemoveLocationMessage msg(factory, parser, "id.group == \"mygroup\"");
+ msg.setBucketSpace("bjarne");
+ serialize("RemoveLocationMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveLocationMessage", DocumentProtocol::MESSAGE_REMOVELOCATION, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<RemoveLocationMessage&>(*obj);
+ EXPECT_EQUAL(ref.getDocumentSelection(), "id.group == \"mygroup\"");
+ EXPECT_EQUAL(ref.getBucketSpace(), "bjarne");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_remove_location_reply() {
+ DocumentReply tmp(DocumentProtocol::REPLY_REMOVELOCATION);
+ serialize("RemoveLocationReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("RemoveLocationReply", DocumentProtocol::REPLY_REMOVELOCATION, lang);
+ EXPECT_TRUE(obj);
+ }
+ return true;
+}
+
+bool Messages80Test::test_create_visitor_message() {
+ CreateVisitorMessage tmp("SomeLibrary", "myvisitor", "newyork", "london");
+ tmp.setDocumentSelection("true and false or true");
+ tmp.getParameters().set("myvar", "somevalue");
+ tmp.getParameters().set("anothervar", uint64_t(34));
+ tmp.getBuckets().emplace_back(16, 1234);
+ tmp.setVisitRemoves(true);
+ tmp.setVisitInconsistentBuckets(true);
+ tmp.setFieldSet("foo bar");
+ tmp.setMaxBucketsPerVisitor(2);
+ tmp.setMaximumPendingReplyCount(12);
+ tmp.setBucketSpace("bjarne");
+
+ serialize("CreateVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("CreateVisitorMessage", DocumentProtocol::MESSAGE_CREATEVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<CreateVisitorMessage&>(*obj);
+
+ EXPECT_EQUAL(ref.getLibraryName(), "SomeLibrary");
+ EXPECT_EQUAL(ref.getInstanceId(), "myvisitor");
+ EXPECT_EQUAL(ref.getControlDestination(), "newyork");
+ EXPECT_EQUAL(ref.getDataDestination(), "london");
+ EXPECT_EQUAL(ref.getDocumentSelection(), "true and false or true");
+ EXPECT_EQUAL(ref.getFieldSet(), "foo bar");
+ EXPECT_EQUAL(ref.getMaximumPendingReplyCount(), uint32_t(12));
+ EXPECT_TRUE(ref.visitRemoves());
+ EXPECT_TRUE(ref.visitInconsistentBuckets());
+ ASSERT_EQUAL(ref.getBuckets().size(), size_t(1));
+ EXPECT_EQUAL(ref.getBuckets()[0], document::BucketId(16, 1234));
+ EXPECT_EQUAL(ref.getParameters().get("myvar"), "somevalue");
+ EXPECT_EQUAL(ref.getParameters().get("anothervar", uint64_t(1)), uint64_t(34));
+ EXPECT_EQUAL(ref.getMaxBucketsPerVisitor(), uint32_t(2));
+ EXPECT_EQUAL(ref.getBucketSpace(), "bjarne");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_create_visitor_reply() {
+ CreateVisitorReply reply(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply.setLastBucket(document::BucketId(16, 123));
+ vdslib::VisitorStatistics vs;
+ vs.setBucketsVisited(3);
+ vs.setDocumentsVisited(1000);
+ vs.setBytesVisited(1024000);
+ vs.setDocumentsReturned(123);
+ vs.setBytesReturned(512000);
+ reply.setVisitorStatistics(vs);
+
+ serialize("CreateVisitorReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("CreateVisitorReply", DocumentProtocol::REPLY_CREATEVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<CreateVisitorReply&>(*obj);
+ EXPECT_EQUAL(ref.getLastBucket(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBucketsVisited(), (uint32_t)3);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsVisited(), (uint64_t)1000);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBytesVisited(), (uint64_t)1024000);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsReturned(), (uint64_t)123);
+ EXPECT_EQUAL(ref.getVisitorStatistics().getBytesReturned(), (uint64_t)512000);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_destroy_visitor_message() {
+ DestroyVisitorMessage tmp("myvisitor");
+ serialize("DestroyVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("DestroyVisitorMessage", DocumentProtocol::MESSAGE_DESTROYVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<DestroyVisitorMessage&>(*obj);
+ EXPECT_EQUAL(ref.getInstanceId(), "myvisitor");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_destroy_visitor_reply() {
+ return try_visitor_reply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR);
+}
+
+bool Messages80Test::test_map_visitor_message() {
+ MapVisitorMessage tmp;
+ tmp.getData().set("foo", 3);
+ tmp.getData().set("bar", 5);
+
+ serialize("MapVisitorMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("MapVisitorMessage", DocumentProtocol::MESSAGE_MAPVISITOR, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<MapVisitorMessage&>(*obj);
+ EXPECT_EQUAL(ref.getData().size(), 2u);
+ EXPECT_EQUAL(ref.getData().get("foo", 0), 3);
+ EXPECT_EQUAL(ref.getData().get("bar", 0), 5);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_map_visitor_reply() {
+ return try_visitor_reply("MapVisitorReply", DocumentProtocol::REPLY_MAPVISITOR);
+}
+
+bool Messages80Test::test_query_result_message() {
+ QueryResultMessage srm;
+ vdslib::SearchResult& sr(srm.getSearchResult());
+ EXPECT_EQUAL(srm.getSequenceId(), 0u);
+ EXPECT_EQUAL(sr.getHitCount(), 0u);
+ EXPECT_EQUAL(sr.getAggregatorList().getSerializedSize(), 4u);
+ EXPECT_EQUAL(sr.getSerializedSize(), 20u);
+ EXPECT_EQUAL(srm.getApproxSize(), 28u);
+
+ serialize("QueryResultMessage-1", srm);
+
+ // Serialization is only implemented in C++
+ {
+ auto routable = deserialize("QueryResultMessage-1", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ vdslib::SearchResult& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dm.getSequenceId(), size_t(0));
+ EXPECT_EQUAL(dr.getHitCount(), size_t(0));
+ }
+
+ sr.addHit(0, "doc1", 89);
+ sr.addHit(1, "doc17", 109);
+ serialize("QueryResultMessage-2", srm);
+
+ const char* doc_id;
+ vdslib::SearchResult::RankType rank;
+
+ {
+ auto routable = deserialize("QueryResultMessage-2", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ }
+
+ sr.sort();
+ serialize("QueryResultMessage-3", srm);
+
+ {
+ auto routable = deserialize("QueryResultMessage-3", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ }
+
+ QueryResultMessage srm2;
+ vdslib::SearchResult& sr2(srm2.getSearchResult());
+ sr2.addHit(0, "doc1", 89, "sortdata2", 9);
+ sr2.addHit(1, "doc17", 109, "sortdata1", 9);
+ sr2.addHit(2, "doc18", 90, "sortdata3", 9);
+ serialize("QueryResultMessage-4", srm2);
+
+ {
+ auto routable = deserialize("QueryResultMessage-4", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(3));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(2, doc_id, rank);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+ }
+
+ sr2.sort();
+ const void* buf;
+ size_t sz;
+ sr2.getHit(0, doc_id, rank);
+ sr2.getSortBlob(0, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ sr2.getHit(1, doc_id, rank);
+ sr2.getSortBlob(1, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ sr2.getHit(2, doc_id, rank);
+ sr2.getSortBlob(2, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+
+ serialize("QueryResultMessage-5", srm2);
+ {
+ auto routable = deserialize("QueryResultMessage-5", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(3));
+ dr.getHit(0, doc_id, rank);
+ dr.getSortBlob(0, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109));
+ EXPECT_EQUAL(strcmp("doc17", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ dr.getSortBlob(1, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89));
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ dr.getHit(2, doc_id, rank);
+ dr.getSortBlob(2, buf, sz);
+ EXPECT_EQUAL(sz, 9u);
+ EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0);
+ EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90));
+ EXPECT_EQUAL(strcmp("doc18", doc_id), 0);
+ }
+
+ QueryResultMessage qrm3;
+ auto& sr3 = qrm3.getSearchResult();
+ sr3.addHit(0, "doc1", 5);
+ sr3.addHit(1, "doc2", 7);
+ FeatureValues mf;
+ mf.names.emplace_back("foo");
+ mf.names.emplace_back("bar");
+ mf.values.resize(4);
+ mf.values[0].set_double(1.0);
+ mf.values[1].set_data({doc1_mf_data.data(), doc1_mf_data.size()});
+ mf.values[2].set_double(12.0);
+ mf.values[3].set_data({doc2_mf_data.data(), doc2_mf_data.size()});
+ sr3.set_match_features(FeatureValues(mf));
+ sr3.sort();
+
+ serialize("QueryResultMessage-6", qrm3);
+ {
+ auto routable = deserialize("QueryResultMessage-6", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP);
+ if (!EXPECT_TRUE(routable)) {
+ return false;
+ }
+ auto& dm = dynamic_cast<QueryResultMessage&>(*routable);
+ auto& dr = dm.getSearchResult();
+ EXPECT_EQUAL(dr.getHitCount(), size_t(2));
+ dr.getHit(0, doc_id, rank);
+ EXPECT_EQUAL(vdslib::SearchResult::RankType(7), rank);
+ EXPECT_EQUAL(strcmp("doc2", doc_id), 0);
+ dr.getHit(1, doc_id, rank);
+ EXPECT_EQUAL(vdslib::SearchResult::RankType(5), rank);
+ EXPECT_EQUAL(strcmp("doc1", doc_id), 0);
+ auto mfv = dr.get_match_feature_values(0);
+ EXPECT_EQUAL(mfv.size(), 2u);
+ EXPECT_EQUAL(mfv[0].as_double(), 12.0);
+ EXPECT_EQUAL(mfv[1].as_data().make_string(), "There");
+ mfv = dr.get_match_feature_values(1);
+ EXPECT_EQUAL(mfv.size(), 2u);
+ EXPECT_EQUAL(mfv[0].as_double(), 1.0);
+ EXPECT_EQUAL(mfv[1].as_data().make_string(), "Hi");
+ const auto& mf_names = dr.get_match_features().names;
+ EXPECT_EQUAL(mf_names.size(), 2u);
+ EXPECT_EQUAL(mf_names[0], "foo");
+ EXPECT_EQUAL(mf_names[1], "bar");
+ }
+ return true;
+}
+
+bool Messages80Test::test_query_result_reply() {
+ return try_visitor_reply("QueryResultReply", DocumentProtocol::REPLY_QUERYRESULT);
+}
+
+bool Messages80Test::test_visitor_info_message() {
+ VisitorInfoMessage tmp;
+ tmp.getFinishedBuckets().emplace_back(16, 1);
+ tmp.getFinishedBuckets().emplace_back(16, 2);
+ tmp.getFinishedBuckets().emplace_back(16, 4);
+ string utf8 = "error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6"; // FIXME utf-8 literal
+ tmp.setErrorMessage(utf8);
+
+ serialize("VisitorInfoMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("VisitorInfoMessage", DocumentProtocol::MESSAGE_VISITORINFO, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<VisitorInfoMessage&>(*obj);
+ ASSERT_EQUAL(ref.getFinishedBuckets().size(), 3u);
+ EXPECT_EQUAL(ref.getFinishedBuckets()[0], document::BucketId(16, 1));
+ EXPECT_EQUAL(ref.getFinishedBuckets()[1], document::BucketId(16, 2));
+ EXPECT_EQUAL(ref.getFinishedBuckets()[2], document::BucketId(16, 4));
+ EXPECT_EQUAL(ref.getErrorMessage(), utf8);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_visitor_info_reply() {
+ return try_visitor_reply("VisitorInfoReply", DocumentProtocol::REPLY_VISITORINFO);
+}
+
+bool Messages80Test::test_document_list_message() {
+ auto doc = createDoc(getTypeRepo(), "testdoc", "id:scheme:testdoc:n=1234:1");
+ DocumentListMessage::Entry entry(1234, std::move(doc), true);
+ DocumentListMessage tmp(document::BucketId(17, 1234));
+ tmp.getDocuments().push_back(std::move(entry));
+
+ serialize("DocumentListMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("DocumentListMessage", DocumentProtocol::MESSAGE_DOCUMENTLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<DocumentListMessage&>(*obj);
+ ASSERT_EQUAL(ref.getDocuments().size(), 1u);
+ EXPECT_EQUAL(ref.getDocuments()[0].getDocument()->getId().toString(), "id:scheme:testdoc:n=1234:1");
+ EXPECT_EQUAL(ref.getDocuments()[0].getTimestamp(), 1234);
+ EXPECT_TRUE(ref.getDocuments()[0].isRemoveEntry());
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_document_list_reply() {
+ return try_visitor_reply("DocumentListReply", DocumentProtocol::REPLY_DOCUMENTLIST);
+}
+
+bool Messages80Test::test_empty_buckets_message() {
+ std::vector<document::BucketId> bids;
+ for (size_t i=0; i < 13; ++i) {
+ bids.emplace_back(16, i);
+ }
+ EmptyBucketsMessage msg(bids);
+
+ serialize("EmptyBucketsMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("EmptyBucketsMessage", DocumentProtocol::MESSAGE_EMPTYBUCKETS, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<EmptyBucketsMessage&>(*obj);
+ ASSERT_EQUAL(ref.getBucketIds().size(), 13u);
+ for (size_t i = 0; i < 13; ++i) {
+ EXPECT_EQUAL(ref.getBucketIds()[i], document::BucketId(16, i));
+ }
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_empty_buckets_reply() {
+ return try_visitor_reply("EmptyBucketsReply", DocumentProtocol::REPLY_EMPTYBUCKETS);
+}
+
+bool Messages80Test::test_get_bucket_list_message() {
+ GetBucketListMessage msg(document::BucketId(16, 123));
+ msg.setBucketSpace("beartato");
+
+ serialize("GetBucketListMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketListMessage", DocumentProtocol::MESSAGE_GETBUCKETLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketListMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getBucketSpace(), "beartato");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_list_reply() {
+ GetBucketListReply reply;
+ reply.getBuckets().emplace_back(document::BucketId(16, 123), "foo");
+ reply.getBuckets().emplace_back(document::BucketId(17, 1123), "bar");
+ reply.getBuckets().emplace_back(document::BucketId(18, 11123), "zoink");
+
+ serialize("GetBucketListReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketListReply", DocumentProtocol::REPLY_GETBUCKETLIST, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketListReply&>(*obj);
+ ASSERT_EQUAL(ref.getBuckets().size(), 3u);
+ EXPECT_EQUAL(ref.getBuckets()[0], GetBucketListReply::BucketInfo(document::BucketId(16, 123), "foo"));
+ EXPECT_EQUAL(ref.getBuckets()[1], GetBucketListReply::BucketInfo(document::BucketId(17, 1123), "bar"));
+ EXPECT_EQUAL(ref.getBuckets()[2], GetBucketListReply::BucketInfo(document::BucketId(18, 11123), "zoink"));
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_state_message() {
+ GetBucketStateMessage tmp;
+ tmp.setBucketId(document::BucketId(16, 666));
+
+ serialize("GetBucketStateMessage", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketStateMessage", DocumentProtocol::MESSAGE_GETBUCKETSTATE, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketStateMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId().getUsedBits(), 16u);
+ EXPECT_EQUAL(ref.getBucketId().getId(), 4611686018427388570ULL);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_get_bucket_state_reply() {
+ auto foo = document::DocumentId("id:ns:testdoc::foo").getGlobalId();
+ auto bar = document::DocumentId("id:ns:testdoc::bar").getGlobalId();
+ auto baz = document::DocumentId("id:ns:testdoc::baz");
+
+ GetBucketStateReply reply;
+ reply.getBucketState().emplace_back(foo, 777, false);
+ reply.getBucketState().emplace_back(bar, 888, true);
+ reply.getBucketState().emplace_back(baz, 999, false);
+ serialize("GetBucketStateReply", reply);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("GetBucketStateReply", DocumentProtocol::REPLY_GETBUCKETSTATE, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<GetBucketStateReply&>(*obj);
+ ASSERT_EQUAL(ref.getBucketState().size(), 3u);
+ EXPECT_EQUAL(ref.getBucketState()[0].getTimestamp(), 777u);
+ EXPECT_FALSE(ref.getBucketState()[0].getDocumentId());
+ EXPECT_EQUAL(ref.getBucketState()[0].getGlobalId(), foo);
+ EXPECT_FALSE(ref.getBucketState()[0].isRemoveEntry());
+
+ EXPECT_EQUAL(ref.getBucketState()[1].getTimestamp(), 888u);
+ EXPECT_FALSE(ref.getBucketState()[1].getDocumentId());
+ EXPECT_EQUAL(ref.getBucketState()[1].getGlobalId(), bar);
+ EXPECT_TRUE(ref.getBucketState()[1].isRemoveEntry());
+
+ EXPECT_EQUAL(ref.getBucketState()[2].getTimestamp(), 999u);
+ EXPECT_EQUAL(ref.getBucketState()[2].getGlobalId(), baz.getGlobalId());
+ EXPECT_FALSE(ref.getBucketState()[2].isRemoveEntry());
+ ASSERT_TRUE(ref.getBucketState()[2].getDocumentId());
+ EXPECT_EQUAL(*ref.getBucketState()[2].getDocumentId(), baz);
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_stat_bucket_message() {
+ StatBucketMessage msg(document::BucketId(16, 123), "id.user=123");
+ msg.setBucketSpace("andrei");
+
+ serialize("StatBucketMessage", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("StatBucketMessage", DocumentProtocol::MESSAGE_STATBUCKET, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<StatBucketMessage&>(*obj);
+ EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123));
+ EXPECT_EQUAL(ref.getDocumentSelection(), "id.user=123");
+ EXPECT_EQUAL(ref.getBucketSpace(), "andrei");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_stat_bucket_reply() {
+ StatBucketReply msg;
+ msg.setResults("These are the votes of the Norwegian jury");
+
+ serialize("StatBucketReply", msg);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("StatBucketReply", DocumentProtocol::REPLY_STATBUCKET, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<StatBucketReply&>(*obj);
+ EXPECT_EQUAL(ref.getResults(), "These are the votes of the Norwegian jury");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_wrong_distribution_reply() {
+ WrongDistributionReply tmp("distributor:3 storage:2");
+
+ serialize("WrongDistributionReply", tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize("WrongDistributionReply", DocumentProtocol::REPLY_WRONGDISTRIBUTION, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto& ref = dynamic_cast<WrongDistributionReply&>(*obj);
+ EXPECT_EQUAL(ref.getSystemState(), "distributor:3 storage:2");
+ }
+ }
+ return true;
+}
+
+bool Messages80Test::test_document_ignored_reply() {
+ DocumentIgnoredReply tmp;
+ serialize("DocumentIgnoredReply", tmp);
+ for (auto lang : languages()) {
+ auto obj = deserialize("DocumentIgnoredReply", DocumentProtocol::REPLY_DOCUMENTIGNORED, lang);
+ EXPECT_TRUE(obj);
+ }
+ return true;
+}
+
+bool Messages80Test::try_visitor_reply(const string& filename, uint32_t type) {
+ VisitorReply tmp(type);
+ serialize(filename, tmp);
+
+ for (auto lang : languages()) {
+ auto obj = deserialize(filename, type, lang);
+ if (EXPECT_TRUE(obj)) {
+ auto* ptr = dynamic_cast<VisitorReply*>(obj.get());
+ EXPECT_TRUE(ptr);
+ }
+ }
+ return true;
+}
+
+// TODO rewrite to Gtest
+TEST_APPHOOK(Messages80Test);
diff --git a/documentapi/src/tests/messages/testbase.cpp b/documentapi/src/tests/messages/testbase.cpp
index 4ea770e7309..e5647ceaef8 100644
--- a/documentapi/src/tests/messages/testbase.cpp
+++ b/documentapi/src/tests/messages/testbase.cpp
@@ -25,6 +25,8 @@ TestBase::TestBase() :
{
}
+TestBase::~TestBase() = default;
+
int
TestBase::Main()
{
@@ -34,16 +36,15 @@ TestBase::Main()
LOG(info, "Running tests for version %s.", getVersion().toString().c_str());
// Run registered tests.
- for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin();
- it != _tests.end(); ++it)
- {
- LOG(info, "Running test for routable type %d.", it->first);
- EXPECT_TRUE( (this->*(it->second))() );
+ for (const auto& test : _tests) {
+ LOG(info, "Running test for routable type %d.", test.first);
+ EXPECT_TRUE( (this->*(test.second))() );
TEST_FLUSH();
}
// Test routable type coverage.
- std::vector<uint32_t> expected, actual;
+ std::vector<uint32_t> expected;
+ std::vector<uint32_t> actual;
EXPECT_TRUE(testCoverage(expected, actual));
expected.push_back(0);
EXPECT_TRUE(!testCoverage(expected, actual));
@@ -58,10 +59,8 @@ TestBase::Main()
_protocol.getRoutableTypes(getVersion(), expected);
actual.clear();
- for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin();
- it != _tests.end(); ++it)
- {
- actual.push_back(it->first);
+ for (const auto& test : _tests) {
+ actual.push_back(test.first);
}
if (shouldTestCoverage()) {
EXPECT_TRUE(testCoverage(expected, actual, true));
@@ -100,13 +99,11 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector<
bool ret = true;
std::vector<uint32_t> lst(actual);
- for (std::vector<uint32_t>::const_iterator it = expected.begin();
- it != expected.end(); ++it)
- {
- std::vector<uint32_t>::iterator occ = std::find(lst.begin(), lst.end(), *it);
+ for (uint32_t wanted : expected) {
+ auto occ = std::find(lst.begin(), lst.end(), wanted);
if (occ == lst.end()) {
if (report) {
- LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", *it);
+ LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", wanted);
}
ret = false;
} else {
@@ -115,10 +112,8 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector<
}
if (!lst.empty()) {
if (report) {
- for (std::vector<uint32_t>::iterator it = lst.begin();
- it != lst.end(); ++it)
- {
- LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", *it);
+ for (uint32_t missing : lst) {
+ LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", missing);
}
}
ret = false;
@@ -151,7 +146,7 @@ TestBase::serialize(const string &filename, const mbus::Routable &routable, Tamp
return 0;
}
mbus::Routable::UP obj = _protocol.decode(version, blob);
- if (!EXPECT_TRUE(obj.get() != NULL)) {
+ if (!EXPECT_TRUE(obj.get() != nullptr)) {
LOG(error, "Protocol failed to decode serialized data.");
return 0;
}
@@ -172,7 +167,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang)
mbus::Blob blob = readFile(path);
if (!EXPECT_TRUE(blob.size() != 0)) {
LOG(error, "Could not open file '%s' for reading.", path.c_str());
- return mbus::Routable::UP();
+ return {};
}
mbus::Routable::UP ret = _protocol.decode(version, blob);
@@ -180,7 +175,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang)
LOG(error, "Unable to decode class %d", classId);
} else if (!EXPECT_TRUE(classId == ret->getType())) {
LOG(error, "Expected class %d, got %d.", classId, ret->getType());
- return mbus::Routable::UP();
+ return {};
}
return ret;
}
diff --git a/documentapi/src/tests/messages/testbase.h b/documentapi/src/tests/messages/testbase.h
index 313f2d1f293..ad371bcc3bc 100644
--- a/documentapi/src/tests/messages/testbase.h
+++ b/documentapi/src/tests/messages/testbase.h
@@ -36,8 +36,8 @@ protected:
};
TestBase();
- virtual ~TestBase() { /* empty */ }
- virtual const vespalib::Version getVersion() const = 0;
+ ~TestBase() override;
+ virtual vespalib::Version getVersion() const = 0;
virtual bool shouldTestCoverage() const = 0;
TestBase &putTest(uint32_t type, TEST_METHOD_PT test);
int Main() override;
diff --git a/documentapi/src/vespa/documentapi/CMakeLists.txt b/documentapi/src/vespa/documentapi/CMakeLists.txt
index a3a2815cc4f..1d0b3784a9d 100644
--- a/documentapi/src/vespa/documentapi/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/CMakeLists.txt
@@ -7,3 +7,5 @@ vespa_add_library(documentapi
INSTALL lib64
DEPENDS
)
+
+vespa_add_target_package_dependency(documentapi Protobuf)
diff --git a/documentapi/src/vespa/documentapi/messagebus/.gitignore b/documentapi/src/vespa/documentapi/messagebus/.gitignore
index d58390943e2..488f2e6355d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/.gitignore
+++ b/documentapi/src/vespa/documentapi/messagebus/.gitignore
@@ -2,3 +2,5 @@
Makefile
config-*.cpp
config-*.h
+*.pb.cc
+*.pb.h
diff --git a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
index c3198ba7b2b..d59fd56037d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
@@ -1,12 +1,35 @@
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+
+# .proto files are in a higher-level directory as they are shared across languages
+set(documentapi_messagebus_PROTOBUF_REL_PATH "../../../protobuf")
+PROTOBUF_GENERATE_CPP(documentapi_messagebus_PROTOBUF_SRCS documentapi_messagebus_PROTOBUF_HDRS
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_common.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_feed.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_inspect.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_visiting.proto")
+
+vespa_add_source_target(protobufgen_documentapi_messagebus DEPENDS
+ ${documentapi_messagebus_PROTOBUF_SRCS}
+ ${documentapi_messagebus_PROTOBUF_HDRS})
+
+vespa_suppress_warnings_for_protobuf_sources(SOURCES ${documentapi_messagebus_PROTOBUF_SRCS})
+
+# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
+# the header file grows over a certain size.
+set_source_files_properties(routable_factories_8.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
+
vespa_add_library(documentapi_documentapimessagebus OBJECT
SOURCES
documentprotocol.cpp
replymerger.cpp
+ routable_factories_8.cpp
routablefactories60.cpp
routablerepository.cpp
routingpolicyfactories.cpp
routingpolicyrepository.cpp
+ ${documentapi_messagebus_PROTOBUF_SRCS}
DEPENDS
documentapi_documentapipolicies
)
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index f16f63029ee..f53594c6d32 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -1,16 +1,17 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "replymerger.h"
+#include "routable_factories_8.h"
#include "routablefactories60.h"
-#include "routingpolicyfactories.h"
#include "routablerepository.h"
+#include "routingpolicyfactories.h"
#include "routingpolicyrepository.h"
-#include "replymerger.h"
#include <vespa/document/util/stringutil.h>
#include <vespa/documentapi/documentapi.h>
-#include <vespa/vespalib/util/exceptions.h>
#include <vespa/messagebus/error.h>
-#include <sstream>
+#include <vespa/vespalib/util/exceptions.h>
#include <cassert>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".documentprotocol");
@@ -40,47 +41,98 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo,
putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>());
putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>());
+ add_legacy_v6_factories();
+ add_v8_factories();
+}
+
+DocumentProtocol::~DocumentProtocol() = default;
+
+void
+DocumentProtocol::add_legacy_v6_factories()
+{
// Prepare version specifications to use when adding routable factories.
vespalib::VersionSpecification version6(6, 221);
-
std::vector<vespalib::VersionSpecification> from6 = { version6 };
// Add 6.x serialization
- putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
- putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
- putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
- putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
- putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
+ putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
+ putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
+ putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
+ putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, std::make_shared<RoutableFactories60::WrongDistributionReplyFactory>(), from6);
}
-DocumentProtocol::~DocumentProtocol() = default;
+void
+DocumentProtocol::add_v8_factories()
+{
+ vespalib::VersionSpecification version8(8, 304);
+ std::vector<vespalib::VersionSpecification> from8 = { version8 };
+
+ using RF8 = messagebus::RoutableFactories80;
+ auto put_v8_factory = [&](auto msg_id, auto factory) {
+ putRoutableFactory(msg_id, std::move(factory), from8);
+ };
+
+ put_v8_factory(MESSAGE_CREATEVISITOR, RF8::create_visitor_message_factory());
+ put_v8_factory(MESSAGE_DESTROYVISITOR, RF8::destroy_visitor_message_factory());
+ put_v8_factory(MESSAGE_DOCUMENTLIST, RF8::document_list_message_factory(_repo));
+ put_v8_factory(MESSAGE_EMPTYBUCKETS, RF8::empty_buckets_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETLIST, RF8::get_bucket_list_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETSTATE, RF8::get_bucket_state_message_factory());
+ put_v8_factory(MESSAGE_GETDOCUMENT, RF8::get_document_message_factory());
+ put_v8_factory(MESSAGE_MAPVISITOR, RF8::map_visitor_message_factory());
+ put_v8_factory(MESSAGE_PUTDOCUMENT, RF8::put_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_QUERYRESULT, RF8::query_result_message_factory());
+ put_v8_factory(MESSAGE_REMOVEDOCUMENT, RF8::remove_document_message_factory());
+ put_v8_factory(MESSAGE_REMOVELOCATION, RF8::remove_location_message_factory(_repo));
+ put_v8_factory(MESSAGE_STATBUCKET, RF8::stat_bucket_message_factory());
+ put_v8_factory(MESSAGE_UPDATEDOCUMENT, RF8::update_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_VISITORINFO, RF8::visitor_info_message_factory());
+ put_v8_factory(REPLY_CREATEVISITOR, RF8::create_visitor_reply_factory());
+ put_v8_factory(REPLY_DESTROYVISITOR, RF8::destroy_visitor_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTIGNORED, RF8::document_ignored_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTLIST, RF8::document_list_reply_factory());
+ put_v8_factory(REPLY_EMPTYBUCKETS, RF8::empty_buckets_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETLIST, RF8::get_bucket_list_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETSTATE, RF8::get_bucket_state_reply_factory());
+ put_v8_factory(REPLY_GETDOCUMENT, RF8::get_document_reply_factory(_repo));
+ put_v8_factory(REPLY_MAPVISITOR, RF8::map_visitor_reply_factory());
+ put_v8_factory(REPLY_PUTDOCUMENT, RF8::put_document_reply_factory());
+ put_v8_factory(REPLY_QUERYRESULT, RF8::query_result_reply_factory());
+ put_v8_factory(REPLY_REMOVEDOCUMENT, RF8::remove_document_reply_factory());
+ put_v8_factory(REPLY_REMOVELOCATION, RF8::remove_location_reply_factory());
+ put_v8_factory(REPLY_STATBUCKET, RF8::stat_bucket_reply_factory());
+ put_v8_factory(REPLY_UPDATEDOCUMENT, RF8::update_document_reply_factory());
+ put_v8_factory(REPLY_VISITORINFO, RF8::visitor_info_reply_factory());
+ put_v8_factory(REPLY_WRONGDISTRIBUTION, RF8::wrong_distribution_reply_factory());
+}
mbus::IRoutingPolicy::UP
DocumentProtocol::createPolicy(const mbus::string &name, const mbus::string &param) const
@@ -99,13 +151,9 @@ mbus::Blob
DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable &routable) const
{
mbus::Blob blob(_routableRepository->encode(version, routable));
- // When valgrind reports errors of uninitialized data being written to
- // the network, it is useful to be able to see the serialized data to
- // try to identify what bits are uninitialized.
if (LOG_WOULD_LOG(spam)) {
std::ostringstream message;
- document::StringUtil::printAsHex(
- message, blob.data(), blob.size());
+ document::StringUtil::printAsHex(message, blob.data(), blob.size());
LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s",
routable.getProtocol().c_str(), routable.getType(),
version.toString().c_str(), message.str().c_str());
@@ -120,7 +168,7 @@ DocumentProtocol::decode(const vespalib::Version &version, mbus::BlobRef data) c
return _routableRepository->decode(version, data);
} catch (vespalib::Exception &e) {
LOG(warning, "%s", e.getMessage().c_str());
- return mbus::Routable::UP();
+ return {};
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
index c771e86031d..b9658750ada 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
@@ -294,6 +294,10 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string &param) const override;
mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override;
mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override;
+
+private:
+ void add_legacy_v6_factories();
+ void add_v8_factories();
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
index bd2f245ad15..12c6a50fa24 100644
--- a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
+++ b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
@@ -22,18 +22,13 @@ class IRoutableFactory {
protected:
IRoutableFactory() = default;
public:
- /**
- * Convenience typedefs.
- */
using UP = std::unique_ptr<IRoutableFactory>;
using SP = std::shared_ptr<IRoutableFactory>;
IRoutableFactory(const IRoutableFactory &) = delete;
IRoutableFactory & operator = (const IRoutableFactory &) = delete;
- /**
- * Virtual destructor required for inheritance.
- */
- virtual ~IRoutableFactory() { }
+
+ virtual ~IRoutableFactory() = default;
/**
* This method encodes the content of the given routable into a byte buffer that can later be decoded
@@ -45,11 +40,11 @@ public:
* @param out The buffer to write into.
* @return True if the routable could be encoded.
*/
- virtual bool encode(const mbus::Routable &obj,
- vespalib::GrowableByteBuffer &out) const = 0;
+ [[nodiscard]] virtual bool encode(const mbus::Routable &obj,
+ vespalib::GrowableByteBuffer &out) const = 0;
/**
- * This method decodes the given byte bufer to a routable.
+ * This method decodes the given byte buffer to a routable.
*
* This method is NOT exception safe. Return null to signal failure.
*
@@ -57,7 +52,7 @@ public:
* @param loadTypes The set of configured load types.
* @return The decoded routable.
*/
- virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
+ [[nodiscard]] virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
index 3d713c95da8..c47c2421fcb 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
@@ -18,7 +18,7 @@ protected:
*
* @return A document reply that corresponds to this message.
*/
- virtual DocumentReply::UP doCreateReply() const = 0;
+ [[nodiscard]] virtual DocumentReply::UP doCreateReply() const = 0;
public:
/**
@@ -37,16 +37,16 @@ public:
*
* @return The created reply.
*/
- std::unique_ptr<mbus::Reply> createReply() const;
+ [[nodiscard]] std::unique_ptr<mbus::Reply> createReply() const;
/**
* Returns the priority of this message.
*
* @return The priority.
*/
- Priority::Value getPriority() const { return _priority; };
+ [[nodiscard]] Priority::Value getPriority() const { return _priority; };
- uint8_t priority() const override { return (uint8_t)_priority; }
+ [[nodiscard]] uint8_t priority() const override { return (uint8_t)_priority; }
/**
* Sets the priority tag for this message.
@@ -55,7 +55,7 @@ public:
*/
void setPriority(Priority::Value p) { _priority = p; };
- uint32_t getApproxSize() const override;
+ [[nodiscard]] uint32_t getApproxSize() const override;
void setApproxSize(uint32_t approxSize) {
_approxSize = approxSize;
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
index 755e523c065..2c5833beb20 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
@@ -14,13 +14,12 @@ EmptyBucketsMessage::EmptyBucketsMessage(const std::vector<document::BucketId> &
{
}
-EmptyBucketsMessage::~EmptyBucketsMessage() {
-}
+EmptyBucketsMessage::~EmptyBucketsMessage() = default;
void
-EmptyBucketsMessage::setBucketIds(const std::vector<document::BucketId> &bucketIds)
+EmptyBucketsMessage::setBucketIds(std::vector<document::BucketId> bucketIds)
{
- _bucketIds = bucketIds;
+ _bucketIds = std::move(bucketIds);
}
void
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
index 7cecd1e1a2b..7078efc778c 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
@@ -27,7 +27,7 @@ public:
std::vector<document::BucketId> &getBucketIds() { return _bucketIds; }
const std::vector<document::BucketId> &getBucketIds() const { return _bucketIds; }
- void setBucketIds(const std::vector<document::BucketId> &bucketIds);
+ void setBucketIds(std::vector<document::BucketId> bucketIds);
void resize(uint32_t size);
uint32_t getType() const override;
string toString() const override { return "emptybucketsmessage"; }
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
index dbab28e3172..1ebc867156e 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
@@ -30,8 +30,8 @@ public:
*
* @param document The document to put.
*/
- PutDocumentMessage(DocumentSP document);
- ~PutDocumentMessage();
+ explicit PutDocumentMessage(DocumentSP document);
+ ~PutDocumentMessage() override;
/**
* Returns the document to put.
@@ -39,7 +39,7 @@ public:
* @return The document.
*/
const DocumentSP & getDocumentSP() const { return _document; }
- DocumentSP stealDocument() { return std::move(_document); }
+ [[nodiscard]] DocumentSP stealDocument() { return std::move(_document); }
const document::Document & getDocument() const { return *_document; }
/**
@@ -68,7 +68,7 @@ public:
string toString() const override { return "putdocumentmessage"; }
void set_create_if_non_existent(bool value) noexcept { _create_if_non_existent = value; }
- bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
+ [[nodiscard]] bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
index 87c3456d62c..4608350f8a6 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
@@ -10,7 +10,7 @@ namespace document { class BucketIdFactory; }
namespace documentapi {
/**
- * Message (VDS only) to remove an entire location for users using user or group schemes for their documents.
+ * Message to remove an entire location for users using user or group schemes for their documents.
* A location in this context is either a user id or a group name.
*/
class RemoveLocationMessage : public DocumentMessage {
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
index cd0cb5e1d3a..da3219d999f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
@@ -74,12 +74,16 @@ public:
const vdslib::Parameters& getParameters() const { return _params; }
vdslib::Parameters& getParameters() { return _params; }
void setParameters(const vdslib::Parameters& params) { _params = params; }
+ void setParameters(vdslib::Parameters&& params) noexcept { _params = std::move(params); }
uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
std::vector<document::BucketId>& getBuckets() { return _buckets; }
+ void setBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _buckets = std::move(buckets);
+ }
const document::BucketId getBucketId() const { return *_buckets.begin(); }
@@ -196,6 +200,9 @@ public:
std::vector<document::BucketId>& getFinishedBuckets() { return _finishedBuckets; }
const std::vector<document::BucketId>& getFinishedBuckets() const { return _finishedBuckets; }
+ void setFinishedBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _finishedBuckets = std::move(buckets);
+ }
const string& getErrorMessage() const { return _errorMessage; }
void setErrorMessage(const string& errorMessage) { _errorMessage = errorMessage; };
@@ -224,6 +231,7 @@ public:
vdslib::Parameters& getData() { return _data; };
const vdslib::Parameters& getData() const { return _data; };
+ void setData(vdslib::Parameters&& data) noexcept { _data = std::move(data); }
uint32_t getApproxSize() const override;
uint32_t getType() const override;
@@ -246,9 +254,9 @@ public:
Entry(const Entry& other);
Entry(const document::DocumentTypeRepo &repo, document::ByteBuffer& buf);
- int64_t getTimestamp() { return _timestamp; }
- const DocumentSP & getDocument() { return _document; }
- bool isRemoveEntry() { return _removeEntry; }
+ int64_t getTimestamp() const noexcept { return _timestamp; }
+ const DocumentSP & getDocument() const noexcept { return _document; }
+ bool isRemoveEntry() const noexcept { return _removeEntry; }
void serialize(vespalib::GrowableByteBuffer& buf) const;
private:
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
new file mode 100644
index 00000000000..399801526ba
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
@@ -0,0 +1,883 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "routable_factories_8.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/util/serializableexceptions.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/documentapi/messagebus/docapi_common.pb.h>
+#include <vespa/documentapi/messagebus/docapi_feed.pb.h>
+#include <vespa/documentapi/messagebus/docapi_visiting.pb.h>
+#include <vespa/documentapi/messagebus/docapi_inspect.pb.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+namespace documentapi::messagebus {
+
+namespace {
+
+// Protobuf codec helpers for common types
+
+void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
+ dest.set_raw_id(src.getRawId());
+}
+
+document::BucketId get_bucket_id(const protobuf::BucketId& src) {
+ return document::BucketId(src.raw_id());
+}
+
+void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) {
+ auto doc_id = src.toString();
+ dest.set_id(doc_id.data(), doc_id.size());
+}
+
+document::DocumentId get_document_id(const protobuf::DocumentId& src) {
+ return document::DocumentId(src.id());
+}
+
+// TODO DocumentAPI should be extended to use actual document::FieldSet enums instead of always passing strings.
+void set_raw_field_set(protobuf::FieldSet& dest, vespalib::stringref src) {
+ dest.set_spec(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_field_set(const protobuf::FieldSet& src) noexcept {
+ return src.spec();
+}
+
+void set_raw_selection(protobuf::DocumentSelection& dest, vespalib::stringref src) {
+ dest.set_selection(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_selection(const protobuf::DocumentSelection& src) noexcept {
+ return src.selection();
+}
+
+void set_bucket_space(protobuf::BucketSpace& dest, vespalib::stringref space_name) {
+ dest.set_name(space_name.data(), space_name.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_bucket_space(const protobuf::BucketSpace& src) noexcept {
+ return src.name();
+}
+
+void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
+ char tmp[document::GlobalId::LENGTH];
+ memcpy(tmp, src.get(), document::GlobalId::LENGTH);
+ dest.set_raw_gid(tmp, document::GlobalId::LENGTH);
+}
+
+document::GlobalId get_global_id(const protobuf::GlobalId& src) {
+ if (src.raw_gid().size() != document::GlobalId::LENGTH) {
+ throw document::DeserializeException("Unexpected serialized protobuf GlobalId size");
+ }
+ return document::GlobalId(src.raw_gid().data()); // By copy
+}
+
+documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
+ return documentapi::TestAndSetCondition(src.selection());
+}
+
+void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
+ dest.set_selection(src.getSelection().data(), src.getSelection().size());
+}
+
+std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src_doc.payload().empty()) {
+ vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size());
+ return std::make_shared<document::Document>(type_repo, doc_buf);
+ }
+ return {};
+}
+
+std::shared_ptr<document::Document> get_document_or_throw(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto doc = get_document(src_doc, type_repo);
+ if (!doc) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document object", VESPA_STRLOC);
+ }
+ return doc;
+}
+
+void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
+ vespalib::nbostream stream;
+ src_doc.serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
+}
+
+void set_update(protobuf::DocumentUpdate& dest, const document::DocumentUpdate& src) {
+ vespalib::nbostream stream;
+ src.serializeHEAD(stream);
+ dest.set_payload(stream.peek(), stream.size());
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src.payload().empty()) {
+ return document::DocumentUpdate::createHEAD(
+ type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
+ }
+ return {};
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update_or_throw(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto upd = get_update(src, type_repo);
+ if (!upd) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document update object", VESPA_STRLOC);
+ }
+ return upd;
+}
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+requires std::is_invocable_r_v<void, EncodeFn, const DocApiType&, ProtobufType&> &&
+ std::is_invocable_r_v<std::unique_ptr<DocApiType>, DecodeFn, const ProtobufType&>
+class ProtobufRoutableFactory final : public IRoutableFactory {
+ EncodeFn _encode_fn;
+ DecodeFn _decode_fn;
+public:
+ template <typename EncFn, typename DecFn>
+ ProtobufRoutableFactory(EncFn&& enc_fn, DecFn&& dec_fn) noexcept
+ : _encode_fn(std::forward<EncFn>(enc_fn)),
+ _decode_fn(std::forward<DecFn>(dec_fn))
+ {}
+ ~ProtobufRoutableFactory() override = default;
+
+ bool encode(const mbus::Routable& obj, vespalib::GrowableByteBuffer& out) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+
+ _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj);
+
+ const auto sz = proto_obj->ByteSizeLong();
+ assert(sz <= INT32_MAX);
+ auto* buf = reinterpret_cast<uint8_t*>(out.allocate(sz));
+ return proto_obj->SerializeWithCachedSizesToArray(buf);
+ }
+
+ mbus::Routable::UP decode(document::ByteBuffer& in) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+ const auto buf_size = in.getRemaining();
+ assert(buf_size <= INT_MAX);
+ bool ok = proto_obj->ParseFromArray(in.getBufferAtPos(), buf_size);
+ if (!ok) {
+ return {}; // Malformed protobuf payload
+ }
+ auto msg = _decode_fn(*proto_obj);
+ if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) {
+ msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size
+ }
+ return msg;
+ }
+};
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+auto make_codec(EncodeFn&& enc_fn, DecodeFn&& dec_fn) {
+ return std::make_shared<ProtobufRoutableFactory<DocApiType, ProtobufType, EncodeFn, DecodeFn>>(
+ std::forward<EncodeFn>(enc_fn), std::forward<DecodeFn>(dec_fn));
+}
+
+} // anon ns
+
+// ---------------------------------------------
+// Get request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_message_factory() {
+ return make_codec<GetDocumentMessage, protobuf::GetDocumentRequest>(
+ [](const GetDocumentMessage& src, protobuf::GetDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ },
+ [](const protobuf::GetDocumentRequest& src) {
+ return std::make_unique<GetDocumentMessage>(get_document_id(src.document_id()), get_raw_field_set(src.field_set()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<GetDocumentReply, protobuf::GetDocumentResponse>(
+ [](const GetDocumentReply& src, protobuf::GetDocumentResponse& dest) {
+ if (src.hasDocument()) {
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_last_modified(src.getLastModified());
+ },
+ [type_repo = std::move(repo)](const protobuf::GetDocumentResponse& src) {
+ auto msg = std::make_unique<GetDocumentReply>();
+ if (src.has_document()) {
+ auto doc = get_document(src.document(), *type_repo);
+ doc->setLastModified(static_cast<int64_t>(src.last_modified()));
+ msg->setDocument(std::move(doc));
+ }
+ msg->setLastModified(src.last_modified());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Put request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<PutDocumentMessage, protobuf::PutDocumentRequest>(
+ [](const PutDocumentMessage& src, protobuf::PutDocumentRequest& dest) {
+ dest.set_force_assign_timestamp(src.getTimestamp());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ if (src.getDocumentSP()) { // This should always be present in practice
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_create_if_missing(src.get_create_if_non_existent());
+ },
+ [type_repo = std::move(repo)](const protobuf::PutDocumentRequest& src) {
+ auto msg = std::make_unique<PutDocumentMessage>();
+ msg->setDocument(get_document_or_throw(src.document(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setTimestamp(src.force_assign_timestamp());
+ msg->set_create_if_non_existent(src.create_if_missing());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_reply_factory() {
+ return make_codec<WriteDocumentReply, protobuf::PutDocumentResponse>(
+ [](const WriteDocumentReply& src, protobuf::PutDocumentResponse& dest) {
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::PutDocumentResponse& src) {
+ auto msg = std::make_unique<WriteDocumentReply>(DocumentProtocol::REPLY_PUTDOCUMENT);
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Update request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<UpdateDocumentMessage, protobuf::UpdateDocumentRequest>(
+ [](const UpdateDocumentMessage& src, protobuf::UpdateDocumentRequest& dest) {
+ set_update(*dest.mutable_update(), src.getDocumentUpdate());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ dest.set_expected_old_timestamp(src.getOldTimestamp());
+ dest.set_force_assign_timestamp(src.getNewTimestamp());
+ },
+ [type_repo = std::move(repo)](const protobuf::UpdateDocumentRequest& src) {
+ auto msg = std::make_unique<UpdateDocumentMessage>();
+ msg->setDocumentUpdate(get_update_or_throw(src.update(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setOldTimestamp(src.expected_old_timestamp());
+ msg->setNewTimestamp(src.force_assign_timestamp());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_reply_factory() {
+ return make_codec<UpdateDocumentReply, protobuf::UpdateDocumentResponse>(
+ [](const UpdateDocumentReply& src, protobuf::UpdateDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::UpdateDocumentResponse& src) {
+ auto msg = std::make_unique<UpdateDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Remove request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_message_factory() {
+ return make_codec<RemoveDocumentMessage, protobuf::RemoveDocumentRequest>(
+ [](const RemoveDocumentMessage& src, protobuf::RemoveDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ },
+ [](const protobuf::RemoveDocumentRequest& src) {
+ auto msg = std::make_unique<RemoveDocumentMessage>();
+ msg->setDocumentId(get_document_id(src.document_id()));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_reply_factory() {
+ return make_codec<RemoveDocumentReply, protobuf::RemoveDocumentResponse>(
+ [](const RemoveDocumentReply& src, protobuf::RemoveDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::RemoveDocumentResponse& src) {
+ auto msg = std::make_unique<RemoveDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// RemoveLocation request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<RemoveLocationMessage, protobuf::RemoveLocationRequest>(
+ [](const RemoveLocationMessage& src, protobuf::RemoveLocationRequest& dest) {
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [type_repo = std::move(repo)](const protobuf::RemoveLocationRequest& src) {
+ document::BucketIdFactory factory;
+ document::select::Parser parser(*type_repo, factory);
+ auto msg = std::make_unique<RemoveLocationMessage>(factory, parser, get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_location_reply_factory() {
+ return make_codec<DocumentReply, protobuf::RemoveLocationResponse>(
+ []([[maybe_unused]] const DocumentReply& src, [[maybe_unused]] protobuf::RemoveLocationResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::RemoveLocationResponse& src) {
+ // The lack of 1-1 type mapping is pretty awkward :I
+ return std::make_unique<DocumentReply>(DocumentProtocol::REPLY_REMOVELOCATION);
+ }
+ );
+}
+
+// ---------------------------------------------
+// CreateVisitor request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_bucket_id_vector(::google::protobuf::RepeatedPtrField<protobuf::BucketId>& dest,
+ const std::vector<document::BucketId>& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& bucket_id : src) {
+ set_bucket_id(*dest.Add(), bucket_id);
+ }
+}
+
+std::vector<document::BucketId> get_bucket_id_vector(const ::google::protobuf::RepeatedPtrField<protobuf::BucketId>& src) {
+ std::vector<document::BucketId> ids;
+ ids.reserve(src.size());
+ for (const auto& proto_bucket : src) {
+ ids.emplace_back(proto_bucket.raw_id());
+ }
+ return ids;
+}
+
+void set_visitor_params(::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& dest,
+ const vdslib::Parameters& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& kv : src) {
+ auto* proto_kv = dest.Add();
+ proto_kv->set_key(kv.first.data(), kv.first.size());
+ proto_kv->set_value(kv.second.data(), kv.second.size());
+ }
+}
+
+vdslib::Parameters get_visitor_params(const ::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& src) {
+ vdslib::Parameters params;
+ for (const auto& proto_kv : src) {
+ params.set(proto_kv.key(), proto_kv.value());
+ }
+ return params;
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_message_factory() {
+ return make_codec<CreateVisitorMessage, protobuf::CreateVisitorRequest>(
+ [](const CreateVisitorMessage& src, protobuf::CreateVisitorRequest& dest) {
+ dest.set_visitor_library_name(src.getLibraryName().data(), src.getLibraryName().size());
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ dest.set_control_destination(src.getControlDestination().data(), src.getControlDestination().size());
+ dest.set_data_destination(src.getDataDestination().data(), src.getDataDestination().size());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ dest.set_max_pending_reply_count(src.getMaximumPendingReplyCount());
+
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ set_bucket_id_vector(*dest.mutable_buckets(), src.getBuckets());
+
+ dest.set_from_timestamp(src.getFromTimestamp());
+ dest.set_to_timestamp(src.getToTimestamp());
+ dest.set_visit_tombstones(src.visitRemoves());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ dest.set_visit_inconsistent_buckets(src.visitInconsistentBuckets());
+ dest.set_max_buckets_per_visitor(src.getMaxBucketsPerVisitor());
+
+ set_visitor_params(*dest.mutable_parameters(), src.getParameters());
+ },
+ [](const protobuf::CreateVisitorRequest& src) {
+ auto msg = std::make_unique<CreateVisitorMessage>();
+ msg->setLibraryName(src.visitor_library_name());
+ msg->setInstanceId(src.instance_id());
+ msg->setControlDestination(src.control_destination());
+ msg->setDataDestination(src.data_destination());
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setMaximumPendingReplyCount(src.max_pending_reply_count());
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ msg->setBuckets(get_bucket_id_vector(src.buckets()));
+ msg->setFromTimestamp(src.from_timestamp());
+ msg->setToTimestamp(src.to_timestamp());
+ msg->setVisitRemoves(src.visit_tombstones());
+ msg->setFieldSet(get_raw_field_set(src.field_set()));
+ msg->setVisitInconsistentBuckets(src.visit_inconsistent_buckets());
+ msg->setMaxBucketsPerVisitor(src.max_buckets_per_visitor());
+ msg->setVisitorDispatcherVersion(50); // Hard-coded; same as for v6 serialization
+ msg->setParameters(get_visitor_params(src.parameters()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_reply_factory() {
+ return make_codec<CreateVisitorReply, protobuf::CreateVisitorResponse>(
+ [](const CreateVisitorReply& src, protobuf::CreateVisitorResponse& dest) {
+ set_bucket_id(*dest.mutable_last_bucket(), src.getLastBucket());
+ const auto& vs = src.getVisitorStatistics();
+ auto* stats = dest.mutable_statistics();
+ stats->set_buckets_visited(vs.getBucketsVisited());
+ stats->set_documents_visited(vs.getDocumentsVisited());
+ stats->set_bytes_visited(vs.getBytesVisited());
+ stats->set_documents_returned(vs.getDocumentsReturned());
+ stats->set_bytes_returned(vs.getBytesReturned());
+ },
+ [](const protobuf::CreateVisitorResponse& src) {
+ auto reply = std::make_unique<CreateVisitorReply>(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply->setLastBucket(get_bucket_id(src.last_bucket()));
+ const auto& vs = src.statistics();
+ vdslib::VisitorStatistics stats;
+ stats.setBucketsVisited(vs.buckets_visited());
+ stats.setDocumentsVisited(vs.documents_visited());
+ stats.setBytesVisited(vs.bytes_visited());
+ stats.setDocumentsReturned(vs.documents_returned());
+ stats.setBytesReturned(vs.bytes_returned());
+ reply->setVisitorStatistics(stats);
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DestroyVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_message_factory() {
+ return make_codec<DestroyVisitorMessage, protobuf::DestroyVisitorRequest>(
+ [](const DestroyVisitorMessage& src, protobuf::DestroyVisitorRequest& dest) {
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ },
+ [](const protobuf::DestroyVisitorRequest& src) {
+ auto msg = std::make_unique<DestroyVisitorMessage>();
+ msg->setInstanceId(src.instance_id());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DestroyVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DestroyVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DestroyVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DESTROYVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// MapVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_message_factory() {
+ return make_codec<MapVisitorMessage, protobuf::MapVisitorRequest>(
+ [](const MapVisitorMessage& src, protobuf::MapVisitorRequest& dest) {
+ set_visitor_params(*dest.mutable_data(), src.getData());
+ },
+ [](const protobuf::MapVisitorRequest& src) {
+ auto msg = std::make_unique<MapVisitorMessage>();
+ msg->setData(get_visitor_params(src.data()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::MapVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::MapVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::MapVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_MAPVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// QueryResult request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_search_result(protobuf::SearchResult& dest, const vdslib::SearchResult& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+void set_document_summary(protobuf::DocumentSummary& dest, const vdslib::DocumentSummary& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+document::ByteBuffer wrap_as_buffer(std::string_view buf) {
+ assert(buf.size() <= UINT32_MAX);
+ return {buf.data(), static_cast<uint32_t>(buf.size())};
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_message_factory() {
+ return make_codec<QueryResultMessage, protobuf::QueryResultRequest>(
+ [](const QueryResultMessage& src, protobuf::QueryResultRequest& dest) {
+ set_search_result(*dest.mutable_search_result(), src.getSearchResult());
+ set_document_summary(*dest.mutable_document_summary(), src.getDocumentSummary());
+ },
+ [](const protobuf::QueryResultRequest& src) {
+ auto msg = std::make_unique<QueryResultMessage>();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!src.has_search_result() || !src.has_document_summary()) {
+ throw document::DeserializeException("Query result does not have all required fields set", VESPA_STRLOC);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.search_result().payload()); // Must be lvalue
+ msg->getSearchResult().deserialize(buf_view);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.document_summary().payload()); // Also lvalue
+ msg->getDocumentSummary().deserialize(buf_view);
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_reply_factory() {
+ return make_codec<VisitorReply, protobuf::QueryResultResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::QueryResultResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::QueryResultResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_QUERYRESULT);
+ }
+ );
+}
+
+// ---------------------------------------------
+// VisitorInfo request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_message_factory() {
+ return make_codec<VisitorInfoMessage, protobuf::VisitorInfoRequest>(
+ [](const VisitorInfoMessage& src, protobuf::VisitorInfoRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_finished_buckets(), src.getFinishedBuckets());
+ dest.set_error_message(src.getErrorMessage());
+ },
+ [](const protobuf::VisitorInfoRequest& src) {
+ auto msg = std::make_unique<VisitorInfoMessage>();
+ msg->setFinishedBuckets(get_bucket_id_vector(src.finished_buckets()));
+ msg->setErrorMessage(src.error_message());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_reply_factory() {
+ return make_codec<VisitorReply, protobuf::VisitorInfoResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::VisitorInfoResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::VisitorInfoResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_VISITORINFO);
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentList request and response
+// TODO deprecate
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<DocumentListMessage, protobuf::DocumentListRequest>(
+ [](const DocumentListMessage& src, protobuf::DocumentListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ for (const auto& doc : src.getDocuments()) {
+ auto* proto_entry = dest.add_entries();
+ proto_entry->set_timestamp(doc.getTimestamp());
+ proto_entry->set_is_tombstone(doc.isRemoveEntry());
+ set_document(*proto_entry->mutable_document(), *doc.getDocument());
+ }
+ },
+ [type_repo = std::move(repo)](const protobuf::DocumentListRequest& src) {
+ auto msg = std::make_unique<DocumentListMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ for (const auto& proto_entry : src.entries()) {
+ auto doc = get_document_or_throw(proto_entry.document(), *type_repo);
+ msg->getDocuments().emplace_back(proto_entry.timestamp(), std::move(doc), proto_entry.is_tombstone());
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_list_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DocumentListResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DocumentListResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentListResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTLIST);
+ }
+ );
+}
+
+// ---------------------------------------------
+// EmptyBuckets request and response
+// TODO this should be deprecated
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_message_factory() {
+ return make_codec<EmptyBucketsMessage, protobuf::EmptyBucketsRequest>(
+ [](const EmptyBucketsMessage& src, protobuf::EmptyBucketsRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_bucket_ids(), src.getBucketIds());
+ },
+ [](const protobuf::EmptyBucketsRequest& src) {
+ auto msg = std::make_unique<EmptyBucketsMessage>();
+ msg->setBucketIds(get_bucket_id_vector(src.bucket_ids()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_reply_factory() {
+ return make_codec<VisitorReply, protobuf::EmptyBucketsResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::EmptyBucketsResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::EmptyBucketsResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_EMPTYBUCKETS); // ugh
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketList request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_message_factory() {
+ return make_codec<GetBucketListMessage, protobuf::GetBucketListRequest>(
+ [](const GetBucketListMessage& src, protobuf::GetBucketListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::GetBucketListRequest& src) {
+ auto msg = std::make_unique<GetBucketListMessage>(get_bucket_id(src.bucket_id()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_reply_factory() {
+ return make_codec<GetBucketListReply, protobuf::GetBucketListResponse>(
+ [](const GetBucketListReply& src, protobuf::GetBucketListResponse& dest) {
+ auto* proto_info = dest.mutable_bucket_info();
+ assert(src.getBuckets().size() <= INT_MAX);
+ proto_info->Reserve(static_cast<int>(src.getBuckets().size()));
+ for (const auto& info : src.getBuckets()) {
+ auto* entry = proto_info->Add();
+ set_bucket_id(*entry->mutable_bucket_id(), info._bucket);
+ entry->set_info(info._bucketInformation.data(), info._bucketInformation.size());
+ }
+ },
+ [](const protobuf::GetBucketListResponse& src) {
+ auto reply = std::make_unique<GetBucketListReply>();
+ reply->getBuckets().reserve(src.bucket_info_size());
+ for (const auto& proto_info : src.bucket_info()) {
+ GetBucketListReply::BucketInfo info;
+ info._bucket = get_bucket_id(proto_info.bucket_id());
+ info._bucketInformation = proto_info.info();
+ reply->getBuckets().emplace_back(std::move(info));
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketState request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_message_factory() {
+ return make_codec<GetBucketStateMessage, protobuf::GetBucketStateRequest>(
+ [](const GetBucketStateMessage& src, protobuf::GetBucketStateRequest& dest) {
+ // FIXME misses bucket space, but does not seem to be in use?
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ },
+ [](const protobuf::GetBucketStateRequest& src) {
+ return std::make_unique<GetBucketStateMessage>(get_bucket_id(src.bucket_id()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_reply_factory() {
+ return make_codec<GetBucketStateReply, protobuf::GetBucketStateResponse>(
+ [](const GetBucketStateReply& src, protobuf::GetBucketStateResponse& dest) {
+ assert(src.getBucketState().size() <= INT_MAX);
+ auto* proto_states = dest.mutable_states();
+ proto_states->Reserve(static_cast<int>(src.getBucketState().size()));
+ for (const auto& state : src.getBucketState()) {
+ auto* ps = proto_states->Add();
+ if (state.getDocumentId()) {
+ set_document_id(*ps->mutable_document_id(), *state.getDocumentId());
+ } else {
+ set_global_id(*ps->mutable_global_id(), state.getGlobalId());
+ }
+ ps->set_timestamp(state.getTimestamp());
+ ps->set_is_tombstone(state.isRemoveEntry());
+ }
+ },
+ [](const protobuf::GetBucketStateResponse& src) {
+ auto reply = std::make_unique<GetBucketStateReply>();
+ reply->getBucketState().reserve(src.states_size());
+ for (const auto& proto_state : src.states()) {
+ if (proto_state.has_document_id()) {
+ reply->getBucketState().emplace_back(get_document_id(proto_state.document_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ } else {
+ reply->getBucketState().emplace_back(get_global_id(proto_state.global_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ }
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// StatBucket request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_message_factory() {
+ return make_codec<StatBucketMessage, protobuf::StatBucketRequest>(
+ [](const StatBucketMessage& src, protobuf::StatBucketRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::StatBucketRequest& src) {
+ auto msg = std::make_unique<StatBucketMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_reply_factory() {
+ return make_codec<StatBucketReply, protobuf::StatBucketResponse>(
+ [](const StatBucketReply& src, protobuf::StatBucketResponse& dest) {
+ dest.set_results(src.getResults());
+ },
+ [](const protobuf::StatBucketResponse& src) {
+ auto reply = std::make_unique<StatBucketReply>();
+ reply->setResults(src.results());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// WrongDistribution response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::wrong_distribution_reply_factory() {
+ return make_codec<WrongDistributionReply, protobuf::WrongDistributionResponse>(
+ [](const WrongDistributionReply& src, protobuf::WrongDistributionResponse& dest) {
+ dest.mutable_cluster_state()->set_state_string(src.getSystemState());
+ },
+ [](const protobuf::WrongDistributionResponse& src) {
+ auto reply = std::make_unique<WrongDistributionReply>();
+ reply->setSystemState(src.cluster_state().state_string());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentIgnored response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_ignored_reply_factory() {
+ return make_codec<DocumentIgnoredReply, protobuf::DocumentIgnoredResponse>(
+ []([[maybe_unused]] const DocumentIgnoredReply& src, [[maybe_unused]] protobuf::DocumentIgnoredResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentIgnoredResponse& src) {
+ return std::make_unique<DocumentIgnoredReply>();
+ }
+ );
+}
+
+} // documentapi::messagebus
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
new file mode 100644
index 00000000000..76da2f7dc9f
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
@@ -0,0 +1,72 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "iroutablefactory.h"
+
+namespace document { class DocumentTypeRepo; }
+
+namespace documentapi::messagebus {
+
+class RoutableFactories80 {
+public:
+ RoutableFactories80() = delete;
+
+ // CRUD messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_reply_factory();
+
+ // Visitor-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_reply_factory();
+
+ // Inspection-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_reply_factory();
+
+ // Polymorphic reply messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> wrong_distribution_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_ignored_reply_factory();
+};
+
+}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
index 54774f142ba..3e1ae07f7ca 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
@@ -14,11 +14,13 @@ RoutableRepository::VersionMap::VersionMap() :
_factoryVersions()
{ }
+RoutableRepository::VersionMap::~VersionMap() = default;
+
bool
RoutableRepository::VersionMap::putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory)
{
bool ret = _factoryVersions.find(version) != _factoryVersions.end();
- _factoryVersions[version] = factory;
+ _factoryVersions[version] = std::move(factory);
return ret;
}
@@ -28,14 +30,14 @@ RoutableRepository::VersionMap::getFactory(const vespalib::Version &version) con
{
const vespalib::VersionSpecification versionSpec{version.getMajor(), version.getMinor(), version.getMicro()};
- std::vector< std::pair<vespalib::VersionSpecification, IRoutableFactory::SP> > candidates;
- for (auto & entry : _factoryVersions) {
+ std::vector<std::pair<vespalib::VersionSpecification, IRoutableFactory::SP>> candidates;
+ for (const auto & entry : _factoryVersions) {
if (entry.first.compareTo(versionSpec) <= 0) {
- candidates.push_back(std::make_pair(entry.first, entry.second));
+ candidates.emplace_back(entry.first, entry.second);
}
}
if (candidates.empty()) {
- return IRoutableFactory::SP();
+ return {};
}
return std::max_element(candidates.begin(), candidates.end(),
@@ -54,7 +56,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
{
if (data.size() == 0) {
LOG(error, "Received empty byte array for deserialization.");
- return mbus::Routable::UP();
+ return {};
}
document::ByteBuffer in(data.data(), data.size());
@@ -64,7 +66,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
if (!factory) {
LOG(error, "No routable factory found for routable type %d (version %s).",
type, version.toString().c_str());
- return mbus::Routable::UP();
+ return {};
}
mbus::Routable::UP ret = factory->decode(in);
if (!ret) {
@@ -74,7 +76,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
std::ostringstream ost;
document::StringUtil::printAsHex(ost, data.data(), data.size());
LOG(error, "%s", ost.str().c_str());
- return mbus::Routable::UP();
+ return {};
}
return ret;
}
@@ -107,7 +109,7 @@ RoutableRepository::putFactory(const vespalib::VersionSpecification &version,
uint32_t type, IRoutableFactory::SP factory)
{
std::lock_guard guard(_lock);
- if (_factoryTypes[type].putFactory(version, factory)) {
+ if (_factoryTypes[type].putFactory(version, std::move(factory))) {
_cache.clear();
}
}
@@ -117,17 +119,17 @@ RoutableRepository::getFactory(const vespalib::Version &version, uint32_t type)
{
std::lock_guard guard(_lock);
CacheKey cacheKey(version, type);
- FactoryCache::const_iterator cit = _cache.find(cacheKey);
+ auto cit = _cache.find(cacheKey);
if (cit != _cache.end()) {
return cit->second;
}
- TypeMap::const_iterator vit = _factoryTypes.find(type);
+ auto vit = _factoryTypes.find(type);
if (vit == _factoryTypes.end()) {
- return IRoutableFactory::SP();
+ return {};
}
- IRoutableFactory::SP factory = vit->second.getFactory(version);
+ auto factory = vit->second.getFactory(version);
if (!factory) {
- return IRoutableFactory::SP();
+ return {};
}
_cache[cacheKey] = factory;
return factory;
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
index 5060d1b3817..62b5103d8f5 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
@@ -27,8 +27,9 @@ private:
public:
VersionMap();
- bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
- IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
+ ~VersionMap();
+ [[nodiscard]] bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
+ [[nodiscard]] IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
};
using CacheKey = std::pair<vespalib::Version, uint32_t>;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 0373609e806..3ab62542ace 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,7 +2,6 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.ListenFailedException;
@@ -301,15 +300,34 @@ public class RPCNetwork implements Network, MethodHandler {
return false;
}
+ private static Version deriveSupportedProtocolVersion() {
+ // This is a very leaky abstraction, but since MessageBus only exchanges versions
+ // (and not a set of supported protocols), we have to do this workaround.
+ // Disallow-version MUST be lower than that used as a protocol lower bound in
+ // DocumentProtocol.java and the exact same as that used in C++ for the same purposes.
+ // ... Or else!
+ // TODO remove this glorious hack once protobuf protocol is enabled by default
+ var maybeEnvVal = System.getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF");
+ if ("true".equals(maybeEnvVal) || "yes".equals(maybeEnvVal)) {
+ return new Version(8, 304); // _Allows_ new protobuf protocol
+ }
+ return new Version(8, 303); // _Disallows_ new protobuf protocol
+ }
+
+ private static final Version REPORTED_VERSION = deriveSupportedProtocolVersion();
+
/**
- * Returns the version of this network. This gets called when the "mbus.getVersion" method is invoked on this
- * network, and is separated into its own function so that unit tests can override it to simulate other versions
- * than current.
+ * Returns the (protocol) version of this network. This gets called when the "mbus.getVersion" method is invoked
+ * on this network, and is separated into its own function so that unit tests can override it to simulate other
+ * versions than current.
+ *
+ * Note that this version reflects the highest supported <em>protocol</em> version, and is not necessarily
+ * 1-1 with the actual Vespa release version of the underlying binary.
*
* @return the version to claim to be
*/
protected Version getVersion() {
- return Vtag.currentVersion;
+ return REPORTED_VERSION;
}
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index cacd18430a7..f626e2c325b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -18,6 +18,8 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <cstdlib>
+#include <string_view>
#include <thread>
#include <vespa/log/log.h>
@@ -25,6 +27,7 @@ LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
using namespace std::chrono_literals;
+using namespace std::string_view_literals;
namespace mbus {
@@ -148,10 +151,26 @@ RPCNetwork::flushTargetPool()
_targetPool->flushTargets(true);
}
+namespace {
+
+[[nodiscard]] vespalib::Version derive_supported_protocol_version() {
+ // TODO remove this hilariously leaky abstraction once protobuf protocol is the default :D
+ // Disallow-version MUST be lower than that used as a protocol lower bound in documentprotocol.cpp
+ // and the exact same as that used in Java for the same purposes. Or else!
+ const char* maybe_env_val = getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF");
+ if (maybe_env_val && (("true"sv == maybe_env_val) || ("yes"sv == maybe_env_val))) {
+ return {8, 304}; // _Allows_ new protobuf protocol
+ }
+ return {8, 303}; // _Disallows_ new protobuf protocol
+}
+
+}
+
const vespalib::Version &
RPCNetwork::getVersion() const
{
- return vespalib::Vtag::currentVersion;
+ static vespalib::Version reported_version = derive_supported_protocol_version();
+ return reported_version;
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 05ccaecb2c5..40590d4545f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -98,11 +98,15 @@ private:
protected:
/**
- * Returns the version of this network. This gets called when the
+ * Returns the (protocol) version of this network. This gets called when the
* "mbus.getVersion" method is invoked on this network, and is separated
* into its own function so that unit tests can override it to simulate
* other versions than current.
*
+ * Note that this version reflects the highest supported protocol version, and
+ * is not necessarily 1-1 with the actual Vespa release version of the
+ * underlying binary.
+ *
* @return The version to claim to be.
*/
virtual const vespalib::Version &getVersion() const;
diff --git a/vdslib/src/vespa/vdslib/container/parameters.cpp b/vdslib/src/vespa/vdslib/container/parameters.cpp
index 236b4970396..b5fbe96566d 100644
--- a/vdslib/src/vespa/vdslib/container/parameters.cpp
+++ b/vdslib/src/vespa/vdslib/container/parameters.cpp
@@ -19,6 +19,12 @@ Parameters::Parameters(document::ByteBuffer& buffer)
deserialize(buffer);
}
+Parameters::Parameters(const Parameters&) = default;
+Parameters& Parameters::operator=(const Parameters&) = default;
+
+Parameters::Parameters(Parameters&&) noexcept = default;
+Parameters& Parameters::operator=(Parameters&&) noexcept = default;
+
Parameters::~Parameters() = default;
size_t Parameters::getSerializedSize() const
diff --git a/vdslib/src/vespa/vdslib/container/parameters.h b/vdslib/src/vespa/vdslib/container/parameters.h
index d28e2cd9890..60ae3028719 100644
--- a/vdslib/src/vespa/vdslib/container/parameters.h
+++ b/vdslib/src/vespa/vdslib/container/parameters.h
@@ -46,6 +46,11 @@ public:
explicit Parameters(document::ByteBuffer& buffer);
~Parameters() override;
+ Parameters(const Parameters&);
+ Parameters& operator=(const Parameters&);
+ Parameters(Parameters&&) noexcept;
+ Parameters& operator=(Parameters&&) noexcept;
+
bool operator==(const Parameters &other) const;
size_t getSerializedSize() const;