diff options
50 files changed, 4181 insertions, 181 deletions
diff --git a/container-dev/pom.xml b/container-dev/pom.xml index 8c45a124e26..c3ceb0e8d07 100644 --- a/container-dev/pom.xml +++ b/container-dev/pom.xml @@ -104,6 +104,10 @@ <version>${project.version}</version> <exclusions> <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> <groupId>com.yahoo.vespa</groupId> <artifactId>http-utils</artifactId> </exclusion> diff --git a/container-documentapi/pom.xml b/container-documentapi/pom.xml index fbccc1a5184..7e6bd749b4a 100644 --- a/container-documentapi/pom.xml +++ b/container-documentapi/pom.xml @@ -37,6 +37,13 @@ <scope>provided</scope> </dependency> + <!-- documentapi needs protobuf runtime, and it's not provided from the container --> + <!-- TODO: Remove this when we have a better solution for protobuf --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + </dependencies> <build> diff --git a/document/abi-spec.json b/document/abi-spec.json index 899c107a242..ca06e2547d7 100644 --- a/document/abi-spec.json +++ b/document/abi-spec.json @@ -2724,7 +2724,8 @@ "abstract" ], "methods" : [ - "public abstract com.yahoo.io.GrowableByteBuffer getBuf()" + "public abstract com.yahoo.io.GrowableByteBuffer getBuf()", + "public abstract com.yahoo.document.DocumentTypeManager getTypeRepo()" ], "fields" : [ ] }, @@ -3015,6 +3016,7 @@ ], "methods" : [ "public final com.yahoo.document.DocumentTypeManager getDocumentTypeManager()", + "public com.yahoo.document.DocumentTypeManager getTypeRepo()", "public void read(com.yahoo.document.Document)", "public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.Document)", "public void read(com.yahoo.vespa.objects.FieldBase, com.yahoo.document.datatypes.FieldValue)", diff --git a/document/src/main/java/com/yahoo/document/Document.java b/document/src/main/java/com/yahoo/document/Document.java index 4bb93426994..294750f40f3 100644 --- a/document/src/main/java/com/yahoo/document/Document.java +++ b/document/src/main/java/com/yahoo/document/Document.java @@ -123,6 +123,7 @@ public class Document extends StructuredFieldValue { } public int getSerializedSize() throws SerializationException { + // TODO shouldn't this be createHead()? DocumentSerializer data = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f)); data.write(this); return data.getBuf().position(); @@ -135,6 +136,7 @@ public class Document extends StructuredFieldValue { public final int getApproxSize() { return 4096; } public void serialize(OutputStream out) throws SerializationException { + // TODO shouldn't this be createHead()? DocumentSerializer writer = DocumentSerializerFactory.create6(new GrowableByteBuffer(64 * 1024, 2.0f)); writer.write(this); GrowableByteBuffer data = writer.getBuf(); diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java index 59849d88c28..f6e3a75c7b2 100644 --- a/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java +++ b/document/src/main/java/com/yahoo/document/serialization/DocumentDeserializer.java @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.serialization; +import com.yahoo.document.DocumentTypeManager; import com.yahoo.io.GrowableByteBuffer; /** @@ -17,5 +18,7 @@ public interface DocumentDeserializer extends DocumentReader, DocumentUpdateRead */ GrowableByteBuffer getBuf(); + DocumentTypeManager getTypeRepo(); + } diff --git a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java index 9d52bc4aead..faaad95d5e1 100644 --- a/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java +++ b/document/src/main/java/com/yahoo/document/serialization/DocumentSerializer.java @@ -15,6 +15,6 @@ public interface DocumentSerializer extends DocumentWriter, SpanNodeWriter, Anno /** * Returns the underlying buffer used for serialization. */ - public GrowableByteBuffer getBuf(); + GrowableByteBuffer getBuf(); } diff --git a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java index 5f24a2d8f60..b508f0d2c7c 100644 --- a/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java +++ b/document/src/main/java/com/yahoo/document/serialization/VespaDocumentDeserializer6.java @@ -91,6 +91,11 @@ public class VespaDocumentDeserializer6 extends BufferSerializer implements Docu final public DocumentTypeManager getDocumentTypeManager() { return manager; } + @Override + public DocumentTypeManager getTypeRepo() { + return manager; + } + public void read(Document document) { read(null, document); } diff --git a/documentapi-dependencies/pom.xml b/documentapi-dependencies/pom.xml index 7fd73017d8b..99b4616ffe7 100644 --- a/documentapi-dependencies/pom.xml +++ b/documentapi-dependencies/pom.xml @@ -16,6 +16,10 @@ <dependencies> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>annotations</artifactId> <version>${project.version}</version> diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 0252da8a4d1..d00c89ae737 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -2078,6 +2078,7 @@ "public void <init>(com.yahoo.document.DocumentId, long, boolean)", "public void <init>(com.yahoo.document.GlobalId, long, boolean)", "public void <init>(com.yahoo.vespa.objects.Deserializer)", + "public boolean hasDocId()", "public com.yahoo.document.DocumentId getDocId()", "public com.yahoo.document.GlobalId getGid()", "public long getTimestamp()", @@ -2509,11 +2510,13 @@ "public" ], "methods" : [ + "public void <init>(java.lang.String, java.lang.String)", "public void <init>(java.lang.String)", "public java.lang.String getDocumentSelection()", "public com.yahoo.documentapi.messagebus.protocol.DocumentReply createReply()", "public int getType()", - "public com.yahoo.document.BucketId getBucketId()" + "public com.yahoo.document.BucketId getBucketId()", + "public java.lang.String getBucketSpace()" ], "fields" : [ ] }, @@ -3072,6 +3075,7 @@ ], "methods" : [ "public void <init>()", + "public void <init>(java.lang.String)", "public java.lang.String getResults()", "public void setResults(java.lang.String)" ], diff --git a/documentapi/pom.xml b/documentapi/pom.xml index 4b026d7f359..9188b803ca7 100644 --- a/documentapi/pom.xml +++ b/documentapi/pom.xml @@ -47,6 +47,10 @@ <artifactId>maven-compiler-plugin</artifactId> </plugin> <plugin> + <groupId>com.github.os72</groupId> + <artifactId>protoc-jar-maven-plugin</artifactId> + </plugin> + <plugin> <groupId>com.helger.maven</groupId> <artifactId>ph-javacc-maven-plugin</artifactId> <executions> diff --git a/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java new file mode 100644 index 00000000000..4331a3d461e --- /dev/null +++ b/documentapi/src/main/java/ai/vespa/documentapi/protobuf/package-info.java @@ -0,0 +1,5 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package ai.vespa.documentapi.protobuf; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java index ec49a0c570f..061d9e9afb9 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java @@ -256,9 +256,9 @@ public class DocumentProtocol implements Protocol { private DocumentProtocol(DocumentTypeManager docMan, String configId, DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) { - if (docMan != null) + if (docMan != null) { this.docMan = docMan; - else { + } else { this.docMan = new DocumentTypeManager(); DocumentTypeManagerConfigurer.configure(this.docMan, configId); } @@ -275,6 +275,11 @@ public class DocumentProtocol implements Protocol { putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory()); putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory()); + registerLegacyV6Factories(); + registerV8Factories(); + } + + private void registerLegacyV6Factories() { // Prepare version specifications to use when adding routable factories. VersionSpecification version6 = new VersionSpecification(6, 221); @@ -311,11 +316,48 @@ public class DocumentProtocol implements Protocol { putRoutableFactory(REPLY_REMOVELOCATION, new RoutableFactories60.RemoveLocationReplyFactory(), from6); putRoutableFactory(REPLY_STATBUCKET, new RoutableFactories60.StatBucketReplyFactory(), from6); putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6); - putRoutableFactory(REPLY_UPDATEDOCUMENT, new RoutableFactories60.UpdateDocumentReplyFactory(), from6); putRoutableFactory(REPLY_VISITORINFO, new RoutableFactories60.VisitorInfoReplyFactory(), from6); putRoutableFactory(REPLY_WRONGDISTRIBUTION, new RoutableFactories60.WrongDistributionReplyFactory(), from6); } + private void registerV8Factories() { + var version8 = new VersionSpecification(8, 304); // Must be same as in C++ impl + var from8 = Collections.singletonList(version8); + + putRoutableFactory(MESSAGE_CREATEVISITOR, RoutableFactories80.createCreateVisitorMessageFactory(), from8); + putRoutableFactory(MESSAGE_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorMessageFactory(), from8); + putRoutableFactory(MESSAGE_DOCUMENTLIST, RoutableFactories80.createDocumentListMessageFactory(), from8); + putRoutableFactory(MESSAGE_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsMessageFactory(), from8); + putRoutableFactory(MESSAGE_GETBUCKETLIST, RoutableFactories80.createGetBucketListMessageFactory(), from8); + putRoutableFactory(MESSAGE_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateMessageFactory(), from8); + putRoutableFactory(MESSAGE_GETDOCUMENT, RoutableFactories80.createGetDocumentMessageFactory(), from8); + putRoutableFactory(MESSAGE_MAPVISITOR, RoutableFactories80.createMapVisitorMessageFactory(), from8); + putRoutableFactory(MESSAGE_PUTDOCUMENT, RoutableFactories80.createPutDocumentMessageFactory(), from8); + putRoutableFactory(MESSAGE_QUERYRESULT, RoutableFactories80.createQueryResultMessageFactory(), from8); + putRoutableFactory(MESSAGE_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentMessageFactory(), from8); + putRoutableFactory(MESSAGE_REMOVELOCATION, RoutableFactories80.createRemoveLocationMessageFactory(), from8); + putRoutableFactory(MESSAGE_STATBUCKET, RoutableFactories80.createStatBucketMessageFactory(), from8); + putRoutableFactory(MESSAGE_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentMessageFactory(), from8); + putRoutableFactory(MESSAGE_VISITORINFO, RoutableFactories80.createVisitorInfoMessageFactory(), from8); + putRoutableFactory(REPLY_CREATEVISITOR, RoutableFactories80.createCreateVisitorReplyFactory(), from8); + putRoutableFactory(REPLY_DESTROYVISITOR, RoutableFactories80.createDestroyVisitorReplyFactory(), from8); + putRoutableFactory(REPLY_DOCUMENTIGNORED, RoutableFactories80.createDocumentIgnoredReplyFactory(), from8); + putRoutableFactory(REPLY_DOCUMENTLIST, RoutableFactories80.createDocumentListReplyFactory(), from8); + putRoutableFactory(REPLY_EMPTYBUCKETS, RoutableFactories80.createEmptyBucketsReplyFactory(), from8); + putRoutableFactory(REPLY_GETBUCKETLIST, RoutableFactories80.createGetBucketListReplyFactory(), from8); + putRoutableFactory(REPLY_GETBUCKETSTATE, RoutableFactories80.createGetBucketStateReplyFactory(), from8); + putRoutableFactory(REPLY_GETDOCUMENT, RoutableFactories80.createGetDocumentReplyFactory(), from8); + putRoutableFactory(REPLY_MAPVISITOR, RoutableFactories80.createMapVisitorReplyFactory(), from8); + putRoutableFactory(REPLY_PUTDOCUMENT, RoutableFactories80.createPutDocumentReplyFactory(), from8); + putRoutableFactory(REPLY_QUERYRESULT, RoutableFactories80.createQueryResultReplyFactory(), from8); + putRoutableFactory(REPLY_REMOVEDOCUMENT, RoutableFactories80.createRemoveDocumentReplyFactory(), from8); + putRoutableFactory(REPLY_REMOVELOCATION, RoutableFactories80.createRemoveLocationReplyFactory(), from8); + putRoutableFactory(REPLY_STATBUCKET, RoutableFactories80.createStatBucketReplyFactory(), from8); + putRoutableFactory(REPLY_UPDATEDOCUMENT, RoutableFactories80.createUpdateDocumentReplyFactory(), from8); + putRoutableFactory(REPLY_VISITORINFO, RoutableFactories80.createVisitorInfoReplyFactory(), from8); + putRoutableFactory(REPLY_WRONGDISTRIBUTION, RoutableFactories80.createWrongDistributionReplyFactory(), from8); + } + /** * Adds a new routable factory to this protocol. This method is thread-safe, and may be invoked on a protocol object * that is already in use by a message bus instance. Notice that the name you supply for a factory is the diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java index 727d1e4cd89..10ca1930317 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentState.java @@ -39,6 +39,8 @@ public class DocumentState implements Comparable<DocumentState> { removeEntry = buf.getByte(null)>0; } + public boolean hasDocId() { return docId != null; } + public DocumentId getDocId() { return docId; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java index 957e65c54e1..25862eb39f3 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RemoveLocationMessage.java @@ -6,15 +6,17 @@ import com.yahoo.document.select.BucketSelector; import java.util.Set; /** - * Message (VDS only) to remove an entire location for users using n= or g= schemes. + * Message to remove an entire location for users using n= or g= schemes. * We use a document selection so the user can specify a subset of those documents to be deleted * if they wish. */ public class RemoveLocationMessage extends DocumentMessage { String documentSelection; BucketId bucketId; + private final String bucketSpace; - public RemoveLocationMessage(String documentSelection) { + public RemoveLocationMessage(String documentSelection, String bucketSpace) { + this.bucketSpace = bucketSpace; try { this.documentSelection = documentSelection; BucketSelector bucketSel = new BucketSelector(new BucketIdFactory()); @@ -32,6 +34,10 @@ public class RemoveLocationMessage extends DocumentMessage { } } + public RemoveLocationMessage(String documentSelection) { + this(documentSelection, FixedBucketSpaces.defaultSpace()); + } + public String getDocumentSelection() { return documentSelection; } @@ -49,4 +55,8 @@ public class RemoveLocationMessage extends DocumentMessage { public BucketId getBucketId() { return bucketId; } + + public String getBucketSpace() { + return bucketSpace; + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java new file mode 100644 index 00000000000..e03a7a05a4b --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutableFactories80.java @@ -0,0 +1,931 @@ +package com.yahoo.documentapi.messagebus.protocol; + +import ai.vespa.documentapi.protobuf.DocapiCommon; +import ai.vespa.documentapi.protobuf.DocapiFeed; +import ai.vespa.documentapi.protobuf.DocapiInspect; +import ai.vespa.documentapi.protobuf.DocapiVisiting; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.yahoo.document.BucketId; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.GlobalId; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.document.serialization.DocumentDeserializerFactory; +import com.yahoo.document.serialization.DocumentSerializer; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.messagebus.Routable; +import com.yahoo.vdslib.DocumentSummary; +import com.yahoo.vdslib.SearchResult; +import com.yahoo.vdslib.VisitorStatistics; +import com.yahoo.vespa.objects.BufferSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Implementation of MessageBus message request/response serialization built around Protocol Buffers. + */ +abstract class RoutableFactories80 { + + private static class ProtobufCodec<DocApiT extends Routable, ProtoT extends AbstractMessage> implements RoutableFactory { + + private final Class<DocApiT> apiClass; + private final Function<DocApiT, ProtoT> encoderFn; + private final Function<DocumentDeserializer, DocApiT> decoderFn; + + ProtobufCodec(Class<DocApiT> apiClass, + Function<DocApiT, ProtoT> encoderFn, + Function<DocumentDeserializer, DocApiT> decoderFn) { + this.apiClass = apiClass; + this.encoderFn = encoderFn; + this.decoderFn = decoderFn; + } + + @Override + public boolean encode(Routable obj, DocumentSerializer out) { + try { + var protoMsg = encoderFn.apply(apiClass.cast(obj)); + var protoStream = CodedOutputStream.newInstance(out.getBuf().getByteBuffer()); // Not AutoCloseable... + try { + protoMsg.writeTo(protoStream); + } finally { + protoStream.flush(); + } + } catch (IOException | UnsupportedOperationException e) { + return false; + } + return true; + } + + @Override + public Routable decode(DocumentDeserializer in) { + return decoderFn.apply(in); + } + } + + private static class ProtobufCodecBuilder<DocApiT extends Routable, ProtoT extends AbstractMessage> { + + private final Class<DocApiT> apiClass; + private final Class<ProtoT> protoClass; + private Function<DocApiT, ProtoT> encoderFn; + private Function<DocumentDeserializer, DocApiT> decoderFn; + + ProtobufCodecBuilder(Class<DocApiT> apiClass, Class<ProtoT> protoClass) { + this.apiClass = apiClass; + this.protoClass = protoClass; + } + + static <DocApiT extends Routable, ProtoT extends AbstractMessage> ProtobufCodecBuilder<DocApiT, ProtoT> + of(Class<DocApiT> apiClass, Class<ProtoT> protoClass) { + return new ProtobufCodecBuilder<>(apiClass, protoClass); + } + + ProtobufCodecBuilder<DocApiT, ProtoT> encoder(Function<DocApiT, ProtoT> fn) { + if (encoderFn != null) { + throw new IllegalArgumentException("Encoder already set"); + } + encoderFn = fn; + return this; + } + + ProtobufCodecBuilder<DocApiT, ProtoT> decoder(ProtoT messagePrototype, Function<ProtoT, DocApiT> fn) { + if (decoderFn != null) { + throw new IllegalArgumentException("Decoder already set"); + } + decoderFn = (buf) -> { + try { + var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer()); + return fn.apply(protoClass.cast(protoObj)); + } catch (IOException e) { + return null; + } + }; + return this; + } + + ProtobufCodecBuilder<DocApiT, ProtoT> decoderWithRepo(ProtoT messagePrototype, BiFunction<ProtoT, DocumentTypeManager, DocApiT> fn) { + if (decoderFn != null) { + throw new IllegalArgumentException("Decoder already set"); + } + decoderFn = (buf) -> { + try { + var protoObj = messagePrototype.getParserForType().parseFrom(buf.getBuf().getByteBuffer()); + return fn.apply(protoClass.cast(protoObj), buf.getTypeRepo()); + } catch (IOException e) { + return null; + } + }; + return this; + } + + ProtobufCodec<DocApiT, ProtoT> build() { + Objects.requireNonNull(encoderFn, "Encoder has not been set"); + Objects.requireNonNull(decoderFn, "Decoder has not been set"); + return new ProtobufCodec<>(apiClass, encoderFn, decoderFn); + } + } + + // Protobuf codec helpers for common types + + private static DocapiCommon.GlobalId toProtoGlobalId(GlobalId gid) { + return DocapiCommon.GlobalId.newBuilder().setRawGid(ByteString.copyFrom(gid.getRawId())).build(); + } + + private static GlobalId fromProtoGlobalId(DocapiCommon.GlobalId gid) { + return new GlobalId(gid.getRawGid().toByteArray()); + } + + private static DocapiCommon.BucketId toProtoBucketId(BucketId id) { + return DocapiCommon.BucketId.newBuilder().setRawId(id.getRawId()).build(); + } + + private static BucketId fromProtoBucketId(DocapiCommon.BucketId id) { + return new BucketId(id.getRawId()); + } + + private static DocapiCommon.DocumentId toProtoDocId(DocumentId id) { + return DocapiCommon.DocumentId.newBuilder().setId(id.toString()).build(); + } + + private static DocumentId fromProtoDocId(DocapiCommon.DocumentId id) { + return new DocumentId(id.getId()); + } + + private static DocapiCommon.FieldSet toProtoFieldSet(String rawFieldSpec) { + return DocapiCommon.FieldSet.newBuilder().setSpec(rawFieldSpec).build(); + } + + private static String fromProtoFieldSet(DocapiCommon.FieldSet fieldSet) { + return fieldSet.getSpec(); + } + + private static ByteBuffer serializeDoc(Document doc) { + var buf = new GrowableByteBuffer(); + doc.serialize(buf); + buf.flip(); + return buf.getByteBuffer(); + } + + private static DocapiCommon.Document toProtoDocument(Document doc) { + // TODO a lot of copying here... Consider adding Document serialization to OutputStream + // so that we can serialize directly into a ByteString.Output instance. + return toProtoDocument(serializeDoc(doc)); + } + + private static DocapiCommon.Document toProtoDocument(ByteBuffer rawDocData) { + return DocapiCommon.Document.newBuilder() + .setPayload(ByteString.copyFrom(rawDocData)) + .build(); + } + + private static Document fromProtoDocument(DocapiCommon.Document protoDoc, DocumentTypeManager repo) { + var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoDoc.getPayload().asReadOnlyByteBuffer())); + return Document.createDocument(deserializer); + } + + private static Document deserializeDoc(ByteBuffer rawDocData, DocumentTypeManager repo) { + var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(rawDocData)); + return Document.createDocument(deserializer); + } + + private static DocapiFeed.TestAndSetCondition toProtoTasCondition(TestAndSetCondition tasCond) { + return DocapiFeed.TestAndSetCondition.newBuilder() + .setSelection(tasCond.getSelection()) + .build(); + } + + private static TestAndSetCondition fromProtoTasCondition(DocapiFeed.TestAndSetCondition protoTasCond) { + // Note: the empty (default) string implies "no condition present" + return new TestAndSetCondition(protoTasCond.getSelection()); + } + + private static ByteBuffer serializeUpdate(DocumentUpdate update) { + var buf = new GrowableByteBuffer(); + update.serialize(buf); + buf.flip(); + return buf.getByteBuffer(); + } + + private static DocapiFeed.DocumentUpdate toProtoUpdate(DocumentUpdate update) { + // TODO also consider DocumentUpdate serialization directly to OutputStream to avoid unneeded copying + return DocapiFeed.DocumentUpdate.newBuilder() + .setPayload(ByteString.copyFrom(serializeUpdate(update))) + .build(); + } + + private static DocumentUpdate fromProtoUpdate(DocapiFeed.DocumentUpdate protoUpdate, DocumentTypeManager repo) { + var deserializer = DocumentDeserializerFactory.createHead(repo, new GrowableByteBuffer(protoUpdate.getPayload().asReadOnlyByteBuffer())); + return new DocumentUpdate(deserializer); + } + + private static DocapiCommon.DocumentSelection toProtoDocumentSelection(String rawSelection) { + return DocapiCommon.DocumentSelection.newBuilder() + .setSelection(rawSelection) + .build(); + } + + private static String fromProtoDocumentSelection(DocapiCommon.DocumentSelection protoSelection) { + return protoSelection.getSelection(); + } + + private static DocapiCommon.BucketSpace toProtoBucketSpace(String spaceName) { + return DocapiCommon.BucketSpace.newBuilder() + .setName(spaceName) + .build(); + } + + private static String fromProtoBucketSpace(DocapiCommon.BucketSpace protoSpace) { + return protoSpace.getName(); + } + + private static DocapiCommon.ClusterState toProtoClusterState(String stateStr) { + return DocapiCommon.ClusterState.newBuilder().setStateString(stateStr).build(); + } + + private static String fromProtoClusterState(DocapiCommon.ClusterState state) { + return state.getStateString(); + } + + // Message codec implementations + + // --------------------------------------------- + // Get request and response + // --------------------------------------------- + + static RoutableFactory createGetDocumentMessageFactory() { + return ProtobufCodecBuilder + .of(GetDocumentMessage.class, DocapiFeed.GetDocumentRequest.class) + .encoder((apiMsg) -> + DocapiFeed.GetDocumentRequest.newBuilder() + .setDocumentId(toProtoDocId(apiMsg.getDocumentId())) + .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet())) + .build()) + .decoder(DocapiFeed.GetDocumentRequest.getDefaultInstance(), (protoMsg) -> + new GetDocumentMessage( + fromProtoDocId(protoMsg.getDocumentId()), + fromProtoFieldSet(protoMsg.getFieldSet()))) + .build(); + } + + static RoutableFactory createGetDocumentReplyFactory() { + return ProtobufCodecBuilder + .of(GetDocumentReply.class, DocapiFeed.GetDocumentResponse.class) + .encoder((apiReply) -> { + var builder = DocapiFeed.GetDocumentResponse.newBuilder() + .setLastModified(apiReply.getLastModified()); + var maybeDoc = apiReply.getDocument(); + if (maybeDoc != null) { + builder.setDocument(toProtoDocument(serializeDoc(maybeDoc))); + } + return builder.build(); + }) + .decoderWithRepo(DocapiFeed.GetDocumentResponse.getDefaultInstance(), (protoReply, repo) -> { + GetDocumentReply reply; + if (protoReply.hasDocument()) { + var doc = fromProtoDocument(protoReply.getDocument(), repo); + doc.setLastModified(protoReply.getLastModified()); + reply = new GetDocumentReply(doc); + } else { + reply = new GetDocumentReply(null); + } + reply.setLastModified(protoReply.getLastModified()); + return reply; + }) + .build(); + } + + // --------------------------------------------- + // Put request and response + // --------------------------------------------- + + static RoutableFactory createPutDocumentMessageFactory() { + return ProtobufCodecBuilder + .of(PutDocumentMessage.class, DocapiFeed.PutDocumentRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiFeed.PutDocumentRequest.newBuilder() + .setForceAssignTimestamp(apiMsg.getTimestamp()) + .setCreateIfMissing(apiMsg.getCreateIfNonExistent()) + .setDocument(toProtoDocument(apiMsg.getDocumentPut().getDocument())); + if (apiMsg.getCondition().isPresent()) { + builder.setCondition(toProtoTasCondition(apiMsg.getCondition())); + } + return builder.build(); + }) + .decoderWithRepo(DocapiFeed.PutDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> { + var doc = fromProtoDocument(protoMsg.getDocument(), repo); + var msg = new PutDocumentMessage(new DocumentPut(doc)); + if (protoMsg.hasCondition()) { + msg.setCondition(fromProtoTasCondition(protoMsg.getCondition())); + } + msg.setTimestamp(protoMsg.getForceAssignTimestamp()); + msg.setCreateIfNonExistent(protoMsg.getCreateIfMissing()); + return msg; + }) + .build(); + } + + static RoutableFactory createPutDocumentReplyFactory() { + return ProtobufCodecBuilder + .of(WriteDocumentReply.class, DocapiFeed.PutDocumentResponse.class) + .encoder((apiReply) -> + DocapiFeed.PutDocumentResponse.newBuilder() + .setModificationTimestamp(apiReply.getHighestModificationTimestamp()) + .build()) + .decoder(DocapiFeed.PutDocumentResponse.getDefaultInstance(), (protoReply) -> { + var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT); + reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp()); + return reply; + }) + .build(); + } + + // --------------------------------------------- + // Update request and response + // --------------------------------------------- + + static RoutableFactory createUpdateDocumentMessageFactory() { + return ProtobufCodecBuilder + .of(UpdateDocumentMessage.class, DocapiFeed.UpdateDocumentRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiFeed.UpdateDocumentRequest.newBuilder() + .setUpdate(toProtoUpdate(apiMsg.getDocumentUpdate())) + .setExpectedOldTimestamp(apiMsg.getOldTimestamp()) + .setForceAssignTimestamp(apiMsg.getNewTimestamp()); + if (apiMsg.getCondition().isPresent()) { + builder.setCondition(toProtoTasCondition(apiMsg.getCondition())); + } + return builder.build(); + }) + .decoderWithRepo(DocapiFeed.UpdateDocumentRequest.getDefaultInstance(), (protoMsg, repo) -> { + var msg = new UpdateDocumentMessage(fromProtoUpdate(protoMsg.getUpdate(), repo)); + msg.setOldTimestamp(protoMsg.getExpectedOldTimestamp()); + msg.setNewTimestamp(protoMsg.getForceAssignTimestamp()); + if (protoMsg.hasCondition()) { + msg.setCondition(fromProtoTasCondition(protoMsg.getCondition())); + } + return msg; + }) + .build(); + } + + static RoutableFactory createUpdateDocumentReplyFactory() { + return ProtobufCodecBuilder + .of(UpdateDocumentReply.class, DocapiFeed.UpdateDocumentResponse.class) + .encoder((apiReply) -> + DocapiFeed.UpdateDocumentResponse.newBuilder() + .setModificationTimestamp(apiReply.getHighestModificationTimestamp()) + .setWasFound(apiReply.wasFound()) + .build()) + .decoder(DocapiFeed.UpdateDocumentResponse.getDefaultInstance(), (protoReply) -> { + var reply = new UpdateDocumentReply(); + reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp()); + reply.setWasFound(protoReply.getWasFound()); + return reply; + }) + .build(); + } + + // --------------------------------------------- + // Remove request and response + // --------------------------------------------- + + static RoutableFactory createRemoveDocumentMessageFactory() { + return ProtobufCodecBuilder + .of(RemoveDocumentMessage.class, DocapiFeed.RemoveDocumentRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiFeed.RemoveDocumentRequest.newBuilder() + .setDocumentId(toProtoDocId(apiMsg.getDocumentId())); + if (apiMsg.getCondition().isPresent()) { + builder.setCondition(toProtoTasCondition(apiMsg.getCondition())); + } + return builder.build(); + }) + .decoder(DocapiFeed.RemoveDocumentRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new RemoveDocumentMessage(fromProtoDocId(protoMsg.getDocumentId())); + if (protoMsg.hasCondition()) { + msg.setCondition(fromProtoTasCondition(protoMsg.getCondition())); + } + return msg; + }) + .build(); + } + + static RoutableFactory createRemoveDocumentReplyFactory() { + return ProtobufCodecBuilder + .of(RemoveDocumentReply.class, DocapiFeed.RemoveDocumentResponse.class) + .encoder((apiReply) -> + DocapiFeed.RemoveDocumentResponse.newBuilder() + .setWasFound(apiReply.wasFound()) + .setModificationTimestamp(apiReply.getHighestModificationTimestamp()) + .build()) + .decoder(DocapiFeed.RemoveDocumentResponse.getDefaultInstance(), (protoReply) -> { + var reply = new RemoveDocumentReply(); + reply.setWasFound(protoReply.getWasFound()); + reply.setHighestModificationTimestamp(protoReply.getModificationTimestamp()); + return reply; + }) + .build(); + } + + // --------------------------------------------- + // RemoveLocation request and response + // --------------------------------------------- + + static RoutableFactory createRemoveLocationMessageFactory() { + return ProtobufCodecBuilder + .of(RemoveLocationMessage.class, DocapiFeed.RemoveLocationRequest.class) + .encoder((apiMsg) -> + DocapiFeed.RemoveLocationRequest.newBuilder() + .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace())) + .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection())) + .build()) + .decoder(DocapiFeed.RemoveLocationRequest.getDefaultInstance(), (protoMsg) -> + new RemoveLocationMessage( + fromProtoDocumentSelection(protoMsg.getSelection()), + fromProtoBucketSpace(protoMsg.getBucketSpace()))) + .build(); + } + + static RoutableFactory createRemoveLocationReplyFactory() { + return ProtobufCodecBuilder + .of(DocumentReply.class, DocapiFeed.RemoveLocationResponse.class) + .encoder((apiReply) -> DocapiFeed.RemoveLocationResponse.newBuilder().build()) + .decoder(DocapiFeed.RemoveLocationResponse.getDefaultInstance(), + (protoReply) -> new DocumentReply(DocumentProtocol.REPLY_REMOVELOCATION)) + .build(); + } + + // --------------------------------------------- + // CreateVisitor request and response + // --------------------------------------------- + + private static DocapiVisiting.VisitorParameter toProtoVisitorParameter(String key, byte[] value) { + return DocapiVisiting.VisitorParameter.newBuilder() + .setKey(key) + .setValue(ByteString.copyFrom(value)) + .build(); + } + + static RoutableFactory createCreateVisitorMessageFactory() { + return ProtobufCodecBuilder + .of(CreateVisitorMessage.class, DocapiVisiting.CreateVisitorRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiVisiting.CreateVisitorRequest.newBuilder() + .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace())) + .setVisitorLibraryName(apiMsg.getLibraryName()) + .setInstanceId(apiMsg.getInstanceId()) + .setControlDestination(apiMsg.getControlDestination()) + .setDataDestination(apiMsg.getDataDestination()) + .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection())) + .setFieldSet(toProtoFieldSet(apiMsg.getFieldSet())) + .setMaxPendingReplyCount(apiMsg.getMaxPendingReplyCount()) + .setFromTimestamp(apiMsg.getFromTimestamp()) + .setToTimestamp(apiMsg.getToTimestamp()) + .setVisitTombstones(apiMsg.getVisitRemoves()) + .setVisitInconsistentBuckets(apiMsg.getVisitInconsistentBuckets()) + .setMaxBucketsPerVisitor(apiMsg.getMaxBucketsPerVisitor()); + for (var id : apiMsg.getBuckets()) { + builder.addBuckets(toProtoBucketId(id)); + } + for (var entry : apiMsg.getParameters().entrySet()) { + builder.addParameters(toProtoVisitorParameter(entry.getKey(), entry.getValue())); + } + return builder.build(); + }) + .decoder(DocapiVisiting.CreateVisitorRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new CreateVisitorMessage(); + msg.setBucketSpace(fromProtoBucketSpace(protoMsg.getBucketSpace())); + msg.setLibraryName(protoMsg.getVisitorLibraryName()); + msg.setInstanceId(protoMsg.getInstanceId()); + msg.setControlDestination(protoMsg.getControlDestination()); + msg.setDataDestination(protoMsg.getDataDestination()); + msg.setDocumentSelection(fromProtoDocumentSelection(protoMsg.getSelection())); + msg.setFieldSet(fromProtoFieldSet(protoMsg.getFieldSet())); + msg.setMaxPendingReplyCount(protoMsg.getMaxPendingReplyCount()); + msg.setFromTimestamp(protoMsg.getFromTimestamp()); + msg.setToTimestamp(protoMsg.getToTimestamp()); + msg.setVisitRemoves(protoMsg.getVisitTombstones()); + msg.setVisitInconsistentBuckets(protoMsg.getVisitInconsistentBuckets()); + msg.setMaxBucketsPerVisitor(protoMsg.getMaxBucketsPerVisitor()); + for (var protoId : protoMsg.getBucketsList()) { + msg.getBuckets().add(fromProtoBucketId(protoId)); + } + for (var protoParam : protoMsg.getParametersList()) { + msg.getParameters().put(protoParam.getKey(), protoParam.getValue().toByteArray()); + } + return msg; + }) + .build(); + } + + static RoutableFactory createCreateVisitorReplyFactory() { + return ProtobufCodecBuilder + .of(CreateVisitorReply.class, DocapiVisiting.CreateVisitorResponse.class) + .encoder((apiReply) -> { + var stats = apiReply.getVisitorStatistics(); + return DocapiVisiting.CreateVisitorResponse.newBuilder() + .setLastBucket(toProtoBucketId(apiReply.getLastBucket())) + .setStatistics(DocapiVisiting.VisitorStatistics.newBuilder() + .setBucketsVisited(stats.getBucketsVisited()) + .setDocumentsVisited(stats.getDocumentsVisited()) + .setBytesVisited(stats.getBytesVisited()) + .setDocumentsReturned(stats.getDocumentsReturned()) + .setBytesReturned(stats.getBytesReturned()) + .build()) + .build(); + }) + .decoder(DocapiVisiting.CreateVisitorResponse.getDefaultInstance(), (protoReply) -> { + var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR); + reply.setLastBucket(fromProtoBucketId(protoReply.getLastBucket())); + var protoVs = protoReply.getStatistics(); + var vs = new VisitorStatistics(); + vs.setBucketsVisited(protoVs.getBucketsVisited()); + vs.setDocumentsVisited(protoVs.getDocumentsVisited()); + vs.setBytesVisited(protoVs.getBytesVisited()); + vs.setDocumentsReturned(protoVs.getDocumentsReturned()); + vs.setBytesReturned(protoVs.getBytesReturned()); + reply.setVisitorStatistics(vs); + return reply; + }) + .build(); + } + + // --------------------------------------------- + // DestroyVisitor request and response + // --------------------------------------------- + + static RoutableFactory createDestroyVisitorMessageFactory() { + return ProtobufCodecBuilder + .of(DestroyVisitorMessage.class, DocapiVisiting.DestroyVisitorRequest.class) + .encoder((apiMsg) -> + DocapiVisiting.DestroyVisitorRequest.newBuilder() + .setInstanceId(apiMsg.getInstanceId()) + .build()) + .decoder(DocapiVisiting.DestroyVisitorRequest.getDefaultInstance(), + (protoMsg) -> new DestroyVisitorMessage(protoMsg.getInstanceId())) + .build(); + } + + static RoutableFactory createDestroyVisitorReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.DestroyVisitorResponse.class) + .encoder((apiReply) -> DocapiVisiting.DestroyVisitorResponse.newBuilder().build()) + .decoder(DocapiVisiting.DestroyVisitorResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DESTROYVISITOR)) + .build(); + } + + // --------------------------------------------- + // MapVisitor request and response + // --------------------------------------------- + + static RoutableFactory createMapVisitorMessageFactory() { + return ProtobufCodecBuilder + .of(MapVisitorMessage.class, DocapiVisiting.MapVisitorRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiVisiting.MapVisitorRequest.newBuilder(); + for (var entry : apiMsg.getData().entrySet()) { + // FIXME MapVisitorMessage uses Parameters (i.e. string -> bytes) in C++, but string -> string in Java... + // ... but due to this, UTF-8 is effectively enforced anyway. Not that anything actually uses this :I + builder.addData(toProtoVisitorParameter(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8))); + } + return builder.build(); + }) + .decoder(DocapiVisiting.MapVisitorRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new MapVisitorMessage(); + for (var param : protoMsg.getDataList()) { + msg.getData().put(param.getKey(), param.getValue().toStringUtf8()); + } + return msg; + }) + .build(); + } + + static RoutableFactory createMapVisitorReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.MapVisitorResponse.class) + .encoder((apiReply) -> DocapiVisiting.MapVisitorResponse.newBuilder().build()) + .decoder(DocapiVisiting.MapVisitorResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_MAPVISITOR)) + .build(); + } + + // --------------------------------------------- + // QueryResult request and response + // --------------------------------------------- + + static RoutableFactory createQueryResultMessageFactory() { + return ProtobufCodecBuilder + .of(QueryResultMessage.class, DocapiVisiting.QueryResultRequest.class) + .encoder((apiMsg) -> { + // Serialization of QueryResultMessages is not implemented in Java (receive only) + throw new UnsupportedOperationException("Serialization of QueryResultMessage instances is not supported"); + }) + .decoder(DocapiVisiting.QueryResultRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new QueryResultMessage(); + // Explicitly enforce presence of result/summary fields, as our object is not necessarily + // well-defined if these have not been initialized. + if (!protoMsg.hasSearchResult() || !protoMsg.hasDocumentSummary()) { + throw new IllegalArgumentException("Query result does not have all required fields set"); + } + // We have to use toByteArray() instead of asReadOnlyByteBuffer(), as the deserialization routines + // try to fetch the raw arrays, which are considered mutable (causing a ReadOnlyBufferException). + msg.setSearchResult(new SearchResult(new BufferSerializer( + protoMsg.getSearchResult().getPayload().toByteArray()))); + msg.setSummary(new DocumentSummary(new BufferSerializer( + protoMsg.getDocumentSummary().getPayload().toByteArray()))); + return msg; + }) + .build(); + } + + static RoutableFactory createQueryResultReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.QueryResultResponse.class) + .encoder((apiReply) -> DocapiVisiting.QueryResultResponse.newBuilder().build()) + .decoder(DocapiVisiting.QueryResultResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_QUERYRESULT)) + .build(); + } + + // --------------------------------------------- + // VisitorInfo request and response + // --------------------------------------------- + + static RoutableFactory createVisitorInfoMessageFactory() { + return ProtobufCodecBuilder + .of(VisitorInfoMessage.class, DocapiVisiting.VisitorInfoRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiVisiting.VisitorInfoRequest.newBuilder() + .setErrorMessage(apiMsg.getErrorMessage()); + for (var id : apiMsg.getFinishedBuckets()) { + builder.addFinishedBuckets(toProtoBucketId(id)); + } + return builder.build(); + }) + .decoder(DocapiVisiting.VisitorInfoRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new VisitorInfoMessage(); + msg.setErrorMessage(protoMsg.getErrorMessage()); + for (var protoId : protoMsg.getFinishedBucketsList()) { + msg.getFinishedBuckets().add(fromProtoBucketId(protoId)); + } + return msg; + }) + .build(); + } + + static RoutableFactory createVisitorInfoReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.VisitorInfoResponse.class) + .encoder((apiReply) -> DocapiVisiting.VisitorInfoResponse.newBuilder().build()) + .decoder(DocapiVisiting.VisitorInfoResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_VISITORINFO)) + .build(); + } + + // --------------------------------------------- + // DocumentList request and response + // TODO this should be deprecated + // --------------------------------------------- + + static RoutableFactory createDocumentListMessageFactory() { + return ProtobufCodecBuilder + .of(DocumentListMessage.class, DocapiVisiting.DocumentListRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiVisiting.DocumentListRequest.newBuilder() + .setBucketId(toProtoBucketId(apiMsg.getBucketId())); + for (var doc : apiMsg.getDocuments()) { + builder.addEntries(DocapiVisiting.DocumentListRequest.Entry.newBuilder() + .setTimestamp(doc.getTimestamp()) + .setIsTombstone(doc.isRemoveEntry()) + .setDocument(toProtoDocument(doc.getDocument()))); + } + return builder.build(); + }) + .decoderWithRepo(DocapiVisiting.DocumentListRequest.getDefaultInstance(), (protoMsg, repo) -> { + var msg = new DocumentListMessage(); + msg.setBucketId(fromProtoBucketId(protoMsg.getBucketId())); + for (var entry : protoMsg.getEntriesList()) { + msg.getDocuments().add(new DocumentListEntry( + fromProtoDocument(entry.getDocument(), repo), + entry.getTimestamp(), + entry.getIsTombstone())); + } + return msg; + }) + .build(); + } + + static RoutableFactory createDocumentListReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.DocumentListResponse.class) + .encoder((apiReply) -> DocapiVisiting.DocumentListResponse.newBuilder().build()) + .decoder(DocapiVisiting.DocumentListResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_DOCUMENTLIST)) + .build(); + } + + // --------------------------------------------- + // EmptyBuckets request and response + // TODO this should be deprecated + // --------------------------------------------- + + static RoutableFactory createEmptyBucketsMessageFactory() { + return ProtobufCodecBuilder + .of(EmptyBucketsMessage.class, DocapiVisiting.EmptyBucketsRequest.class) + .encoder((apiMsg) -> { + var builder = DocapiVisiting.EmptyBucketsRequest.newBuilder(); + for (var id : apiMsg.getBucketIds()) { + builder.addBucketIds(toProtoBucketId(id)); + } + return builder.build(); + }) + .decoder(DocapiVisiting.EmptyBucketsRequest.getDefaultInstance(), (protoMsg) -> { + var msg = new EmptyBucketsMessage(); + for (var protoId : protoMsg.getBucketIdsList()) { + msg.getBucketIds().add(fromProtoBucketId(protoId)); + } + return msg; + }) + .build(); + } + + static RoutableFactory createEmptyBucketsReplyFactory() { + return ProtobufCodecBuilder + .of(VisitorReply.class, DocapiVisiting.EmptyBucketsResponse.class) + .encoder((apiReply) -> DocapiVisiting.EmptyBucketsResponse.newBuilder().build()) + .decoder(DocapiVisiting.EmptyBucketsResponse.getDefaultInstance(), + (protoReply) -> new VisitorReply(DocumentProtocol.REPLY_EMPTYBUCKETS)) + .build(); + } + + // --------------------------------------------- + // GetBucketList request and response + // --------------------------------------------- + + static RoutableFactory createGetBucketListMessageFactory() { + return ProtobufCodecBuilder + .of(GetBucketListMessage.class, DocapiInspect.GetBucketListRequest.class) + .encoder((apiMsg) -> + DocapiInspect.GetBucketListRequest.newBuilder() + .setBucketId(toProtoBucketId(apiMsg.getBucketId())) + .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace())) + .build()) + .decoder(DocapiInspect.GetBucketListRequest.getDefaultInstance(), (protoMsg) -> + new GetBucketListMessage( + fromProtoBucketId(protoMsg.getBucketId()), + fromProtoBucketSpace(protoMsg.getBucketSpace()))) + .build(); + } + + static RoutableFactory createGetBucketListReplyFactory() { + return ProtobufCodecBuilder + .of(GetBucketListReply.class, DocapiInspect.GetBucketListResponse.class) + .encoder((apiReply) -> { + var builder = DocapiInspect.GetBucketListResponse.newBuilder(); + for (var info : apiReply.getBuckets()) { + builder.addBucketInfo(DocapiInspect.BucketInformation.newBuilder() + .setBucketId(toProtoBucketId(info.getBucketId())) + .setInfo(info.getBucketInformation())); + } + return builder.build(); + }) + .decoder(DocapiInspect.GetBucketListResponse.getDefaultInstance(), (protoReply) -> { + var reply = new GetBucketListReply(); + for (var info : protoReply.getBucketInfoList()) { + reply.getBuckets().add(new GetBucketListReply.BucketInfo( + fromProtoBucketId(info.getBucketId()), + info.getInfo())); + } + return reply; + }) + .build(); + } + + // --------------------------------------------- + // GetBucketState request and response + // --------------------------------------------- + + static RoutableFactory createGetBucketStateMessageFactory() { + return ProtobufCodecBuilder + .of(GetBucketStateMessage.class, DocapiInspect.GetBucketStateRequest.class) + .encoder((apiMsg) -> + DocapiInspect.GetBucketStateRequest.newBuilder() + .setBucketId(toProtoBucketId(apiMsg.getBucketId())) + .build()) + .decoder(DocapiInspect.GetBucketStateRequest.getDefaultInstance(), (protoMsg) -> + new GetBucketStateMessage(fromProtoBucketId(protoMsg.getBucketId()))) + .build(); + } + + static RoutableFactory createGetBucketStateReplyFactory() { + return ProtobufCodecBuilder + .of(GetBucketStateReply.class, DocapiInspect.GetBucketStateResponse.class) + .encoder((apiReply) -> { + var builder = DocapiInspect.GetBucketStateResponse.newBuilder(); + for (var state : apiReply.getBucketState()) { + var stateBuilder = DocapiInspect.DocumentState.newBuilder() + .setTimestamp(state.getTimestamp()) + .setIsTombstone(state.isRemoveEntry()); + if (state.hasDocId()) { + stateBuilder.setDocumentId(toProtoDocId(state.getDocId())); + } else { + stateBuilder.setGlobalId(toProtoGlobalId(state.getGid())); + } + builder.addStates(stateBuilder); + } + return builder.build(); + }) + .decoder(DocapiInspect.GetBucketStateResponse.getDefaultInstance(), (protoReply) -> { + var reply = new GetBucketStateReply(); + for (var state : protoReply.getStatesList()) { + if (state.hasDocumentId()) { + reply.getBucketState().add(new DocumentState( + fromProtoDocId(state.getDocumentId()), + state.getTimestamp(), + state.getIsTombstone())); + } else { + reply.getBucketState().add(new DocumentState( + fromProtoGlobalId(state.getGlobalId()), + state.getTimestamp(), + state.getIsTombstone())); + } + } + return reply; + }) + .build(); + } + + // --------------------------------------------- + // StatBucket request and response + // --------------------------------------------- + + static RoutableFactory createStatBucketMessageFactory() { + return ProtobufCodecBuilder + .of(StatBucketMessage.class, DocapiInspect.StatBucketRequest.class) + .encoder((apiMsg) -> + DocapiInspect.StatBucketRequest.newBuilder() + .setBucketId(toProtoBucketId(apiMsg.getBucketId())) + .setBucketSpace(toProtoBucketSpace(apiMsg.getBucketSpace())) + .setSelection(toProtoDocumentSelection(apiMsg.getDocumentSelection())) + .build()) + .decoder(DocapiInspect.StatBucketRequest.getDefaultInstance(), (protoMsg) -> + new StatBucketMessage( + fromProtoBucketId(protoMsg.getBucketId()), + fromProtoBucketSpace(protoMsg.getBucketSpace()), + fromProtoDocumentSelection(protoMsg.getSelection()))) + .build(); + } + + static RoutableFactory createStatBucketReplyFactory() { + return ProtobufCodecBuilder + .of(StatBucketReply.class, DocapiInspect.StatBucketResponse.class) + .encoder((apiReply) -> + DocapiInspect.StatBucketResponse.newBuilder() + .setResults(apiReply.getResults()) + .build()) + .decoder(DocapiInspect.StatBucketResponse.getDefaultInstance(), (protoReply) -> + new StatBucketReply(protoReply.getResults())) + .build(); + } + + // --------------------------------------------- + // WrongDistribution response (no request type) + // --------------------------------------------- + + static RoutableFactory createWrongDistributionReplyFactory() { + return ProtobufCodecBuilder + .of(WrongDistributionReply.class, DocapiCommon.WrongDistributionResponse.class) + .encoder((apiReply) -> + DocapiCommon.WrongDistributionResponse.newBuilder() + .setClusterState(toProtoClusterState(apiReply.getSystemState())) + .build()) + .decoder(DocapiCommon.WrongDistributionResponse.getDefaultInstance(), (protoReply) -> + new WrongDistributionReply(fromProtoClusterState(protoReply.getClusterState()))) + .build(); + } + + // --------------------------------------------- + // DocumentIgnored response (no request type) + // --------------------------------------------- + + static RoutableFactory createDocumentIgnoredReplyFactory() { + return ProtobufCodecBuilder + .of(DocumentIgnoredReply.class, DocapiCommon.DocumentIgnoredResponse.class) + .encoder((apiReply) -> DocapiCommon.DocumentIgnoredResponse.newBuilder().build()) + .decoder(DocapiCommon.DocumentIgnoredResponse.getDefaultInstance(), + (protoReply) -> new DocumentIgnoredReply()) + .build(); + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java index fa53a3f8568..73ce4dbbc68 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketReply.java @@ -9,6 +9,11 @@ public class StatBucketReply extends DocumentReply { super(DocumentProtocol.REPLY_STATBUCKET); } + public StatBucketReply(String results) { + super(DocumentProtocol.REPLY_STATBUCKET); + this.results = results; + } + public String getResults() { return results; } diff --git a/documentapi/src/protobuf/docapi_common.proto b/documentapi/src/protobuf/docapi_common.proto new file mode 100644 index 00000000000..5f4cfeb299c --- /dev/null +++ b/documentapi/src/protobuf/docapi_common.proto @@ -0,0 +1,50 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +package documentapi.protobuf; + +option cc_enable_arenas = true; +option java_package = "ai.vespa.documentapi.protobuf"; + +message BucketSpace { + string name = 1; +} + +message BucketId { + fixed64 raw_id = 1; +} + +message Document { + bytes payload = 1; +} + +message DocumentId { + string id = 1; +} + +message FieldSet { + string spec = 1; +} + +message GlobalId { + // Shall always be 12 bytes (96 bits) + bytes raw_gid = 1; +} + +message DocumentSelection { + string selection = 1; +} + +message ClusterState { + string state_string = 1; +} + +// Polymorphic response type shared by other responses +message WrongDistributionResponse { + ClusterState cluster_state = 1; +} + +// Polymorphic response type shared by other responses +message DocumentIgnoredResponse { + // empty +} diff --git a/documentapi/src/protobuf/docapi_feed.proto b/documentapi/src/protobuf/docapi_feed.proto new file mode 100644 index 00000000000..8d15fd9a536 --- /dev/null +++ b/documentapi/src/protobuf/docapi_feed.proto @@ -0,0 +1,71 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +package documentapi.protobuf; + +option cc_enable_arenas = true; +option java_package = "ai.vespa.documentapi.protobuf"; + +import "docapi_common.proto"; + +message TestAndSetCondition { + string selection = 1; +} + +message DocumentUpdate { + bytes payload = 1; +} + +message GetDocumentRequest { + DocumentId document_id = 1; + FieldSet field_set = 2; +} + +message GetDocumentResponse { + Document document = 1; + uint64 last_modified = 2; +} + +message PutDocumentRequest { + // Note: document contains embedded document ID + Document document = 1; + TestAndSetCondition condition = 2; + bool create_if_missing = 3; + uint64 force_assign_timestamp = 4; +} + +message PutDocumentResponse { + uint64 modification_timestamp = 1; +} + +message UpdateDocumentRequest { + // Note: update contains embedded document ID + DocumentUpdate update = 1; + TestAndSetCondition condition = 2; + uint64 expected_old_timestamp = 3; + uint64 force_assign_timestamp = 4; +} + +message UpdateDocumentResponse { + bool was_found = 1; + uint64 modification_timestamp = 2; +} + +message RemoveDocumentRequest { + DocumentId document_id = 1; + TestAndSetCondition condition = 2; +} + +message RemoveDocumentResponse { + bool was_found = 1; + uint64 modification_timestamp = 2; +} + +message RemoveLocationRequest { + DocumentSelection selection = 1; + BucketSpace bucket_space = 2; +} + +message RemoveLocationResponse { + // empty +} diff --git a/documentapi/src/protobuf/docapi_inspect.proto b/documentapi/src/protobuf/docapi_inspect.proto new file mode 100644 index 00000000000..efdc8062e0a --- /dev/null +++ b/documentapi/src/protobuf/docapi_inspect.proto @@ -0,0 +1,48 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +package documentapi.protobuf; + +option cc_enable_arenas = true; +option java_package = "ai.vespa.documentapi.protobuf"; + +import "docapi_common.proto"; + +message GetBucketListRequest { + BucketId bucket_id = 1; + BucketSpace bucket_space = 2; +} + +message BucketInformation { + BucketId bucket_id = 1; + string info = 2; +} + +message GetBucketListResponse { + repeated BucketInformation bucket_info = 1; +} + +message GetBucketStateRequest { + BucketId bucket_id = 1; +} + +message DocumentState { + DocumentId document_id = 1; + GlobalId global_id = 2; + uint64 timestamp = 3; + bool is_tombstone = 4; +} + +message GetBucketStateResponse { + repeated DocumentState states = 1; +} + +message StatBucketRequest { + BucketId bucket_id = 1; + DocumentSelection selection = 2; + BucketSpace bucket_space = 3; +} + +message StatBucketResponse { + string results = 1; +} diff --git a/documentapi/src/protobuf/docapi_visiting.proto b/documentapi/src/protobuf/docapi_visiting.proto new file mode 100644 index 00000000000..ecf71ddab55 --- /dev/null +++ b/documentapi/src/protobuf/docapi_visiting.proto @@ -0,0 +1,115 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +package documentapi.protobuf; + +option cc_enable_arenas = true; +option java_package = "ai.vespa.documentapi.protobuf"; + +import "docapi_common.proto"; + +message VisitorParameter { + string key = 1; + bytes value = 2; +} + +message CreateVisitorRequest { + string visitor_library_name = 1; + string instance_id = 2; + string control_destination = 3; + string data_destination = 4; + DocumentSelection selection = 5; + uint32 max_pending_reply_count = 6; + BucketSpace bucket_space = 7; + repeated BucketId buckets = 8; + uint64 from_timestamp = 9; + uint64 to_timestamp = 10; + bool visit_tombstones = 11; + FieldSet field_set = 12; + bool visit_inconsistent_buckets = 13; + uint32 max_buckets_per_visitor = 14; + repeated VisitorParameter parameters = 15; +} + +message VisitorStatistics { + uint32 buckets_visited = 1; + uint64 documents_visited = 2; + uint64 bytes_visited = 3; + uint64 documents_returned = 4; + uint64 bytes_returned = 5; +} + +message CreateVisitorResponse { + BucketId last_bucket = 1; + VisitorStatistics statistics = 2; +} + +message DestroyVisitorRequest { + string instance_id = 1; +} + +message DestroyVisitorResponse { + // empty +} + +message VisitorInfoRequest { + repeated BucketId finished_buckets = 1; + string error_message = 2; +} + +message VisitorInfoResponse { + // empty +} + +message MapVisitorRequest { + repeated VisitorParameter data = 1; +} + +message MapVisitorResponse { + // empty +} + +message SearchResult { + bytes payload = 1; +} + +message DocumentSummary { + bytes payload = 1; +} + +// We consider streaming search query-related messages to be part of the visiting family +message QueryResultRequest { + SearchResult search_result = 1; + DocumentSummary document_summary = 2; +} + +message QueryResultResponse { + // empty +} + +// TODO deprecate, only used by "recovery visitor" (?!) +message DocumentListRequest { + message Entry { + Document document = 1; + uint64 timestamp = 2; + bool is_tombstone = 3; + } + + BucketId bucket_id = 1; + repeated Entry entries = 2; +} + +// TODO deprecate +message DocumentListResponse { + // TODO +} + +// TODO deprecate, not sent by backend +message EmptyBucketsRequest { + repeated BucketId bucket_ids = 1; +} + +// TODO deprecate, not sent by backend +message EmptyBucketsResponse { + // empty +} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java index 81904837632..42f200a0b6b 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/Messages60TestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages60TestCase.java @@ -1,5 +1,5 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol.test; +package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.component.Version; import com.yahoo.document.BucketId; @@ -12,35 +12,6 @@ import com.yahoo.document.GlobalId; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate; import com.yahoo.document.idstring.IdString; -import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage; -import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply; -import com.yahoo.documentapi.messagebus.protocol.DestroyVisitorMessage; -import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply; -import com.yahoo.documentapi.messagebus.protocol.DocumentListMessage; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.DocumentReply; -import com.yahoo.documentapi.messagebus.protocol.DocumentState; -import com.yahoo.documentapi.messagebus.protocol.EmptyBucketsMessage; -import com.yahoo.documentapi.messagebus.protocol.GetBucketListMessage; -import com.yahoo.documentapi.messagebus.protocol.GetBucketListReply; -import com.yahoo.documentapi.messagebus.protocol.GetBucketStateMessage; -import com.yahoo.documentapi.messagebus.protocol.GetBucketStateReply; -import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.GetDocumentReply; -import com.yahoo.documentapi.messagebus.protocol.MapVisitorMessage; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.QueryResultMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentReply; -import com.yahoo.documentapi.messagebus.protocol.RemoveLocationMessage; -import com.yahoo.documentapi.messagebus.protocol.StatBucketMessage; -import com.yahoo.documentapi.messagebus.protocol.StatBucketReply; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply; -import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage; -import com.yahoo.documentapi.messagebus.protocol.VisitorReply; -import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply; -import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; import com.yahoo.messagebus.Routable; import com.yahoo.text.Utf8; import com.yahoo.vdslib.SearchResult; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java new file mode 100644 index 00000000000..f2c039af6c0 --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/Messages80TestCase.java @@ -0,0 +1,729 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.component.Version; +import com.yahoo.document.BucketId; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.GlobalId; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.fieldpathupdate.RemoveFieldPathUpdate; +import com.yahoo.document.idstring.IdString; +import com.yahoo.messagebus.Routable; +import com.yahoo.text.Utf8; +import com.yahoo.vdslib.SearchResult; + +import java.util.ArrayList; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class Messages80TestCase extends MessagesTestBase { + + @Override + protected void registerTests(Map<Integer, RunnableTest> out) { + out.put(DocumentProtocol.MESSAGE_CREATEVISITOR, new CreateVisitorMessageTest()); + out.put(DocumentProtocol.MESSAGE_DESTROYVISITOR, new DestroyVisitorMessageTest()); + out.put(DocumentProtocol.MESSAGE_DOCUMENTLIST, new DocumentListMessageTest()); + out.put(DocumentProtocol.MESSAGE_EMPTYBUCKETS, new EmptyBucketsMessageTest()); + out.put(DocumentProtocol.MESSAGE_GETBUCKETLIST, new GetBucketListMessageTest()); + out.put(DocumentProtocol.MESSAGE_GETBUCKETSTATE, new GetBucketStateMessageTest()); + out.put(DocumentProtocol.MESSAGE_GETDOCUMENT, new GetDocumentMessageTest()); + out.put(DocumentProtocol.MESSAGE_MAPVISITOR, new MapVisitorMessageTest()); + out.put(DocumentProtocol.MESSAGE_PUTDOCUMENT, new PutDocumentMessageTest()); + out.put(DocumentProtocol.MESSAGE_QUERYRESULT, new QueryResultMessageTest()); + out.put(DocumentProtocol.MESSAGE_REMOVEDOCUMENT, new RemoveDocumentMessageTest()); + out.put(DocumentProtocol.MESSAGE_REMOVELOCATION, new RemoveLocationMessageTest()); + out.put(DocumentProtocol.MESSAGE_STATBUCKET, new StatBucketMessageTest()); + out.put(DocumentProtocol.MESSAGE_UPDATEDOCUMENT, new UpdateDocumentMessageTest()); + out.put(DocumentProtocol.MESSAGE_VISITORINFO, new VisitorInfoMessageTest()); + out.put(DocumentProtocol.REPLY_CREATEVISITOR, new CreateVisitorReplyTest()); + out.put(DocumentProtocol.REPLY_DESTROYVISITOR, new DestroyVisitorReplyTest()); + out.put(DocumentProtocol.REPLY_DOCUMENTIGNORED, new DocumentIgnoredReplyTest()); + out.put(DocumentProtocol.REPLY_DOCUMENTLIST, new DocumentListReplyTest()); + out.put(DocumentProtocol.REPLY_EMPTYBUCKETS, new EmptyBucketsReplyTest()); + out.put(DocumentProtocol.REPLY_GETBUCKETLIST, new GetBucketListReplyTest()); + out.put(DocumentProtocol.REPLY_GETBUCKETSTATE, new GetBucketStateReplyTest()); + out.put(DocumentProtocol.REPLY_GETDOCUMENT, new GetDocumentReplyTest()); + out.put(DocumentProtocol.REPLY_MAPVISITOR, new MapVisitorReplyTest()); + out.put(DocumentProtocol.REPLY_PUTDOCUMENT, new PutDocumentReplyTest()); + out.put(DocumentProtocol.REPLY_QUERYRESULT, new QueryResultReplyTest()); + out.put(DocumentProtocol.REPLY_REMOVEDOCUMENT, new RemoveDocumentReplyTest()); + out.put(DocumentProtocol.REPLY_REMOVELOCATION, new RemoveLocationReplyTest()); + out.put(DocumentProtocol.REPLY_STATBUCKET, new StatBucketReplyTest()); + out.put(DocumentProtocol.REPLY_UPDATEDOCUMENT, new UpdateDocumentReplyTest()); + out.put(DocumentProtocol.REPLY_VISITORINFO, new VisitorInfoReplyTest()); + out.put(DocumentProtocol.REPLY_WRONGDISTRIBUTION, new WrongDistributionReplyTest()); + } + + @Override + protected Version version() { + return new Version(8, 305); + } + + @Override + protected boolean shouldTestCoverage() { + return true; + } + + private static void forEachLanguage(Consumer<Language> fun) { + for (var lang : MessagesTestBase.LANGUAGES) { + fun.accept(lang); + } + } + + class GetDocumentMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new GetDocumentMessage(new DocumentId("id:ns:testdoc::"), "foo bar"); + serialize("GetDocumentMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (GetDocumentMessage)deserialize("GetDocumentMessage", DocumentProtocol.MESSAGE_GETDOCUMENT, lang); + assertEquals("id:ns:testdoc::", msg2.getDocumentId().toString()); + assertEquals("foo bar", msg2.getFieldSet()); + }); + } + } + + class GetDocumentReplyTest implements RunnableTest { + @Override + public void run() { + testDocumentReturnedCase(); + testEmptyCase(); + } + + void testDocumentReturnedCase() { + var reply = new GetDocumentReply(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::")); + reply.setLastModified(1234567L); + serialize("GetDocumentReply", reply); + forEachLanguage((lang) -> { + var reply2 = (GetDocumentReply)deserialize("GetDocumentReply", DocumentProtocol.REPLY_GETDOCUMENT, lang); + assertEquals(1234567L, reply2.getLastModified()); + var doc = reply2.getDocument(); + assertNotNull(doc); + assertEquals("testdoc", doc.getDataType().getName()); + assertEquals("id:ns:testdoc::", doc.getId().toString()); + assertNotNull(doc.getLastModified()); + assertEquals(1234567L, doc.getLastModified().longValue()); + }); + } + + void testEmptyCase() { + var reply = new GetDocumentReply(null); + serialize("GetDocumentReply-empty", reply); + forEachLanguage((lang) -> { + var reply2 = (GetDocumentReply)deserialize("GetDocumentReply-empty", DocumentProtocol.REPLY_GETDOCUMENT, lang); + assertEquals(0L, reply2.getLastModified()); + assertNull(reply2.getDocument()); + }); + } + } + + private static final String CONDITION_STRING = "There's just one condition"; + + class PutDocumentMessageTest implements RunnableTest { + + void verifyCreateIfNonExistentFlag() { + var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::"))); + msg.setCreateIfNonExistent(true); + serialize("PutDocumentMessage-create", msg); + forEachLanguage((lang) -> { + var decoded = (PutDocumentMessage)deserialize("PutDocumentMessage-create", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang); + assertTrue(decoded.getCreateIfNonExistent()); + assertEquals(decoded.getDocumentPut(), decoded.getDocumentPut()); + }); + } + + @Override + public void run() { + var msg = new PutDocumentMessage(new DocumentPut(new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:ns:testdoc::"))); + msg.setTimestamp(666); + msg.setCondition(new TestAndSetCondition(CONDITION_STRING)); + serialize("PutDocumentMessage", msg); + + forEachLanguage((lang) -> { + var deserializedMsg = (PutDocumentMessage)deserialize("PutDocumentMessage", DocumentProtocol.MESSAGE_PUTDOCUMENT, lang); + var deserializedDoc = deserializedMsg.getDocumentPut().getDocument(); + assertNotNull(deserializedDoc); + assertEquals(msg.getDocumentPut().getDocument().getDataType().getName(), deserializedDoc.getDataType().getName()); + assertEquals(msg.getDocumentPut().getDocument().getId().toString(), deserializedDoc.getId().toString()); + assertEquals(msg.getTimestamp(), deserializedMsg.getTimestamp()); + assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection()); + assertFalse(deserializedMsg.getCreateIfNonExistent()); + }); + verifyCreateIfNonExistentFlag(); + } + } + + class PutDocumentReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new WriteDocumentReply(DocumentProtocol.REPLY_PUTDOCUMENT); + reply.setHighestModificationTimestamp(30); + serialize("PutDocumentReply", reply); + forEachLanguage((lang) -> { + var obj = (WriteDocumentReply)deserialize("PutDocumentReply", DocumentProtocol.REPLY_PUTDOCUMENT, lang); + assertEquals(30, obj.getHighestModificationTimestamp()); + }); + } + } + + class UpdateDocumentMessageTest implements RunnableTest { + @Override + public void run() { + var docType = protocol.getDocumentTypeManager().getDocumentType("testdoc"); + var update = new DocumentUpdate(docType, new DocumentId("id:ns:testdoc::")); + update.addFieldPathUpdate(new RemoveFieldPathUpdate(docType, "intfield", "testdoc.intfield > 0")); + + var msg = new UpdateDocumentMessage(update); + msg.setNewTimestamp(777); + msg.setOldTimestamp(666); + msg.setCondition(new TestAndSetCondition(CONDITION_STRING)); + + serialize("UpdateDocumentMessage", msg); + + forEachLanguage((lang) -> { + var deserializedMsg = (UpdateDocumentMessage)deserialize("UpdateDocumentMessage", DocumentProtocol.MESSAGE_UPDATEDOCUMENT, lang); + assertEquals(msg.getDocumentUpdate(), deserializedMsg.getDocumentUpdate()); + assertEquals(msg.getNewTimestamp(), deserializedMsg.getNewTimestamp()); + assertEquals(msg.getOldTimestamp(), deserializedMsg.getOldTimestamp()); + assertEquals(msg.getCondition().getSelection(), deserializedMsg.getCondition().getSelection()); + }); + } + } + + class UpdateDocumentReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new UpdateDocumentReply(); + reply.setHighestModificationTimestamp(30); + reply.setWasFound(true); + serialize("UpdateDocumentReply", reply); + forEachLanguage((lang) -> { + var obj = (UpdateDocumentReply)deserialize("UpdateDocumentReply", DocumentProtocol.REPLY_UPDATEDOCUMENT, lang); + assertNotNull(obj); + assertEquals(30, reply.getHighestModificationTimestamp()); + assertTrue(obj.wasFound()); + }); + } + } + + class RemoveDocumentMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new RemoveDocumentMessage(new DocumentId("id:ns:testdoc::")); + msg.setCondition(new TestAndSetCondition(CONDITION_STRING)); + serialize("RemoveDocumentMessage", msg); + forEachLanguage((lang) -> { + var deserializedMsg = (RemoveDocumentMessage)deserialize("RemoveDocumentMessage", DocumentProtocol.MESSAGE_REMOVEDOCUMENT, lang); + assertEquals(msg.getDocumentId().toString(), deserializedMsg.getDocumentId().toString()); + assertEquals(msg.getCondition(), deserializedMsg.getCondition()); + }); + } + } + + class RemoveDocumentReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new RemoveDocumentReply(); + reply.setHighestModificationTimestamp(30); + reply.setWasFound(true); + serialize("RemoveDocumentReply", reply); + forEachLanguage((lang) -> { + var obj = (RemoveDocumentReply)deserialize("RemoveDocumentReply", DocumentProtocol.REPLY_REMOVEDOCUMENT, lang); + assertNotNull(obj); + assertEquals(30, obj.getHighestModificationTimestamp()); + assertTrue(obj.wasFound()); + }); + } + } + + class RemoveLocationMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new RemoveLocationMessage("id.group == \"mygroup\"", "bjarne"); + serialize("RemoveLocationMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (RemoveLocationMessage)deserialize("RemoveLocationMessage", DocumentProtocol.MESSAGE_REMOVELOCATION, lang); + assertEquals("id.group == \"mygroup\"", msg2.getDocumentSelection()); + assertEquals("bjarne", msg2.getBucketSpace()); + }); + } + } + + class RemoveLocationReplyTest implements RunnableTest { + @Override + public void run() { + testDocumentReply("RemoveLocationReply", DocumentProtocol.REPLY_REMOVELOCATION); + } + } + + class CreateVisitorMessageTest implements RunnableTest { + private static final String BUCKET_SPACE = "bjarne"; + + @Override + public void run() { + var msg = new CreateVisitorMessage("SomeLibrary", "myvisitor", "newyork", "london"); + msg.setDocumentSelection("true and false or true"); + msg.getParameters().put("myvar", Utf8.toBytes("somevalue")); + msg.getParameters().put("anothervar", Utf8.toBytes("34")); + msg.getBuckets().add(new BucketId(16, 1234)); + msg.setVisitRemoves(true); + msg.setVisitInconsistentBuckets(true); + msg.setFieldSet("foo bar"); + msg.setMaxBucketsPerVisitor(2); + msg.setBucketSpace(BUCKET_SPACE); + msg.setMaxPendingReplyCount(12); + + serialize("CreateVisitorMessage", msg); + + forEachLanguage((lang) -> { + var msg2 = (CreateVisitorMessage)deserialize("CreateVisitorMessage", DocumentProtocol.MESSAGE_CREATEVISITOR, lang); + assertEquals("SomeLibrary", msg2.getLibraryName()); + assertEquals("myvisitor", msg2.getInstanceId()); + assertEquals("newyork", msg2.getControlDestination()); + assertEquals("london", msg2.getDataDestination()); + assertEquals("true and false or true", msg2.getDocumentSelection()); + assertEquals(12, msg2.getMaxPendingReplyCount()); + assertTrue(msg2.getVisitRemoves()); + assertEquals("foo bar", msg2.getFieldSet()); + assertTrue(msg2.getVisitInconsistentBuckets()); + assertEquals(1, msg2.getBuckets().size()); + assertEquals(new BucketId(16, 1234), msg2.getBuckets().iterator().next()); + assertEquals(2, msg2.getParameters().size()); + assertEquals("somevalue", Utf8.toString(msg2.getParameters().get("myvar"))); + assertEquals("34", Utf8.toString(msg2.getParameters().get("anothervar"))); + assertEquals(2, msg2.getMaxBucketsPerVisitor()); + assertEquals(BUCKET_SPACE, msg2.getBucketSpace()); + }); + } + } + + class CreateVisitorReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new CreateVisitorReply(DocumentProtocol.REPLY_CREATEVISITOR); + reply.setLastBucket(new BucketId(16, 123)); + reply.getVisitorStatistics().setBucketsVisited(3); + reply.getVisitorStatistics().setDocumentsVisited(1000); + reply.getVisitorStatistics().setBytesVisited(1024000); + reply.getVisitorStatistics().setDocumentsReturned(123); + reply.getVisitorStatistics().setBytesReturned(512000); + + serialize("CreateVisitorReply", reply); + + forEachLanguage((lang) -> { + var reply2 = (CreateVisitorReply)deserialize("CreateVisitorReply", DocumentProtocol.REPLY_CREATEVISITOR, lang); + assertNotNull(reply2); + assertEquals(new BucketId(16, 123), reply2.getLastBucket()); + assertEquals(3, reply2.getVisitorStatistics().getBucketsVisited()); + assertEquals(1000, reply2.getVisitorStatistics().getDocumentsVisited()); + assertEquals(1024000, reply2.getVisitorStatistics().getBytesVisited()); + assertEquals(123, reply2.getVisitorStatistics().getDocumentsReturned()); + assertEquals(512000, reply2.getVisitorStatistics().getBytesReturned()); + }); + } + } + + class DestroyVisitorMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new DestroyVisitorMessage("myvisitor"); + serialize("DestroyVisitorMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (DestroyVisitorMessage)deserialize("DestroyVisitorMessage", DocumentProtocol.MESSAGE_DESTROYVISITOR, lang); + assertEquals("myvisitor", msg2.getInstanceId()); + }); + } + } + + class DestroyVisitorReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("DestroyVisitorReply", DocumentProtocol.REPLY_DESTROYVISITOR); + } + } + + class MapVisitorMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new MapVisitorMessage(); + msg.getData().put("foo", "3"); + msg.getData().put("bar", "5"); + serialize("MapVisitorMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (MapVisitorMessage) deserialize("MapVisitorMessage", DocumentProtocol.MESSAGE_MAPVISITOR, lang); + assertEquals(2, msg2.getData().size()); + assertEquals("3", msg2.getData().get("foo")); + assertEquals("5", msg2.getData().get("bar")); + }); + } + } + + class MapVisitorReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("MapVisitorReply", DocumentProtocol.REPLY_MAPVISITOR); + } + } + + class QueryResultMessageTest implements RunnableTest { + @Override + public void run() throws Exception { + test_result_with_match_features(); + + Routable routable = deserialize("QueryResultMessage-1", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP); + assertTrue(routable instanceof QueryResultMessage); + + QueryResultMessage msg = (QueryResultMessage)routable; + assertEquals(0, msg.getResult().getHitCount()); + + routable = deserialize("QueryResultMessage-2", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP); + assertTrue(routable instanceof QueryResultMessage); + + msg = (QueryResultMessage)routable; + assertEquals(2, msg.getResult().getHitCount()); + com.yahoo.vdslib.SearchResult.Hit h = msg.getResult().getHit(0); + assertEquals(89.0, h.getRank(), 1E-6); + assertEquals("doc1", h.getDocId()); + assertFalse(h.getMatchFeatures().isPresent()); + h = msg.getResult().getHit(1); + assertEquals(109.0, h.getRank(), 1E-6); + assertEquals("doc17", h.getDocId()); + assertFalse(h.getMatchFeatures().isPresent()); + + routable = deserialize("QueryResultMessage-3", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP); + assertTrue(routable instanceof QueryResultMessage); + + msg = (QueryResultMessage)routable; + assertEquals(2, msg.getResult().getHitCount()); + h = msg.getResult().getHit(0); + assertEquals(109.0, h.getRank(), 1E-6); + assertEquals("doc17", h.getDocId()); + assertFalse(h.getMatchFeatures().isPresent()); + h = msg.getResult().getHit(1); + assertEquals(89.0, h.getRank(), 1E-6); + assertEquals("doc1", h.getDocId()); + assertFalse(h.getMatchFeatures().isPresent()); + + routable = deserialize("QueryResultMessage-4", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP); + assertTrue(routable instanceof QueryResultMessage); + + msg = (QueryResultMessage)routable; + assertEquals(3, msg.getResult().getHitCount()); + h = msg.getResult().getHit(0); + assertTrue(h instanceof SearchResult.HitWithSortBlob); + assertEquals(89.0, h.getRank(), 1E-6); + assertEquals("doc1", h.getDocId()); + byte[] b = ((SearchResult.HitWithSortBlob)h).getSortBlob(); + assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '2' }, b); + + h = msg.getResult().getHit(1); + assertTrue(h instanceof SearchResult.HitWithSortBlob); + assertEquals(109.0, h.getRank(), 1E-6); + assertEquals("doc17", h.getDocId()); + b = ((SearchResult.HitWithSortBlob)h).getSortBlob(); + assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '1' }, b); + + h = msg.getResult().getHit(2); + assertTrue(h instanceof SearchResult.HitWithSortBlob); + assertEquals(90.0, h.getRank(), 1E-6); + assertEquals("doc18", h.getDocId()); + b = ((SearchResult.HitWithSortBlob)h).getSortBlob(); + assertEqualsData(new byte[] { 's', 'o', 'r', 't', 'd', 'a', 't', 'a', '3' }, b); + } + + void assertEqualsData(byte[] exp, byte[] act) { + assertEquals(exp.length, act.length); + for (int i = 0; i < exp.length; ++i) { + assertEquals(exp[i], act[i]); + } + } + + void test_result_with_match_features() { + Routable routable = deserialize("QueryResultMessage-6", DocumentProtocol.MESSAGE_QUERYRESULT, Language.CPP); + assertTrue(routable instanceof QueryResultMessage); + + var msg = (QueryResultMessage)routable; + assertEquals(2, msg.getResult().getHitCount()); + + var h = msg.getResult().getHit(0); + assertTrue(h instanceof SearchResult.Hit); + assertEquals(7.0, h.getRank(), 1E-6); + assertEquals("doc2", h.getDocId()); + assertTrue(h.getMatchFeatures().isPresent()); + var mf = h.getMatchFeatures().get(); + assertEquals(12.0, mf.field("foo").asDouble(), 1E-6); + assertEqualsData(new byte[] { 'T', 'h', 'e', 'r', 'e' }, mf.field("bar").asData()); + + h = msg.getResult().getHit(1); + assertTrue(h instanceof SearchResult.Hit); + assertEquals(5.0, h.getRank(), 1E-6); + assertEquals("doc1", h.getDocId()); + assertTrue(h.getMatchFeatures().isPresent()); + mf = h.getMatchFeatures().get(); + assertEquals(1.0, mf.field("foo").asDouble(), 1E-6); + assertEqualsData(new byte[] { 'H', 'i' }, mf.field("bar").asData()); + } + } + + class QueryResultReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("QueryResultReply", DocumentProtocol.REPLY_QUERYRESULT); + } + } + + class VisitorInfoMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new VisitorInfoMessage(); + msg.getFinishedBuckets().add(new BucketId(16, 1)); + msg.getFinishedBuckets().add(new BucketId(16, 2)); + msg.getFinishedBuckets().add(new BucketId(16, 4)); + msg.setErrorMessage("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6"); + + serialize("VisitorInfoMessage", msg); + + forEachLanguage((lang) -> { + var msg2 = (VisitorInfoMessage)deserialize("VisitorInfoMessage", DocumentProtocol.MESSAGE_VISITORINFO, lang); + assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 1))); + assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 2))); + assertTrue(msg2.getFinishedBuckets().contains(new BucketId(16, 4))); + assertEquals("error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6", msg2.getErrorMessage()); + }); + } + } + + class VisitorInfoReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("VisitorInfoReply", DocumentProtocol.REPLY_VISITORINFO); + } + } + + class DocumentListMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new DocumentListMessage(); + msg.setBucketId(new BucketId(17, 1234)); + var doc = new Document(protocol.getDocumentTypeManager().getDocumentType("testdoc"), "id:scheme:testdoc:n=1234:1"); + msg.getDocuments().add(new DocumentListEntry(doc, 1234, true)); + + serialize("DocumentListMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (DocumentListMessage) deserialize("DocumentListMessage", DocumentProtocol.MESSAGE_DOCUMENTLIST, lang); + assertEquals(new BucketId(17, 1234), msg2.getBucketId()); + assertEquals(1, msg2.getDocuments().size()); + var entry = msg2.getDocuments().get(0); + assertEquals("id:scheme:testdoc:n=1234:1", entry.getDocument().getId().toString()); + assertEquals(1234, entry.getTimestamp()); + assertTrue(entry.isRemoveEntry()); + }); + } + } + + class DocumentListReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("DocumentListReply", DocumentProtocol.REPLY_DOCUMENTLIST); + } + } + + class EmptyBucketsMessageTest implements RunnableTest { + @Override + public void run() { + var bids = new ArrayList<BucketId>(); + for (int i = 0; i < 13; ++i) { + bids.add(new BucketId(16, i)); + } + var ebm = new EmptyBucketsMessage(bids); + serialize("EmptyBucketsMessage", ebm); + forEachLanguage((lang) -> { + var ebm2 = (EmptyBucketsMessage)deserialize("EmptyBucketsMessage", DocumentProtocol.MESSAGE_EMPTYBUCKETS, lang); + assertEquals(13, ebm2.getBucketIds().size()); + for (int i = 0; i < 13; ++i) { + assertEquals(new BucketId(16, i), ebm2.getBucketIds().get(i)); + } + }); + } + } + + class EmptyBucketsReplyTest implements RunnableTest { + @Override + public void run() { + testVisitorReply("EmptyBucketsReply", DocumentProtocol.REPLY_EMPTYBUCKETS); + } + } + + class GetBucketListMessageTest implements RunnableTest { + private static final String BUCKET_SPACE = "beartato"; + + @Override + public void run() { + var msg = new GetBucketListMessage(new BucketId(16, 123)); + msg.setBucketSpace(BUCKET_SPACE); + serialize("GetBucketListMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (GetBucketListMessage)deserialize("GetBucketListMessage", DocumentProtocol.MESSAGE_GETBUCKETLIST, lang); + assertEquals(new BucketId(16, 123), msg2.getBucketId()); + assertEquals(BUCKET_SPACE, msg2.getBucketSpace()); + }); + } + } + + class GetBucketListReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new GetBucketListReply(); + reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo")); + reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar")); + reply.getBuckets().add(new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink")); + + serialize("GetBucketListReply", reply); + + forEachLanguage((lang) -> { + var reply2 = (GetBucketListReply)deserialize("GetBucketListReply", DocumentProtocol.REPLY_GETBUCKETLIST, lang); + assertEquals(3, reply2.getBuckets().size()); + assertEquals(reply2.getBuckets().get(0), new GetBucketListReply.BucketInfo(new BucketId(16, 123), "foo")); + assertEquals(reply2.getBuckets().get(1), new GetBucketListReply.BucketInfo(new BucketId(17, 1123), "bar")); + assertEquals(reply2.getBuckets().get(2), new GetBucketListReply.BucketInfo(new BucketId(18, 11123), "zoink")); + }); + } + } + + class GetBucketStateMessageTest implements RunnableTest { + @Override + public void run() { + var msg = new GetBucketStateMessage(new BucketId(16, 666)); + serialize("GetBucketStateMessage", msg); + + forEachLanguage((lang) -> { + var msg2 = (GetBucketStateMessage)deserialize("GetBucketStateMessage", DocumentProtocol.MESSAGE_GETBUCKETSTATE, lang); + assertEquals(16, msg2.getBucketId().getUsedBits()); + assertEquals(4611686018427388570L, msg2.getBucketId().getId()); + }); + } + } + + class GetBucketStateReplyTest implements RunnableTest { + @Override + public void run() { + var foo = new GlobalId(IdString.createIdString("id:ns:testdoc::foo")); + var bar = new GlobalId(IdString.createIdString("id:ns:testdoc::bar")); + var baz = new DocumentId("id:ns:testdoc::baz"); + + var reply = new GetBucketStateReply(); + var state = new ArrayList<DocumentState>(3); + state.add(new DocumentState(foo, 777, false)); + state.add(new DocumentState(bar, 888, true)); + state.add(new DocumentState(baz, 999, false)); + reply.setBucketState(state); + + serialize("GetBucketStateReply", reply); + + forEachLanguage((lang) -> { + var reply2 = (GetBucketStateReply)deserialize("GetBucketStateReply", DocumentProtocol.REPLY_GETBUCKETSTATE, lang); + assertEquals(3, reply2.getBucketState().size()); + + assertEquals(777, reply2.getBucketState().get(0).getTimestamp()); + assertEquals(foo, reply2.getBucketState().get(0).getGid()); + assertFalse(reply2.getBucketState().get(0).hasDocId()); + assertFalse(reply2.getBucketState().get(0).isRemoveEntry()); + + assertEquals(888, reply2.getBucketState().get(1).getTimestamp()); + assertEquals(bar, reply2.getBucketState().get(1).getGid()); + assertFalse(reply2.getBucketState().get(1).hasDocId()); + assertTrue(reply2.getBucketState().get(1).isRemoveEntry()); + + assertEquals(999, reply2.getBucketState().get(2).getTimestamp()); + assertTrue(reply2.getBucketState().get(2).hasDocId()); + assertEquals(new GlobalId(baz.getGlobalId()), reply2.getBucketState().get(2).getGid()); + assertEquals(baz, reply2.getBucketState().get(2).getDocId()); + assertFalse(reply2.getBucketState().get(2).isRemoveEntry()); + }); + } + } + + class StatBucketMessageTest implements RunnableTest { + private static final String BUCKET_SPACE = "andrei"; + + @Override + public void run() { + var msg = new StatBucketMessage(new BucketId(16, 123), "id.user=123"); + msg.setBucketSpace(BUCKET_SPACE); + serialize("StatBucketMessage", msg); + forEachLanguage((lang) -> { + var msg2 = (StatBucketMessage)deserialize("StatBucketMessage", DocumentProtocol.MESSAGE_STATBUCKET, lang); + assertEquals(new BucketId(16, 123), msg2.getBucketId()); + assertEquals("id.user=123", msg2.getDocumentSelection()); + assertEquals(BUCKET_SPACE, msg2.getBucketSpace()); + }); + } + } + + class StatBucketReplyTest implements RunnableTest { + @Override + public void run() { + var msg = new StatBucketReply(); + msg.setResults("These are the votes of the Norwegian jury"); + serialize("StatBucketReply", msg); + forEachLanguage((lang) -> { + var msg2 = (StatBucketReply)deserialize("StatBucketReply", DocumentProtocol.REPLY_STATBUCKET, lang); + assertEquals("These are the votes of the Norwegian jury", msg2.getResults()); + }); + } + } + + class WrongDistributionReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new WrongDistributionReply("distributor:3 storage:2"); + serialize("WrongDistributionReply", reply); + forEachLanguage((lang) -> { + var reply2 = (WrongDistributionReply)deserialize("WrongDistributionReply", DocumentProtocol.REPLY_WRONGDISTRIBUTION, lang); + assertEquals("distributor:3 storage:2", reply2.getSystemState()); + }); + } + } + + class DocumentIgnoredReplyTest implements RunnableTest { + @Override + public void run() { + var reply = new DocumentIgnoredReply(); + serialize("DocumentIgnoredReply", reply); + forEachLanguage((lang) -> { + var reply2 = (DocumentIgnoredReply)deserialize("DocumentIgnoredReply", DocumentProtocol.REPLY_DOCUMENTIGNORED, lang); + assertNotNull(reply2); + }); + } + } + + private void testDocumentReply(String filename, int type) { + var reply = new DocumentReply(type); + serialize(filename, reply); + + forEachLanguage((lang) -> { + var reply2 = (DocumentReply)deserialize(filename, type, lang); + assertNotNull(reply2); + }); + } + + private void testVisitorReply(String filename, int type) { + VisitorReply reply = new VisitorReply(type); + serialize(filename, reply); + + forEachLanguage((lang) -> { + var reply2 = (VisitorReply)deserialize(filename, type, lang); + assertNotNull(reply2); + }); + } + +} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java index f15b0fe3995..16938ab843a 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/MessagesTestBase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessagesTestBase.java @@ -1,10 +1,11 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol.test; +package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.component.Version; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentTypeManagerConfigurer; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.test.TestFileUtil; import com.yahoo.messagebus.Routable; import org.junit.Test; @@ -20,7 +21,7 @@ import static org.junit.Assert.*; */ public abstract class MessagesTestBase { - protected enum Language { + public enum Language { JAVA, CPP } diff --git a/documentapi/src/tests/messages/CMakeLists.txt b/documentapi/src/tests/messages/CMakeLists.txt index dec61432e4b..3428c34786c 100644 --- a/documentapi/src/tests/messages/CMakeLists.txt +++ b/documentapi/src/tests/messages/CMakeLists.txt @@ -8,6 +8,16 @@ vespa_add_executable(documentapi_messages60_test_app TEST documentapi ) vespa_add_test(NAME documentapi_messages60_test_app COMMAND documentapi_messages60_test_app) + +vespa_add_executable(documentapi_messages80_test_app TEST + SOURCES + testbase.cpp + messages80test.cpp + DEPENDS + documentapi +) +vespa_add_test(NAME documentapi_messages80_test_app COMMAND documentapi_messages80_test_app) + vespa_add_executable(documentapi_error_codes_test_app_app TEST SOURCES error_codes_test.cpp diff --git a/documentapi/src/tests/messages/messages60test.cpp b/documentapi/src/tests/messages/messages60test.cpp index 99ecb3644a5..281e1123e54 100644 --- a/documentapi/src/tests/messages/messages60test.cpp +++ b/documentapi/src/tests/messages/messages60test.cpp @@ -751,7 +751,7 @@ Messages60Test::testVisitorInfoMessage() bool Messages60Test::testDestroyVisitorReply() { - return tryDocumentReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR); + return tryVisitorReply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR); } bool @@ -922,23 +922,6 @@ Messages60Test::testRemoveLocationReply() //////////////////////////////////////////////////////////////////////////////// bool -Messages60Test::tryDocumentReply(const string &filename, uint32_t type) -{ - DocumentReply tmp(type); - - EXPECT_EQUAL((uint32_t)5, serialize(filename, tmp)); - - for (uint32_t lang = 0; lang < NUM_LANGUAGES; ++lang) { - mbus::Routable::UP obj = deserialize(filename, type, lang); - if (EXPECT_TRUE(obj)) { - DocumentReply *ref = dynamic_cast<DocumentReply*>(obj.get()); - EXPECT_TRUE(ref != NULL); - } - } - return true; -} - -bool Messages60Test::tryVisitorReply(const string &filename, uint32_t type) { VisitorReply tmp(type); diff --git a/documentapi/src/tests/messages/messages60test.h b/documentapi/src/tests/messages/messages60test.h index 88bc88097eb..d1060a83962 100644 --- a/documentapi/src/tests/messages/messages60test.h +++ b/documentapi/src/tests/messages/messages60test.h @@ -6,9 +6,9 @@ class Messages60Test : public TestBase { protected: - const vespalib::Version getVersion() const override { return vespalib::Version(6, 221); } + vespalib::Version getVersion() const override { return vespalib::Version(6, 221); } bool shouldTestCoverage() const override { return true; } - bool tryDocumentReply(const string &filename, uint32_t type); + bool tryVisitorReply(const string &filename, uint32_t type); static size_t serializedLength(const string & str) { return sizeof(int32_t) + str.size(); } @@ -23,7 +23,6 @@ public: bool testDocumentIgnoredReply(); bool testDocumentListMessage(); bool testDocumentListReply(); - bool testDocumentSummaryMessage(); bool testEmptyBucketsMessage(); bool testEmptyBucketsReply(); bool testGetBucketListMessage(); @@ -42,7 +41,6 @@ public: bool testRemoveDocumentReply(); bool testRemoveLocationMessage(); bool testRemoveLocationReply(); - bool testSearchResultMessage(); bool testStatBucketMessage(); bool testStatBucketReply(); bool testUpdateDocumentMessage(); diff --git a/documentapi/src/tests/messages/messages80test.cpp b/documentapi/src/tests/messages/messages80test.cpp new file mode 100644 index 00000000000..9b97f332318 --- /dev/null +++ b/documentapi/src/tests/messages/messages80test.cpp @@ -0,0 +1,908 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "testbase.h" +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/document/datatype/documenttype.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/select/parser.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/document/update/fieldpathupdates.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/vespalib/test/insertion_operators.h> +#include <vespa/vespalib/util/featureset.h> +#include <array> + +using document::DataType; +using document::DocumentTypeRepo; +using vespalib::FeatureValues; + +// TODO rewrite to GTest! +class Messages80Test : public TestBase { +protected: + vespalib::Version getVersion() const override { + // Must be as high--or higher--than the v8 protocol version specified in documentprocotol.cpp + // (and equal to its corresponding value in the Java implementation). + return {8, 305}; + } + bool shouldTestCoverage() const override { return true; } + + bool try_visitor_reply(const string& filename, uint32_t type); + + static constexpr std::array<uint32_t, 2> languages() noexcept { + return {TestBase::LANG_CPP, TestBase::LANG_JAVA}; + } + +public: + Messages80Test(); + ~Messages80Test() override = default; + + bool test_create_visitor_message(); + bool test_create_visitor_reply(); + bool test_destroy_visitor_message(); + bool test_destroy_visitor_reply(); + bool test_document_ignored_reply(); + bool test_document_list_message(); + bool test_document_list_reply(); + bool test_empty_buckets_message(); + bool test_empty_buckets_reply(); + bool test_get_bucket_list_message(); + bool test_get_bucket_list_reply(); + bool test_get_bucket_state_message(); + bool test_get_bucket_state_reply(); + bool test_get_document_message(); + bool test_get_document_reply(); + bool test_map_visitor_message(); + bool test_map_visitor_reply(); + bool test_put_document_message(); + bool test_put_document_reply(); + bool test_query_result_message(); + bool test_query_result_reply(); + bool test_remove_document_message(); + bool test_remove_document_reply(); + bool test_remove_location_message(); + bool test_remove_location_reply(); + bool test_stat_bucket_message(); + bool test_stat_bucket_reply(); + bool test_update_document_message(); + bool test_update_document_reply(); + bool test_visitor_info_message(); + bool test_visitor_info_reply(); + bool test_wrong_distribution_reply(); + + void do_test_get_reply_with_doc(); + void do_test_empty_get_reply(); +}; + +namespace { + +std::vector<char> doc1_mf_data{'H', 'i'}; +std::vector<char> doc2_mf_data{'T', 'h', 'e', 'r', 'e'}; + +} + +Messages80Test::Messages80Test() { + putTest(DocumentProtocol::MESSAGE_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_message)); + putTest(DocumentProtocol::MESSAGE_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_message)); + putTest(DocumentProtocol::MESSAGE_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_message)); + putTest(DocumentProtocol::MESSAGE_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_message)); + putTest(DocumentProtocol::MESSAGE_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_message)); + putTest(DocumentProtocol::MESSAGE_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_message)); + putTest(DocumentProtocol::MESSAGE_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_message)); + putTest(DocumentProtocol::MESSAGE_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_message)); + putTest(DocumentProtocol::MESSAGE_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_message)); + putTest(DocumentProtocol::MESSAGE_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_message)); + putTest(DocumentProtocol::MESSAGE_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_message)); + putTest(DocumentProtocol::MESSAGE_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_message)); + putTest(DocumentProtocol::MESSAGE_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_message)); + putTest(DocumentProtocol::MESSAGE_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_message)); + putTest(DocumentProtocol::MESSAGE_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_message)); + + putTest(DocumentProtocol::REPLY_CREATEVISITOR, TEST_METHOD(Messages80Test::test_create_visitor_reply)); + putTest(DocumentProtocol::REPLY_DESTROYVISITOR, TEST_METHOD(Messages80Test::test_destroy_visitor_reply)); + putTest(DocumentProtocol::REPLY_DOCUMENTIGNORED, TEST_METHOD(Messages80Test::test_document_ignored_reply)); + putTest(DocumentProtocol::REPLY_DOCUMENTLIST, TEST_METHOD(Messages80Test::test_document_list_reply)); + putTest(DocumentProtocol::REPLY_EMPTYBUCKETS, TEST_METHOD(Messages80Test::test_empty_buckets_reply)); + putTest(DocumentProtocol::REPLY_GETBUCKETLIST, TEST_METHOD(Messages80Test::test_get_bucket_list_reply)); + putTest(DocumentProtocol::REPLY_GETBUCKETSTATE, TEST_METHOD(Messages80Test::test_get_bucket_state_reply)); + putTest(DocumentProtocol::REPLY_GETDOCUMENT, TEST_METHOD(Messages80Test::test_get_document_reply)); + putTest(DocumentProtocol::REPLY_MAPVISITOR, TEST_METHOD(Messages80Test::test_map_visitor_reply)); + putTest(DocumentProtocol::REPLY_PUTDOCUMENT, TEST_METHOD(Messages80Test::test_put_document_reply)); + putTest(DocumentProtocol::REPLY_QUERYRESULT, TEST_METHOD(Messages80Test::test_query_result_reply)); + putTest(DocumentProtocol::REPLY_REMOVEDOCUMENT, TEST_METHOD(Messages80Test::test_remove_document_reply)); + putTest(DocumentProtocol::REPLY_REMOVELOCATION, TEST_METHOD(Messages80Test::test_remove_location_reply)); + putTest(DocumentProtocol::REPLY_STATBUCKET, TEST_METHOD(Messages80Test::test_stat_bucket_reply)); + putTest(DocumentProtocol::REPLY_UPDATEDOCUMENT, TEST_METHOD(Messages80Test::test_update_document_reply)); + putTest(DocumentProtocol::REPLY_VISITORINFO, TEST_METHOD(Messages80Test::test_visitor_info_reply)); + putTest(DocumentProtocol::REPLY_WRONGDISTRIBUTION, TEST_METHOD(Messages80Test::test_wrong_distribution_reply)); +} + +namespace { + +document::Document::SP +createDoc(const DocumentTypeRepo& repo, const string& type_name, const string& id) { + return std::make_shared<document::Document>(repo, *repo.getDocumentType(type_name), document::DocumentId(id)); +} + +} + +bool Messages80Test::test_get_document_message() { + GetDocumentMessage tmp(document::DocumentId("id:ns:testdoc::"), "foo bar"); + EXPECT_EQUAL(280u, sizeof(GetDocumentMessage)); // FIXME doesn't belong here + serialize("GetDocumentMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("GetDocumentMessage", DocumentProtocol::MESSAGE_GETDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetDocumentMessage&>(*obj); + EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::"); + EXPECT_EQUAL(ref.getFieldSet(), "foo bar"); + } + } + return true; +} + +void Messages80Test::do_test_get_reply_with_doc() { + auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::"); + GetDocumentReply tmp(doc); + tmp.setLastModified(1234567); + + EXPECT_EQUAL(128u, sizeof(GetDocumentReply)); // FIXME doesn't belong here! + serialize("GetDocumentReply", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("GetDocumentReply", DocumentProtocol::REPLY_GETDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetDocumentReply&>(*obj); + EXPECT_EQUAL(ref.getLastModified(), 1234567ULL); // FIXME signed vs. unsigned... -_- + ASSERT_TRUE(ref.hasDocument()); + auto& doc2 = ref.getDocument(); + EXPECT_EQUAL(doc2.getType().getName(), "testdoc"); + EXPECT_EQUAL(doc2.getId().toString(), "id:ns:testdoc::"); + EXPECT_EQUAL(doc2.getLastModified(), 1234567LL); // FIXME signed vs. unsigned... -_- + } + } +} + +void Messages80Test::do_test_empty_get_reply() { + GetDocumentReply tmp; + serialize("GetDocumentReply-empty", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("GetDocumentReply-empty", DocumentProtocol::REPLY_GETDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetDocumentReply&>(*obj); + EXPECT_EQUAL(ref.getLastModified(), 0ULL); + EXPECT_FALSE(ref.hasDocument()); + } + } +} + +bool Messages80Test::test_get_document_reply() { + TEST_DO(do_test_get_reply_with_doc()); + TEST_DO(do_test_empty_get_reply()); + return true; +} + +bool Messages80Test::test_put_document_message() { + auto doc = createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::"); + PutDocumentMessage msg(doc); + + msg.setTimestamp(666); + msg.setCondition(TestAndSetCondition("There's just one condition")); + + // FIXME these don't belong here! + EXPECT_EQUAL(64u, sizeof(vespalib::string)); + EXPECT_EQUAL(sizeof(vespalib::string), sizeof(TestAndSetCondition)); + EXPECT_EQUAL(112u, sizeof(DocumentMessage)); + EXPECT_EQUAL(sizeof(TestAndSetCondition) + sizeof(DocumentMessage), sizeof(TestAndSetMessage)); + EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(PutDocumentMessage)); + + serialize("PutDocumentMessage", msg); + + for (auto lang : languages()) { + auto routableUp = deserialize("PutDocumentMessage", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang); + if (EXPECT_TRUE(routableUp)) { + auto& deserializedMsg = dynamic_cast<PutDocumentMessage &>(*routableUp); + + EXPECT_EQUAL(deserializedMsg.getDocument().getType().getName(), msg.getDocument().getType().getName()); + EXPECT_EQUAL(deserializedMsg.getDocument().getId().toString(), msg.getDocument().getId().toString()); + EXPECT_EQUAL(deserializedMsg.getTimestamp(), msg.getTimestamp()); + EXPECT_GREATER(deserializedMsg.getApproxSize(), 0u); + EXPECT_EQUAL(deserializedMsg.getCondition().getSelection(), msg.getCondition().getSelection()); + EXPECT_FALSE(deserializedMsg.get_create_if_non_existent()); + } + } + + //------------------------------------------------------------------------- + + PutDocumentMessage msg2(createDoc(getTypeRepo(), "testdoc", "id:ns:testdoc::")); + msg2.set_create_if_non_existent(true); + serialize("PutDocumentMessage-create", msg2); + for (auto lang : languages()) { + auto obj = deserialize("PutDocumentMessage-create", DocumentProtocol::MESSAGE_PUTDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& decoded = dynamic_cast<PutDocumentMessage&>(*obj); + EXPECT_TRUE(decoded.get_create_if_non_existent()); + } + } + return true; +} + +bool Messages80Test::test_put_document_reply() { + WriteDocumentReply reply(DocumentProtocol::REPLY_PUTDOCUMENT); + reply.setHighestModificationTimestamp(30); + + serialize("PutDocumentReply", reply); + EXPECT_EQUAL(sizeof(WriteDocumentReply), 112u); // FIXME doesn't belong here! + + for (auto lang : languages()) { + auto obj = deserialize("PutDocumentReply", DocumentProtocol::REPLY_PUTDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<WriteDocumentReply&>(*obj); + EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u); + } + } + return true; +} + +bool Messages80Test::test_update_document_message() { + const DocumentTypeRepo& repo = getTypeRepo(); + const document::DocumentType& docType = *repo.getDocumentType("testdoc"); + + auto doc_update = std::make_shared<document::DocumentUpdate>(repo, docType, document::DocumentId("id:ns:testdoc::")); + doc_update->addFieldPathUpdate(std::make_unique<document::RemoveFieldPathUpdate>("intfield", "testdoc.intfield > 0")); + + UpdateDocumentMessage msg(std::move(doc_update)); + msg.setOldTimestamp(666u); + msg.setNewTimestamp(777u); + msg.setCondition(TestAndSetCondition("There's just one condition")); + + EXPECT_EQUAL(sizeof(TestAndSetMessage) + 32, sizeof(UpdateDocumentMessage)); // FIXME doesn't belong here! + serialize("UpdateDocumentMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("UpdateDocumentMessage", DocumentProtocol::MESSAGE_UPDATEDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& decoded = dynamic_cast<UpdateDocumentMessage&>(*obj); + EXPECT_EQUAL(decoded.getDocumentUpdate(), msg.getDocumentUpdate()); + EXPECT_EQUAL(decoded.getOldTimestamp(), msg.getOldTimestamp()); + EXPECT_EQUAL(decoded.getNewTimestamp(), msg.getNewTimestamp()); + EXPECT_GREATER(decoded.getApproxSize(), 0u); // Actual value depends on protobuf size + EXPECT_EQUAL(decoded.getCondition().getSelection(), msg.getCondition().getSelection()); + } + } + return true; +} + +bool Messages80Test::test_update_document_reply() { + UpdateDocumentReply reply; + reply.setWasFound(true); + reply.setHighestModificationTimestamp(30); + + serialize("UpdateDocumentReply", reply); + EXPECT_EQUAL(120u, sizeof(UpdateDocumentReply)); // FIXME doesn't belong here! + + for (auto lang : languages()) { + auto obj = deserialize("UpdateDocumentReply", DocumentProtocol::REPLY_UPDATEDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<UpdateDocumentReply&>(*obj); + EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u); + EXPECT_TRUE(ref.wasFound()); + } + } + return true; +} + +bool Messages80Test::test_remove_document_message() { + RemoveDocumentMessage msg(document::DocumentId("id:ns:testdoc::")); + msg.setCondition(TestAndSetCondition("There's just one condition")); + + EXPECT_EQUAL(sizeof(TestAndSetMessage) + 104, sizeof(RemoveDocumentMessage)); // FIXME doesn't belong here! + serialize("RemoveDocumentMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("RemoveDocumentMessage", DocumentProtocol::MESSAGE_REMOVEDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<RemoveDocumentMessage &>(*obj); + EXPECT_EQUAL(ref.getDocumentId().toString(), "id:ns:testdoc::"); + EXPECT_EQUAL(ref.getCondition().getSelection(), msg.getCondition().getSelection()); + } + } + return true; +} + +bool Messages80Test::test_remove_document_reply() { + RemoveDocumentReply reply; + std::vector<uint64_t> ts; + reply.setWasFound(true); + reply.setHighestModificationTimestamp(30); + EXPECT_EQUAL(120u, sizeof(RemoveDocumentReply)); // FIXME doesn't belong here! + + serialize("RemoveDocumentReply", reply); + + for (auto lang : languages()) { + auto obj = deserialize("RemoveDocumentReply", DocumentProtocol::REPLY_REMOVEDOCUMENT, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<RemoveDocumentReply&>(*obj); + EXPECT_EQUAL(ref.getHighestModificationTimestamp(), 30u); + EXPECT_TRUE(ref.wasFound()); + } + } + return true; +} + +bool Messages80Test::test_remove_location_message() { + document::BucketIdFactory factory; + document::select::Parser parser(getTypeRepo(), factory); + RemoveLocationMessage msg(factory, parser, "id.group == \"mygroup\""); + msg.setBucketSpace("bjarne"); + serialize("RemoveLocationMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("RemoveLocationMessage", DocumentProtocol::MESSAGE_REMOVELOCATION, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<RemoveLocationMessage&>(*obj); + EXPECT_EQUAL(ref.getDocumentSelection(), "id.group == \"mygroup\""); + EXPECT_EQUAL(ref.getBucketSpace(), "bjarne"); + } + } + return true; +} + +bool Messages80Test::test_remove_location_reply() { + DocumentReply tmp(DocumentProtocol::REPLY_REMOVELOCATION); + serialize("RemoveLocationReply", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("RemoveLocationReply", DocumentProtocol::REPLY_REMOVELOCATION, lang); + EXPECT_TRUE(obj); + } + return true; +} + +bool Messages80Test::test_create_visitor_message() { + CreateVisitorMessage tmp("SomeLibrary", "myvisitor", "newyork", "london"); + tmp.setDocumentSelection("true and false or true"); + tmp.getParameters().set("myvar", "somevalue"); + tmp.getParameters().set("anothervar", uint64_t(34)); + tmp.getBuckets().emplace_back(16, 1234); + tmp.setVisitRemoves(true); + tmp.setVisitInconsistentBuckets(true); + tmp.setFieldSet("foo bar"); + tmp.setMaxBucketsPerVisitor(2); + tmp.setMaximumPendingReplyCount(12); + tmp.setBucketSpace("bjarne"); + + serialize("CreateVisitorMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("CreateVisitorMessage", DocumentProtocol::MESSAGE_CREATEVISITOR, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<CreateVisitorMessage&>(*obj); + + EXPECT_EQUAL(ref.getLibraryName(), "SomeLibrary"); + EXPECT_EQUAL(ref.getInstanceId(), "myvisitor"); + EXPECT_EQUAL(ref.getControlDestination(), "newyork"); + EXPECT_EQUAL(ref.getDataDestination(), "london"); + EXPECT_EQUAL(ref.getDocumentSelection(), "true and false or true"); + EXPECT_EQUAL(ref.getFieldSet(), "foo bar"); + EXPECT_EQUAL(ref.getMaximumPendingReplyCount(), uint32_t(12)); + EXPECT_TRUE(ref.visitRemoves()); + EXPECT_TRUE(ref.visitInconsistentBuckets()); + ASSERT_EQUAL(ref.getBuckets().size(), size_t(1)); + EXPECT_EQUAL(ref.getBuckets()[0], document::BucketId(16, 1234)); + EXPECT_EQUAL(ref.getParameters().get("myvar"), "somevalue"); + EXPECT_EQUAL(ref.getParameters().get("anothervar", uint64_t(1)), uint64_t(34)); + EXPECT_EQUAL(ref.getMaxBucketsPerVisitor(), uint32_t(2)); + EXPECT_EQUAL(ref.getBucketSpace(), "bjarne"); + } + } + return true; +} + +bool Messages80Test::test_create_visitor_reply() { + CreateVisitorReply reply(DocumentProtocol::REPLY_CREATEVISITOR); + reply.setLastBucket(document::BucketId(16, 123)); + vdslib::VisitorStatistics vs; + vs.setBucketsVisited(3); + vs.setDocumentsVisited(1000); + vs.setBytesVisited(1024000); + vs.setDocumentsReturned(123); + vs.setBytesReturned(512000); + reply.setVisitorStatistics(vs); + + serialize("CreateVisitorReply", reply); + + for (auto lang : languages()) { + auto obj = deserialize("CreateVisitorReply", DocumentProtocol::REPLY_CREATEVISITOR, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<CreateVisitorReply&>(*obj); + EXPECT_EQUAL(ref.getLastBucket(), document::BucketId(16, 123)); + EXPECT_EQUAL(ref.getVisitorStatistics().getBucketsVisited(), (uint32_t)3); + EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsVisited(), (uint64_t)1000); + EXPECT_EQUAL(ref.getVisitorStatistics().getBytesVisited(), (uint64_t)1024000); + EXPECT_EQUAL(ref.getVisitorStatistics().getDocumentsReturned(), (uint64_t)123); + EXPECT_EQUAL(ref.getVisitorStatistics().getBytesReturned(), (uint64_t)512000); + } + } + return true; +} + +bool Messages80Test::test_destroy_visitor_message() { + DestroyVisitorMessage tmp("myvisitor"); + serialize("DestroyVisitorMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("DestroyVisitorMessage", DocumentProtocol::MESSAGE_DESTROYVISITOR, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<DestroyVisitorMessage&>(*obj); + EXPECT_EQUAL(ref.getInstanceId(), "myvisitor"); + } + } + return true; +} + +bool Messages80Test::test_destroy_visitor_reply() { + return try_visitor_reply("DestroyVisitorReply", DocumentProtocol::REPLY_DESTROYVISITOR); +} + +bool Messages80Test::test_map_visitor_message() { + MapVisitorMessage tmp; + tmp.getData().set("foo", 3); + tmp.getData().set("bar", 5); + + serialize("MapVisitorMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("MapVisitorMessage", DocumentProtocol::MESSAGE_MAPVISITOR, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<MapVisitorMessage&>(*obj); + EXPECT_EQUAL(ref.getData().size(), 2u); + EXPECT_EQUAL(ref.getData().get("foo", 0), 3); + EXPECT_EQUAL(ref.getData().get("bar", 0), 5); + } + } + return true; +} + +bool Messages80Test::test_map_visitor_reply() { + return try_visitor_reply("MapVisitorReply", DocumentProtocol::REPLY_MAPVISITOR); +} + +bool Messages80Test::test_query_result_message() { + QueryResultMessage srm; + vdslib::SearchResult& sr(srm.getSearchResult()); + EXPECT_EQUAL(srm.getSequenceId(), 0u); + EXPECT_EQUAL(sr.getHitCount(), 0u); + EXPECT_EQUAL(sr.getAggregatorList().getSerializedSize(), 4u); + EXPECT_EQUAL(sr.getSerializedSize(), 20u); + EXPECT_EQUAL(srm.getApproxSize(), 28u); + + serialize("QueryResultMessage-1", srm); + + // Serialization is only implemented in C++ + { + auto routable = deserialize("QueryResultMessage-1", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + vdslib::SearchResult& dr = dm.getSearchResult(); + EXPECT_EQUAL(dm.getSequenceId(), size_t(0)); + EXPECT_EQUAL(dr.getHitCount(), size_t(0)); + } + + sr.addHit(0, "doc1", 89); + sr.addHit(1, "doc17", 109); + serialize("QueryResultMessage-2", srm); + + const char* doc_id; + vdslib::SearchResult::RankType rank; + + { + auto routable = deserialize("QueryResultMessage-2", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + auto& dr = dm.getSearchResult(); + EXPECT_EQUAL(dr.getHitCount(), size_t(2)); + dr.getHit(0, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89)); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + dr.getHit(1, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109)); + EXPECT_EQUAL(strcmp("doc17", doc_id), 0); + } + + sr.sort(); + serialize("QueryResultMessage-3", srm); + + { + auto routable = deserialize("QueryResultMessage-3", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + auto& dr = dm.getSearchResult(); + EXPECT_EQUAL(dr.getHitCount(), size_t(2)); + dr.getHit(0, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109)); + EXPECT_EQUAL(strcmp("doc17", doc_id), 0); + dr.getHit(1, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89)); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + } + + QueryResultMessage srm2; + vdslib::SearchResult& sr2(srm2.getSearchResult()); + sr2.addHit(0, "doc1", 89, "sortdata2", 9); + sr2.addHit(1, "doc17", 109, "sortdata1", 9); + sr2.addHit(2, "doc18", 90, "sortdata3", 9); + serialize("QueryResultMessage-4", srm2); + + { + auto routable = deserialize("QueryResultMessage-4", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + auto& dr = dm.getSearchResult(); + EXPECT_EQUAL(dr.getHitCount(), size_t(3)); + dr.getHit(0, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89)); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + dr.getHit(1, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109)); + EXPECT_EQUAL(strcmp("doc17", doc_id), 0); + dr.getHit(2, doc_id, rank); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90)); + EXPECT_EQUAL(strcmp("doc18", doc_id), 0); + } + + sr2.sort(); + const void* buf; + size_t sz; + sr2.getHit(0, doc_id, rank); + sr2.getSortBlob(0, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109)); + EXPECT_EQUAL(strcmp("doc17", doc_id), 0); + sr2.getHit(1, doc_id, rank); + sr2.getSortBlob(1, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89)); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + sr2.getHit(2, doc_id, rank); + sr2.getSortBlob(2, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90)); + EXPECT_EQUAL(strcmp("doc18", doc_id), 0); + + serialize("QueryResultMessage-5", srm2); + { + auto routable = deserialize("QueryResultMessage-5", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + auto& dr = dm.getSearchResult(); + EXPECT_EQUAL(dr.getHitCount(), size_t(3)); + dr.getHit(0, doc_id, rank); + dr.getSortBlob(0, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata1", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(109)); + EXPECT_EQUAL(strcmp("doc17", doc_id), 0); + dr.getHit(1, doc_id, rank); + dr.getSortBlob(1, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata2", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(89)); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + dr.getHit(2, doc_id, rank); + dr.getSortBlob(2, buf, sz); + EXPECT_EQUAL(sz, 9u); + EXPECT_EQUAL(memcmp("sortdata3", buf, sz), 0); + EXPECT_EQUAL(rank, vdslib::SearchResult::RankType(90)); + EXPECT_EQUAL(strcmp("doc18", doc_id), 0); + } + + QueryResultMessage qrm3; + auto& sr3 = qrm3.getSearchResult(); + sr3.addHit(0, "doc1", 5); + sr3.addHit(1, "doc2", 7); + FeatureValues mf; + mf.names.emplace_back("foo"); + mf.names.emplace_back("bar"); + mf.values.resize(4); + mf.values[0].set_double(1.0); + mf.values[1].set_data({doc1_mf_data.data(), doc1_mf_data.size()}); + mf.values[2].set_double(12.0); + mf.values[3].set_data({doc2_mf_data.data(), doc2_mf_data.size()}); + sr3.set_match_features(FeatureValues(mf)); + sr3.sort(); + + serialize("QueryResultMessage-6", qrm3); + { + auto routable = deserialize("QueryResultMessage-6", DocumentProtocol::MESSAGE_QUERYRESULT, LANG_CPP); + if (!EXPECT_TRUE(routable)) { + return false; + } + auto& dm = dynamic_cast<QueryResultMessage&>(*routable); + auto& dr = dm.getSearchResult(); + EXPECT_EQUAL(dr.getHitCount(), size_t(2)); + dr.getHit(0, doc_id, rank); + EXPECT_EQUAL(vdslib::SearchResult::RankType(7), rank); + EXPECT_EQUAL(strcmp("doc2", doc_id), 0); + dr.getHit(1, doc_id, rank); + EXPECT_EQUAL(vdslib::SearchResult::RankType(5), rank); + EXPECT_EQUAL(strcmp("doc1", doc_id), 0); + auto mfv = dr.get_match_feature_values(0); + EXPECT_EQUAL(mfv.size(), 2u); + EXPECT_EQUAL(mfv[0].as_double(), 12.0); + EXPECT_EQUAL(mfv[1].as_data().make_string(), "There"); + mfv = dr.get_match_feature_values(1); + EXPECT_EQUAL(mfv.size(), 2u); + EXPECT_EQUAL(mfv[0].as_double(), 1.0); + EXPECT_EQUAL(mfv[1].as_data().make_string(), "Hi"); + const auto& mf_names = dr.get_match_features().names; + EXPECT_EQUAL(mf_names.size(), 2u); + EXPECT_EQUAL(mf_names[0], "foo"); + EXPECT_EQUAL(mf_names[1], "bar"); + } + return true; +} + +bool Messages80Test::test_query_result_reply() { + return try_visitor_reply("QueryResultReply", DocumentProtocol::REPLY_QUERYRESULT); +} + +bool Messages80Test::test_visitor_info_message() { + VisitorInfoMessage tmp; + tmp.getFinishedBuckets().emplace_back(16, 1); + tmp.getFinishedBuckets().emplace_back(16, 2); + tmp.getFinishedBuckets().emplace_back(16, 4); + string utf8 = "error message: \u00e6\u00c6\u00f8\u00d8\u00e5\u00c5\u00f6\u00d6"; // FIXME utf-8 literal + tmp.setErrorMessage(utf8); + + serialize("VisitorInfoMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("VisitorInfoMessage", DocumentProtocol::MESSAGE_VISITORINFO, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<VisitorInfoMessage&>(*obj); + ASSERT_EQUAL(ref.getFinishedBuckets().size(), 3u); + EXPECT_EQUAL(ref.getFinishedBuckets()[0], document::BucketId(16, 1)); + EXPECT_EQUAL(ref.getFinishedBuckets()[1], document::BucketId(16, 2)); + EXPECT_EQUAL(ref.getFinishedBuckets()[2], document::BucketId(16, 4)); + EXPECT_EQUAL(ref.getErrorMessage(), utf8); + } + } + return true; +} + +bool Messages80Test::test_visitor_info_reply() { + return try_visitor_reply("VisitorInfoReply", DocumentProtocol::REPLY_VISITORINFO); +} + +bool Messages80Test::test_document_list_message() { + auto doc = createDoc(getTypeRepo(), "testdoc", "id:scheme:testdoc:n=1234:1"); + DocumentListMessage::Entry entry(1234, std::move(doc), true); + DocumentListMessage tmp(document::BucketId(17, 1234)); + tmp.getDocuments().push_back(std::move(entry)); + + serialize("DocumentListMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("DocumentListMessage", DocumentProtocol::MESSAGE_DOCUMENTLIST, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<DocumentListMessage&>(*obj); + ASSERT_EQUAL(ref.getDocuments().size(), 1u); + EXPECT_EQUAL(ref.getDocuments()[0].getDocument()->getId().toString(), "id:scheme:testdoc:n=1234:1"); + EXPECT_EQUAL(ref.getDocuments()[0].getTimestamp(), 1234); + EXPECT_TRUE(ref.getDocuments()[0].isRemoveEntry()); + } + } + return true; +} + +bool Messages80Test::test_document_list_reply() { + return try_visitor_reply("DocumentListReply", DocumentProtocol::REPLY_DOCUMENTLIST); +} + +bool Messages80Test::test_empty_buckets_message() { + std::vector<document::BucketId> bids; + for (size_t i=0; i < 13; ++i) { + bids.emplace_back(16, i); + } + EmptyBucketsMessage msg(bids); + + serialize("EmptyBucketsMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("EmptyBucketsMessage", DocumentProtocol::MESSAGE_EMPTYBUCKETS, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<EmptyBucketsMessage&>(*obj); + ASSERT_EQUAL(ref.getBucketIds().size(), 13u); + for (size_t i = 0; i < 13; ++i) { + EXPECT_EQUAL(ref.getBucketIds()[i], document::BucketId(16, i)); + } + } + } + return true; +} + +bool Messages80Test::test_empty_buckets_reply() { + return try_visitor_reply("EmptyBucketsReply", DocumentProtocol::REPLY_EMPTYBUCKETS); +} + +bool Messages80Test::test_get_bucket_list_message() { + GetBucketListMessage msg(document::BucketId(16, 123)); + msg.setBucketSpace("beartato"); + + serialize("GetBucketListMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("GetBucketListMessage", DocumentProtocol::MESSAGE_GETBUCKETLIST, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetBucketListMessage&>(*obj); + EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123)); + EXPECT_EQUAL(ref.getBucketSpace(), "beartato"); + } + } + return true; +} + +bool Messages80Test::test_get_bucket_list_reply() { + GetBucketListReply reply; + reply.getBuckets().emplace_back(document::BucketId(16, 123), "foo"); + reply.getBuckets().emplace_back(document::BucketId(17, 1123), "bar"); + reply.getBuckets().emplace_back(document::BucketId(18, 11123), "zoink"); + + serialize("GetBucketListReply", reply); + + for (auto lang : languages()) { + auto obj = deserialize("GetBucketListReply", DocumentProtocol::REPLY_GETBUCKETLIST, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetBucketListReply&>(*obj); + ASSERT_EQUAL(ref.getBuckets().size(), 3u); + EXPECT_EQUAL(ref.getBuckets()[0], GetBucketListReply::BucketInfo(document::BucketId(16, 123), "foo")); + EXPECT_EQUAL(ref.getBuckets()[1], GetBucketListReply::BucketInfo(document::BucketId(17, 1123), "bar")); + EXPECT_EQUAL(ref.getBuckets()[2], GetBucketListReply::BucketInfo(document::BucketId(18, 11123), "zoink")); + } + } + return true; +} + +bool Messages80Test::test_get_bucket_state_message() { + GetBucketStateMessage tmp; + tmp.setBucketId(document::BucketId(16, 666)); + + serialize("GetBucketStateMessage", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("GetBucketStateMessage", DocumentProtocol::MESSAGE_GETBUCKETSTATE, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetBucketStateMessage&>(*obj); + EXPECT_EQUAL(ref.getBucketId().getUsedBits(), 16u); + EXPECT_EQUAL(ref.getBucketId().getId(), 4611686018427388570ULL); + } + } + return true; +} + +bool Messages80Test::test_get_bucket_state_reply() { + auto foo = document::DocumentId("id:ns:testdoc::foo").getGlobalId(); + auto bar = document::DocumentId("id:ns:testdoc::bar").getGlobalId(); + auto baz = document::DocumentId("id:ns:testdoc::baz"); + + GetBucketStateReply reply; + reply.getBucketState().emplace_back(foo, 777, false); + reply.getBucketState().emplace_back(bar, 888, true); + reply.getBucketState().emplace_back(baz, 999, false); + serialize("GetBucketStateReply", reply); + + for (auto lang : languages()) { + auto obj = deserialize("GetBucketStateReply", DocumentProtocol::REPLY_GETBUCKETSTATE, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<GetBucketStateReply&>(*obj); + ASSERT_EQUAL(ref.getBucketState().size(), 3u); + EXPECT_EQUAL(ref.getBucketState()[0].getTimestamp(), 777u); + EXPECT_FALSE(ref.getBucketState()[0].getDocumentId()); + EXPECT_EQUAL(ref.getBucketState()[0].getGlobalId(), foo); + EXPECT_FALSE(ref.getBucketState()[0].isRemoveEntry()); + + EXPECT_EQUAL(ref.getBucketState()[1].getTimestamp(), 888u); + EXPECT_FALSE(ref.getBucketState()[1].getDocumentId()); + EXPECT_EQUAL(ref.getBucketState()[1].getGlobalId(), bar); + EXPECT_TRUE(ref.getBucketState()[1].isRemoveEntry()); + + EXPECT_EQUAL(ref.getBucketState()[2].getTimestamp(), 999u); + EXPECT_EQUAL(ref.getBucketState()[2].getGlobalId(), baz.getGlobalId()); + EXPECT_FALSE(ref.getBucketState()[2].isRemoveEntry()); + ASSERT_TRUE(ref.getBucketState()[2].getDocumentId()); + EXPECT_EQUAL(*ref.getBucketState()[2].getDocumentId(), baz); + } + } + return true; +} + +bool Messages80Test::test_stat_bucket_message() { + StatBucketMessage msg(document::BucketId(16, 123), "id.user=123"); + msg.setBucketSpace("andrei"); + + serialize("StatBucketMessage", msg); + + for (auto lang : languages()) { + auto obj = deserialize("StatBucketMessage", DocumentProtocol::MESSAGE_STATBUCKET, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<StatBucketMessage&>(*obj); + EXPECT_EQUAL(ref.getBucketId(), document::BucketId(16, 123)); + EXPECT_EQUAL(ref.getDocumentSelection(), "id.user=123"); + EXPECT_EQUAL(ref.getBucketSpace(), "andrei"); + } + } + return true; +} + +bool Messages80Test::test_stat_bucket_reply() { + StatBucketReply msg; + msg.setResults("These are the votes of the Norwegian jury"); + + serialize("StatBucketReply", msg); + + for (auto lang : languages()) { + auto obj = deserialize("StatBucketReply", DocumentProtocol::REPLY_STATBUCKET, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<StatBucketReply&>(*obj); + EXPECT_EQUAL(ref.getResults(), "These are the votes of the Norwegian jury"); + } + } + return true; +} + +bool Messages80Test::test_wrong_distribution_reply() { + WrongDistributionReply tmp("distributor:3 storage:2"); + + serialize("WrongDistributionReply", tmp); + + for (auto lang : languages()) { + auto obj = deserialize("WrongDistributionReply", DocumentProtocol::REPLY_WRONGDISTRIBUTION, lang); + if (EXPECT_TRUE(obj)) { + auto& ref = dynamic_cast<WrongDistributionReply&>(*obj); + EXPECT_EQUAL(ref.getSystemState(), "distributor:3 storage:2"); + } + } + return true; +} + +bool Messages80Test::test_document_ignored_reply() { + DocumentIgnoredReply tmp; + serialize("DocumentIgnoredReply", tmp); + for (auto lang : languages()) { + auto obj = deserialize("DocumentIgnoredReply", DocumentProtocol::REPLY_DOCUMENTIGNORED, lang); + EXPECT_TRUE(obj); + } + return true; +} + +bool Messages80Test::try_visitor_reply(const string& filename, uint32_t type) { + VisitorReply tmp(type); + serialize(filename, tmp); + + for (auto lang : languages()) { + auto obj = deserialize(filename, type, lang); + if (EXPECT_TRUE(obj)) { + auto* ptr = dynamic_cast<VisitorReply*>(obj.get()); + EXPECT_TRUE(ptr); + } + } + return true; +} + +// TODO rewrite to Gtest +TEST_APPHOOK(Messages80Test); diff --git a/documentapi/src/tests/messages/testbase.cpp b/documentapi/src/tests/messages/testbase.cpp index 4ea770e7309..e5647ceaef8 100644 --- a/documentapi/src/tests/messages/testbase.cpp +++ b/documentapi/src/tests/messages/testbase.cpp @@ -25,6 +25,8 @@ TestBase::TestBase() : { } +TestBase::~TestBase() = default; + int TestBase::Main() { @@ -34,16 +36,15 @@ TestBase::Main() LOG(info, "Running tests for version %s.", getVersion().toString().c_str()); // Run registered tests. - for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin(); - it != _tests.end(); ++it) - { - LOG(info, "Running test for routable type %d.", it->first); - EXPECT_TRUE( (this->*(it->second))() ); + for (const auto& test : _tests) { + LOG(info, "Running test for routable type %d.", test.first); + EXPECT_TRUE( (this->*(test.second))() ); TEST_FLUSH(); } // Test routable type coverage. - std::vector<uint32_t> expected, actual; + std::vector<uint32_t> expected; + std::vector<uint32_t> actual; EXPECT_TRUE(testCoverage(expected, actual)); expected.push_back(0); EXPECT_TRUE(!testCoverage(expected, actual)); @@ -58,10 +59,8 @@ TestBase::Main() _protocol.getRoutableTypes(getVersion(), expected); actual.clear(); - for (std::map<uint32_t, TEST_METHOD_PT>::iterator it = _tests.begin(); - it != _tests.end(); ++it) - { - actual.push_back(it->first); + for (const auto& test : _tests) { + actual.push_back(test.first); } if (shouldTestCoverage()) { EXPECT_TRUE(testCoverage(expected, actual, true)); @@ -100,13 +99,11 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector< bool ret = true; std::vector<uint32_t> lst(actual); - for (std::vector<uint32_t>::const_iterator it = expected.begin(); - it != expected.end(); ++it) - { - std::vector<uint32_t>::iterator occ = std::find(lst.begin(), lst.end(), *it); + for (uint32_t wanted : expected) { + auto occ = std::find(lst.begin(), lst.end(), wanted); if (occ == lst.end()) { if (report) { - LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", *it); + LOG(error, "Routable type %d is registered in DocumentProtocol but not tested.", wanted); } ret = false; } else { @@ -115,10 +112,8 @@ TestBase::testCoverage(const std::vector<uint32_t> &expected, const std::vector< } if (!lst.empty()) { if (report) { - for (std::vector<uint32_t>::iterator it = lst.begin(); - it != lst.end(); ++it) - { - LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", *it); + for (uint32_t missing : lst) { + LOG(error, "Routable type %d is tested but not registered in DocumentProtocol.", missing); } } ret = false; @@ -151,7 +146,7 @@ TestBase::serialize(const string &filename, const mbus::Routable &routable, Tamp return 0; } mbus::Routable::UP obj = _protocol.decode(version, blob); - if (!EXPECT_TRUE(obj.get() != NULL)) { + if (!EXPECT_TRUE(obj.get() != nullptr)) { LOG(error, "Protocol failed to decode serialized data."); return 0; } @@ -172,7 +167,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang) mbus::Blob blob = readFile(path); if (!EXPECT_TRUE(blob.size() != 0)) { LOG(error, "Could not open file '%s' for reading.", path.c_str()); - return mbus::Routable::UP(); + return {}; } mbus::Routable::UP ret = _protocol.decode(version, blob); @@ -180,7 +175,7 @@ TestBase::deserialize(const string &filename, uint32_t classId, uint32_t lang) LOG(error, "Unable to decode class %d", classId); } else if (!EXPECT_TRUE(classId == ret->getType())) { LOG(error, "Expected class %d, got %d.", classId, ret->getType()); - return mbus::Routable::UP(); + return {}; } return ret; } diff --git a/documentapi/src/tests/messages/testbase.h b/documentapi/src/tests/messages/testbase.h index 313f2d1f293..ad371bcc3bc 100644 --- a/documentapi/src/tests/messages/testbase.h +++ b/documentapi/src/tests/messages/testbase.h @@ -36,8 +36,8 @@ protected: }; TestBase(); - virtual ~TestBase() { /* empty */ } - virtual const vespalib::Version getVersion() const = 0; + ~TestBase() override; + virtual vespalib::Version getVersion() const = 0; virtual bool shouldTestCoverage() const = 0; TestBase &putTest(uint32_t type, TEST_METHOD_PT test); int Main() override; diff --git a/documentapi/src/vespa/documentapi/CMakeLists.txt b/documentapi/src/vespa/documentapi/CMakeLists.txt index a3a2815cc4f..1d0b3784a9d 100644 --- a/documentapi/src/vespa/documentapi/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/CMakeLists.txt @@ -7,3 +7,5 @@ vespa_add_library(documentapi INSTALL lib64 DEPENDS ) + +vespa_add_target_package_dependency(documentapi Protobuf) diff --git a/documentapi/src/vespa/documentapi/messagebus/.gitignore b/documentapi/src/vespa/documentapi/messagebus/.gitignore index d58390943e2..488f2e6355d 100644 --- a/documentapi/src/vespa/documentapi/messagebus/.gitignore +++ b/documentapi/src/vespa/documentapi/messagebus/.gitignore @@ -2,3 +2,5 @@ Makefile config-*.cpp config-*.h +*.pb.cc +*.pb.h diff --git a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt index c3198ba7b2b..d59fd56037d 100644 --- a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt @@ -1,12 +1,35 @@ # Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +find_package(Protobuf REQUIRED) + +# .proto files are in a higher-level directory as they are shared across languages +set(documentapi_messagebus_PROTOBUF_REL_PATH "../../../protobuf") +PROTOBUF_GENERATE_CPP(documentapi_messagebus_PROTOBUF_SRCS documentapi_messagebus_PROTOBUF_HDRS + "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_common.proto" + "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_feed.proto" + "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_inspect.proto" + "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_visiting.proto") + +vespa_add_source_target(protobufgen_documentapi_messagebus DEPENDS + ${documentapi_messagebus_PROTOBUF_SRCS} + ${documentapi_messagebus_PROTOBUF_HDRS}) + +vespa_suppress_warnings_for_protobuf_sources(SOURCES ${documentapi_messagebus_PROTOBUF_SRCS}) + +# protoc explicitly annotates methods with inline, which triggers -Werror=inline when +# the header file grows over a certain size. +set_source_files_properties(routable_factories_8.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline") + vespa_add_library(documentapi_documentapimessagebus OBJECT SOURCES documentprotocol.cpp replymerger.cpp + routable_factories_8.cpp routablefactories60.cpp routablerepository.cpp routingpolicyfactories.cpp routingpolicyrepository.cpp + ${documentapi_messagebus_PROTOBUF_SRCS} DEPENDS documentapi_documentapipolicies ) diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index f16f63029ee..f53594c6d32 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -1,16 +1,17 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "replymerger.h" +#include "routable_factories_8.h" #include "routablefactories60.h" -#include "routingpolicyfactories.h" #include "routablerepository.h" +#include "routingpolicyfactories.h" #include "routingpolicyrepository.h" -#include "replymerger.h" #include <vespa/document/util/stringutil.h> #include <vespa/documentapi/documentapi.h> -#include <vespa/vespalib/util/exceptions.h> #include <vespa/messagebus/error.h> -#include <sstream> +#include <vespa/vespalib/util/exceptions.h> #include <cassert> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".documentprotocol"); @@ -40,47 +41,98 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo, putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>()); putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>()); + add_legacy_v6_factories(); + add_v8_factories(); +} + +DocumentProtocol::~DocumentProtocol() = default; + +void +DocumentProtocol::add_legacy_v6_factories() +{ // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); - std::vector<vespalib::VersionSpecification> from6 = { version6 }; // Add 6.x serialization - putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6); - putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6); - putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6); - putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6); - putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6); - putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6); - putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6); - putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6); - putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6); - putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6); - putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6); - putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6); - putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6); - putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6); - putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6); - putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6); - putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6); - putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6); - putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6); - putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6); - putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6); - putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6); - putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6); - putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6); - putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6); - putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6); - putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6); - putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6); - putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6); - putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6); - putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6); + putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6); + putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6); + putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6); + putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6); + putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6); + putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6); + putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6); + putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6); + putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6); + putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6); + putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6); + putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6); + putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6); + putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6); + putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6); + putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6); + putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6); + putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6); + putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6); + putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6); + putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6); + putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6); + putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6); + putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6); + putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6); + putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6); + putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6); + putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6); + putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6); + putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6); + putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6); putRoutableFactory(REPLY_WRONGDISTRIBUTION, std::make_shared<RoutableFactories60::WrongDistributionReplyFactory>(), from6); } -DocumentProtocol::~DocumentProtocol() = default; +void +DocumentProtocol::add_v8_factories() +{ + vespalib::VersionSpecification version8(8, 304); + std::vector<vespalib::VersionSpecification> from8 = { version8 }; + + using RF8 = messagebus::RoutableFactories80; + auto put_v8_factory = [&](auto msg_id, auto factory) { + putRoutableFactory(msg_id, std::move(factory), from8); + }; + + put_v8_factory(MESSAGE_CREATEVISITOR, RF8::create_visitor_message_factory()); + put_v8_factory(MESSAGE_DESTROYVISITOR, RF8::destroy_visitor_message_factory()); + put_v8_factory(MESSAGE_DOCUMENTLIST, RF8::document_list_message_factory(_repo)); + put_v8_factory(MESSAGE_EMPTYBUCKETS, RF8::empty_buckets_message_factory()); + put_v8_factory(MESSAGE_GETBUCKETLIST, RF8::get_bucket_list_message_factory()); + put_v8_factory(MESSAGE_GETBUCKETSTATE, RF8::get_bucket_state_message_factory()); + put_v8_factory(MESSAGE_GETDOCUMENT, RF8::get_document_message_factory()); + put_v8_factory(MESSAGE_MAPVISITOR, RF8::map_visitor_message_factory()); + put_v8_factory(MESSAGE_PUTDOCUMENT, RF8::put_document_message_factory(_repo)); + put_v8_factory(MESSAGE_QUERYRESULT, RF8::query_result_message_factory()); + put_v8_factory(MESSAGE_REMOVEDOCUMENT, RF8::remove_document_message_factory()); + put_v8_factory(MESSAGE_REMOVELOCATION, RF8::remove_location_message_factory(_repo)); + put_v8_factory(MESSAGE_STATBUCKET, RF8::stat_bucket_message_factory()); + put_v8_factory(MESSAGE_UPDATEDOCUMENT, RF8::update_document_message_factory(_repo)); + put_v8_factory(MESSAGE_VISITORINFO, RF8::visitor_info_message_factory()); + put_v8_factory(REPLY_CREATEVISITOR, RF8::create_visitor_reply_factory()); + put_v8_factory(REPLY_DESTROYVISITOR, RF8::destroy_visitor_reply_factory()); + put_v8_factory(REPLY_DOCUMENTIGNORED, RF8::document_ignored_reply_factory()); + put_v8_factory(REPLY_DOCUMENTLIST, RF8::document_list_reply_factory()); + put_v8_factory(REPLY_EMPTYBUCKETS, RF8::empty_buckets_reply_factory()); + put_v8_factory(REPLY_GETBUCKETLIST, RF8::get_bucket_list_reply_factory()); + put_v8_factory(REPLY_GETBUCKETSTATE, RF8::get_bucket_state_reply_factory()); + put_v8_factory(REPLY_GETDOCUMENT, RF8::get_document_reply_factory(_repo)); + put_v8_factory(REPLY_MAPVISITOR, RF8::map_visitor_reply_factory()); + put_v8_factory(REPLY_PUTDOCUMENT, RF8::put_document_reply_factory()); + put_v8_factory(REPLY_QUERYRESULT, RF8::query_result_reply_factory()); + put_v8_factory(REPLY_REMOVEDOCUMENT, RF8::remove_document_reply_factory()); + put_v8_factory(REPLY_REMOVELOCATION, RF8::remove_location_reply_factory()); + put_v8_factory(REPLY_STATBUCKET, RF8::stat_bucket_reply_factory()); + put_v8_factory(REPLY_UPDATEDOCUMENT, RF8::update_document_reply_factory()); + put_v8_factory(REPLY_VISITORINFO, RF8::visitor_info_reply_factory()); + put_v8_factory(REPLY_WRONGDISTRIBUTION, RF8::wrong_distribution_reply_factory()); +} mbus::IRoutingPolicy::UP DocumentProtocol::createPolicy(const mbus::string &name, const mbus::string ¶m) const @@ -99,13 +151,9 @@ mbus::Blob DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable &routable) const { mbus::Blob blob(_routableRepository->encode(version, routable)); - // When valgrind reports errors of uninitialized data being written to - // the network, it is useful to be able to see the serialized data to - // try to identify what bits are uninitialized. if (LOG_WOULD_LOG(spam)) { std::ostringstream message; - document::StringUtil::printAsHex( - message, blob.data(), blob.size()); + document::StringUtil::printAsHex(message, blob.data(), blob.size()); LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s", routable.getProtocol().c_str(), routable.getType(), version.toString().c_str(), message.str().c_str()); @@ -120,7 +168,7 @@ DocumentProtocol::decode(const vespalib::Version &version, mbus::BlobRef data) c return _routableRepository->decode(version, data); } catch (vespalib::Exception &e) { LOG(warning, "%s", e.getMessage().c_str()); - return mbus::Routable::UP(); + return {}; } } diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h index c771e86031d..b9658750ada 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h @@ -294,6 +294,10 @@ public: mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string ¶m) const override; mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override; mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override; + +private: + void add_legacy_v6_factories(); + void add_v8_factories(); }; } diff --git a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h index bd2f245ad15..12c6a50fa24 100644 --- a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h +++ b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h @@ -22,18 +22,13 @@ class IRoutableFactory { protected: IRoutableFactory() = default; public: - /** - * Convenience typedefs. - */ using UP = std::unique_ptr<IRoutableFactory>; using SP = std::shared_ptr<IRoutableFactory>; IRoutableFactory(const IRoutableFactory &) = delete; IRoutableFactory & operator = (const IRoutableFactory &) = delete; - /** - * Virtual destructor required for inheritance. - */ - virtual ~IRoutableFactory() { } + + virtual ~IRoutableFactory() = default; /** * This method encodes the content of the given routable into a byte buffer that can later be decoded @@ -45,11 +40,11 @@ public: * @param out The buffer to write into. * @return True if the routable could be encoded. */ - virtual bool encode(const mbus::Routable &obj, - vespalib::GrowableByteBuffer &out) const = 0; + [[nodiscard]] virtual bool encode(const mbus::Routable &obj, + vespalib::GrowableByteBuffer &out) const = 0; /** - * This method decodes the given byte bufer to a routable. + * This method decodes the given byte buffer to a routable. * * This method is NOT exception safe. Return null to signal failure. * @@ -57,7 +52,7 @@ public: * @param loadTypes The set of configured load types. * @return The decoded routable. */ - virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0; + [[nodiscard]] virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0; }; } diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h index 3d713c95da8..c47c2421fcb 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h @@ -18,7 +18,7 @@ protected: * * @return A document reply that corresponds to this message. */ - virtual DocumentReply::UP doCreateReply() const = 0; + [[nodiscard]] virtual DocumentReply::UP doCreateReply() const = 0; public: /** @@ -37,16 +37,16 @@ public: * * @return The created reply. */ - std::unique_ptr<mbus::Reply> createReply() const; + [[nodiscard]] std::unique_ptr<mbus::Reply> createReply() const; /** * Returns the priority of this message. * * @return The priority. */ - Priority::Value getPriority() const { return _priority; }; + [[nodiscard]] Priority::Value getPriority() const { return _priority; }; - uint8_t priority() const override { return (uint8_t)_priority; } + [[nodiscard]] uint8_t priority() const override { return (uint8_t)_priority; } /** * Sets the priority tag for this message. @@ -55,7 +55,7 @@ public: */ void setPriority(Priority::Value p) { _priority = p; }; - uint32_t getApproxSize() const override; + [[nodiscard]] uint32_t getApproxSize() const override; void setApproxSize(uint32_t approxSize) { _approxSize = approxSize; diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp index 755e523c065..2c5833beb20 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp @@ -14,13 +14,12 @@ EmptyBucketsMessage::EmptyBucketsMessage(const std::vector<document::BucketId> & { } -EmptyBucketsMessage::~EmptyBucketsMessage() { -} +EmptyBucketsMessage::~EmptyBucketsMessage() = default; void -EmptyBucketsMessage::setBucketIds(const std::vector<document::BucketId> &bucketIds) +EmptyBucketsMessage::setBucketIds(std::vector<document::BucketId> bucketIds) { - _bucketIds = bucketIds; + _bucketIds = std::move(bucketIds); } void diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h index 7cecd1e1a2b..7078efc778c 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h @@ -27,7 +27,7 @@ public: std::vector<document::BucketId> &getBucketIds() { return _bucketIds; } const std::vector<document::BucketId> &getBucketIds() const { return _bucketIds; } - void setBucketIds(const std::vector<document::BucketId> &bucketIds); + void setBucketIds(std::vector<document::BucketId> bucketIds); void resize(uint32_t size); uint32_t getType() const override; string toString() const override { return "emptybucketsmessage"; } diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h index dbab28e3172..1ebc867156e 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h @@ -30,8 +30,8 @@ public: * * @param document The document to put. */ - PutDocumentMessage(DocumentSP document); - ~PutDocumentMessage(); + explicit PutDocumentMessage(DocumentSP document); + ~PutDocumentMessage() override; /** * Returns the document to put. @@ -39,7 +39,7 @@ public: * @return The document. */ const DocumentSP & getDocumentSP() const { return _document; } - DocumentSP stealDocument() { return std::move(_document); } + [[nodiscard]] DocumentSP stealDocument() { return std::move(_document); } const document::Document & getDocument() const { return *_document; } /** @@ -68,7 +68,7 @@ public: string toString() const override { return "putdocumentmessage"; } void set_create_if_non_existent(bool value) noexcept { _create_if_non_existent = value; } - bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; } + [[nodiscard]] bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; } }; } diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h index 87c3456d62c..4608350f8a6 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h @@ -10,7 +10,7 @@ namespace document { class BucketIdFactory; } namespace documentapi { /** - * Message (VDS only) to remove an entire location for users using user or group schemes for their documents. + * Message to remove an entire location for users using user or group schemes for their documents. * A location in this context is either a user id or a group name. */ class RemoveLocationMessage : public DocumentMessage { diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h index cd0cb5e1d3a..da3219d999f 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h @@ -74,12 +74,16 @@ public: const vdslib::Parameters& getParameters() const { return _params; } vdslib::Parameters& getParameters() { return _params; } void setParameters(const vdslib::Parameters& params) { _params = params; } + void setParameters(vdslib::Parameters&& params) noexcept { _params = std::move(params); } uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; } void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; } const std::vector<document::BucketId>& getBuckets() const { return _buckets; } std::vector<document::BucketId>& getBuckets() { return _buckets; } + void setBuckets(std::vector<document::BucketId> buckets) noexcept { + _buckets = std::move(buckets); + } const document::BucketId getBucketId() const { return *_buckets.begin(); } @@ -196,6 +200,9 @@ public: std::vector<document::BucketId>& getFinishedBuckets() { return _finishedBuckets; } const std::vector<document::BucketId>& getFinishedBuckets() const { return _finishedBuckets; } + void setFinishedBuckets(std::vector<document::BucketId> buckets) noexcept { + _finishedBuckets = std::move(buckets); + } const string& getErrorMessage() const { return _errorMessage; } void setErrorMessage(const string& errorMessage) { _errorMessage = errorMessage; }; @@ -224,6 +231,7 @@ public: vdslib::Parameters& getData() { return _data; }; const vdslib::Parameters& getData() const { return _data; }; + void setData(vdslib::Parameters&& data) noexcept { _data = std::move(data); } uint32_t getApproxSize() const override; uint32_t getType() const override; @@ -246,9 +254,9 @@ public: Entry(const Entry& other); Entry(const document::DocumentTypeRepo &repo, document::ByteBuffer& buf); - int64_t getTimestamp() { return _timestamp; } - const DocumentSP & getDocument() { return _document; } - bool isRemoveEntry() { return _removeEntry; } + int64_t getTimestamp() const noexcept { return _timestamp; } + const DocumentSP & getDocument() const noexcept { return _document; } + bool isRemoveEntry() const noexcept { return _removeEntry; } void serialize(vespalib::GrowableByteBuffer& buf) const; private: diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp new file mode 100644 index 00000000000..399801526ba --- /dev/null +++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp @@ -0,0 +1,883 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "routable_factories_8.h" +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/select/parser.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/document/util/serializableexceptions.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/documentapi/messagebus/docapi_common.pb.h> +#include <vespa/documentapi/messagebus/docapi_feed.pb.h> +#include <vespa/documentapi/messagebus/docapi_visiting.pb.h> +#include <vespa/documentapi/messagebus/docapi_inspect.pb.h> +#include <vespa/vespalib/objects/nbostream.h> + +namespace documentapi::messagebus { + +namespace { + +// Protobuf codec helpers for common types + +void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) { + dest.set_raw_id(src.getRawId()); +} + +document::BucketId get_bucket_id(const protobuf::BucketId& src) { + return document::BucketId(src.raw_id()); +} + +void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) { + auto doc_id = src.toString(); + dest.set_id(doc_id.data(), doc_id.size()); +} + +document::DocumentId get_document_id(const protobuf::DocumentId& src) { + return document::DocumentId(src.id()); +} + +// TODO DocumentAPI should be extended to use actual document::FieldSet enums instead of always passing strings. +void set_raw_field_set(protobuf::FieldSet& dest, vespalib::stringref src) { + dest.set_spec(src.data(), src.size()); +} + +// Note: returns by ref +vespalib::stringref get_raw_field_set(const protobuf::FieldSet& src) noexcept { + return src.spec(); +} + +void set_raw_selection(protobuf::DocumentSelection& dest, vespalib::stringref src) { + dest.set_selection(src.data(), src.size()); +} + +// Note: returns by ref +vespalib::stringref get_raw_selection(const protobuf::DocumentSelection& src) noexcept { + return src.selection(); +} + +void set_bucket_space(protobuf::BucketSpace& dest, vespalib::stringref space_name) { + dest.set_name(space_name.data(), space_name.size()); +} + +// Note: returns by ref +vespalib::stringref get_bucket_space(const protobuf::BucketSpace& src) noexcept { + return src.name(); +} + +void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) { + char tmp[document::GlobalId::LENGTH]; + memcpy(tmp, src.get(), document::GlobalId::LENGTH); + dest.set_raw_gid(tmp, document::GlobalId::LENGTH); +} + +document::GlobalId get_global_id(const protobuf::GlobalId& src) { + if (src.raw_gid().size() != document::GlobalId::LENGTH) { + throw document::DeserializeException("Unexpected serialized protobuf GlobalId size"); + } + return document::GlobalId(src.raw_gid().data()); // By copy +} + +documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) { + return documentapi::TestAndSetCondition(src.selection()); +} + +void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) { + dest.set_selection(src.getSelection().data(), src.getSelection().size()); +} + +std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc, + const document::DocumentTypeRepo& type_repo) +{ + if (!src_doc.payload().empty()) { + vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size()); + return std::make_shared<document::Document>(type_repo, doc_buf); + } + return {}; +} + +std::shared_ptr<document::Document> get_document_or_throw(const protobuf::Document& src_doc, + const document::DocumentTypeRepo& type_repo) +{ + auto doc = get_document(src_doc, type_repo); + if (!doc) [[unlikely]] { + throw document::DeserializeException("Message does not contain a required document object", VESPA_STRLOC); + } + return doc; +} + +void set_document(protobuf::Document& target_doc, const document::Document& src_doc) { + vespalib::nbostream stream; + src_doc.serialize(stream); + target_doc.set_payload(stream.peek(), stream.size()); +} + +void set_update(protobuf::DocumentUpdate& dest, const document::DocumentUpdate& src) { + vespalib::nbostream stream; + src.serializeHEAD(stream); + dest.set_payload(stream.peek(), stream.size()); +} + +std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::DocumentUpdate& src, + const document::DocumentTypeRepo& type_repo) +{ + if (!src.payload().empty()) { + return document::DocumentUpdate::createHEAD( + type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); + } + return {}; +} + +std::shared_ptr<document::DocumentUpdate> get_update_or_throw(const protobuf::DocumentUpdate& src, + const document::DocumentTypeRepo& type_repo) +{ + auto upd = get_update(src, type_repo); + if (!upd) [[unlikely]] { + throw document::DeserializeException("Message does not contain a required document update object", VESPA_STRLOC); + } + return upd; +} + +template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn> +requires std::is_invocable_r_v<void, EncodeFn, const DocApiType&, ProtobufType&> && + std::is_invocable_r_v<std::unique_ptr<DocApiType>, DecodeFn, const ProtobufType&> +class ProtobufRoutableFactory final : public IRoutableFactory { + EncodeFn _encode_fn; + DecodeFn _decode_fn; +public: + template <typename EncFn, typename DecFn> + ProtobufRoutableFactory(EncFn&& enc_fn, DecFn&& dec_fn) noexcept + : _encode_fn(std::forward<EncFn>(enc_fn)), + _decode_fn(std::forward<DecFn>(dec_fn)) + {} + ~ProtobufRoutableFactory() override = default; + + bool encode(const mbus::Routable& obj, vespalib::GrowableByteBuffer& out) const override { + ::google::protobuf::Arena arena; + auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena); + + _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj); + + const auto sz = proto_obj->ByteSizeLong(); + assert(sz <= INT32_MAX); + auto* buf = reinterpret_cast<uint8_t*>(out.allocate(sz)); + return proto_obj->SerializeWithCachedSizesToArray(buf); + } + + mbus::Routable::UP decode(document::ByteBuffer& in) const override { + ::google::protobuf::Arena arena; + auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena); + const auto buf_size = in.getRemaining(); + assert(buf_size <= INT_MAX); + bool ok = proto_obj->ParseFromArray(in.getBufferAtPos(), buf_size); + if (!ok) { + return {}; // Malformed protobuf payload + } + auto msg = _decode_fn(*proto_obj); + if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) { + msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size + } + return msg; + } +}; + +template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn> +auto make_codec(EncodeFn&& enc_fn, DecodeFn&& dec_fn) { + return std::make_shared<ProtobufRoutableFactory<DocApiType, ProtobufType, EncodeFn, DecodeFn>>( + std::forward<EncodeFn>(enc_fn), std::forward<DecodeFn>(dec_fn)); +} + +} // anon ns + +// --------------------------------------------- +// Get request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_message_factory() { + return make_codec<GetDocumentMessage, protobuf::GetDocumentRequest>( + [](const GetDocumentMessage& src, protobuf::GetDocumentRequest& dest) { + set_document_id(*dest.mutable_document_id(), src.getDocumentId()); + set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet()); + }, + [](const protobuf::GetDocumentRequest& src) { + return std::make_unique<GetDocumentMessage>(get_document_id(src.document_id()), get_raw_field_set(src.field_set())); + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) { + return make_codec<GetDocumentReply, protobuf::GetDocumentResponse>( + [](const GetDocumentReply& src, protobuf::GetDocumentResponse& dest) { + if (src.hasDocument()) { + set_document(*dest.mutable_document(), src.getDocument()); + } + dest.set_last_modified(src.getLastModified()); + }, + [type_repo = std::move(repo)](const protobuf::GetDocumentResponse& src) { + auto msg = std::make_unique<GetDocumentReply>(); + if (src.has_document()) { + auto doc = get_document(src.document(), *type_repo); + doc->setLastModified(static_cast<int64_t>(src.last_modified())); + msg->setDocument(std::move(doc)); + } + msg->setLastModified(src.last_modified()); + return msg; + } + ); +} + +// --------------------------------------------- +// Put request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) { + return make_codec<PutDocumentMessage, protobuf::PutDocumentRequest>( + [](const PutDocumentMessage& src, protobuf::PutDocumentRequest& dest) { + dest.set_force_assign_timestamp(src.getTimestamp()); + if (src.getCondition().isPresent()) { + set_tas_condition(*dest.mutable_condition(), src.getCondition()); + } + if (src.getDocumentSP()) { // This should always be present in practice + set_document(*dest.mutable_document(), src.getDocument()); + } + dest.set_create_if_missing(src.get_create_if_non_existent()); + }, + [type_repo = std::move(repo)](const protobuf::PutDocumentRequest& src) { + auto msg = std::make_unique<PutDocumentMessage>(); + msg->setDocument(get_document_or_throw(src.document(), *type_repo)); + if (src.has_condition()) { + msg->setCondition(get_tas_condition(src.condition())); + } + msg->setTimestamp(src.force_assign_timestamp()); + msg->set_create_if_non_existent(src.create_if_missing()); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_reply_factory() { + return make_codec<WriteDocumentReply, protobuf::PutDocumentResponse>( + [](const WriteDocumentReply& src, protobuf::PutDocumentResponse& dest) { + dest.set_modification_timestamp(src.getHighestModificationTimestamp()); + }, + [](const protobuf::PutDocumentResponse& src) { + auto msg = std::make_unique<WriteDocumentReply>(DocumentProtocol::REPLY_PUTDOCUMENT); + msg->setHighestModificationTimestamp(src.modification_timestamp()); + return msg; + } + ); +} + +// --------------------------------------------- +// Update request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) { + return make_codec<UpdateDocumentMessage, protobuf::UpdateDocumentRequest>( + [](const UpdateDocumentMessage& src, protobuf::UpdateDocumentRequest& dest) { + set_update(*dest.mutable_update(), src.getDocumentUpdate()); + if (src.getCondition().isPresent()) { + set_tas_condition(*dest.mutable_condition(), src.getCondition()); + } + dest.set_expected_old_timestamp(src.getOldTimestamp()); + dest.set_force_assign_timestamp(src.getNewTimestamp()); + }, + [type_repo = std::move(repo)](const protobuf::UpdateDocumentRequest& src) { + auto msg = std::make_unique<UpdateDocumentMessage>(); + msg->setDocumentUpdate(get_update_or_throw(src.update(), *type_repo)); + if (src.has_condition()) { + msg->setCondition(get_tas_condition(src.condition())); + } + msg->setOldTimestamp(src.expected_old_timestamp()); + msg->setNewTimestamp(src.force_assign_timestamp()); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_reply_factory() { + return make_codec<UpdateDocumentReply, protobuf::UpdateDocumentResponse>( + [](const UpdateDocumentReply& src, protobuf::UpdateDocumentResponse& dest) { + dest.set_was_found(src.wasFound()); + dest.set_modification_timestamp(src.getHighestModificationTimestamp()); + }, + [](const protobuf::UpdateDocumentResponse& src) { + auto msg = std::make_unique<UpdateDocumentReply>(); + msg->setWasFound(src.was_found()); + msg->setHighestModificationTimestamp(src.modification_timestamp()); + return msg; + } + ); +} + +// --------------------------------------------- +// Remove request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_message_factory() { + return make_codec<RemoveDocumentMessage, protobuf::RemoveDocumentRequest>( + [](const RemoveDocumentMessage& src, protobuf::RemoveDocumentRequest& dest) { + set_document_id(*dest.mutable_document_id(), src.getDocumentId()); + if (src.getCondition().isPresent()) { + set_tas_condition(*dest.mutable_condition(), src.getCondition()); + } + }, + [](const protobuf::RemoveDocumentRequest& src) { + auto msg = std::make_unique<RemoveDocumentMessage>(); + msg->setDocumentId(get_document_id(src.document_id())); + if (src.has_condition()) { + msg->setCondition(get_tas_condition(src.condition())); + } + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_reply_factory() { + return make_codec<RemoveDocumentReply, protobuf::RemoveDocumentResponse>( + [](const RemoveDocumentReply& src, protobuf::RemoveDocumentResponse& dest) { + dest.set_was_found(src.wasFound()); + dest.set_modification_timestamp(src.getHighestModificationTimestamp()); + }, + [](const protobuf::RemoveDocumentResponse& src) { + auto msg = std::make_unique<RemoveDocumentReply>(); + msg->setWasFound(src.was_found()); + msg->setHighestModificationTimestamp(src.modification_timestamp()); + return msg; + } + ); +} + +// --------------------------------------------- +// RemoveLocation request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> +RoutableFactories80::remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) { + return make_codec<RemoveLocationMessage, protobuf::RemoveLocationRequest>( + [](const RemoveLocationMessage& src, protobuf::RemoveLocationRequest& dest) { + set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection()); + set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace()); + }, + [type_repo = std::move(repo)](const protobuf::RemoveLocationRequest& src) { + document::BucketIdFactory factory; + document::select::Parser parser(*type_repo, factory); + auto msg = std::make_unique<RemoveLocationMessage>(factory, parser, get_raw_selection(src.selection())); + msg->setBucketSpace(get_bucket_space(src.bucket_space())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_location_reply_factory() { + return make_codec<DocumentReply, protobuf::RemoveLocationResponse>( + []([[maybe_unused]] const DocumentReply& src, [[maybe_unused]] protobuf::RemoveLocationResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::RemoveLocationResponse& src) { + // The lack of 1-1 type mapping is pretty awkward :I + return std::make_unique<DocumentReply>(DocumentProtocol::REPLY_REMOVELOCATION); + } + ); +} + +// --------------------------------------------- +// CreateVisitor request and response +// --------------------------------------------- + +namespace { + +void set_bucket_id_vector(::google::protobuf::RepeatedPtrField<protobuf::BucketId>& dest, + const std::vector<document::BucketId>& src) +{ + assert(src.size() <= INT_MAX); + dest.Reserve(static_cast<int>(src.size())); + for (const auto& bucket_id : src) { + set_bucket_id(*dest.Add(), bucket_id); + } +} + +std::vector<document::BucketId> get_bucket_id_vector(const ::google::protobuf::RepeatedPtrField<protobuf::BucketId>& src) { + std::vector<document::BucketId> ids; + ids.reserve(src.size()); + for (const auto& proto_bucket : src) { + ids.emplace_back(proto_bucket.raw_id()); + } + return ids; +} + +void set_visitor_params(::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& dest, + const vdslib::Parameters& src) +{ + assert(src.size() <= INT_MAX); + dest.Reserve(static_cast<int>(src.size())); + for (const auto& kv : src) { + auto* proto_kv = dest.Add(); + proto_kv->set_key(kv.first.data(), kv.first.size()); + proto_kv->set_value(kv.second.data(), kv.second.size()); + } +} + +vdslib::Parameters get_visitor_params(const ::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& src) { + vdslib::Parameters params; + for (const auto& proto_kv : src) { + params.set(proto_kv.key(), proto_kv.value()); + } + return params; +} + +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_message_factory() { + return make_codec<CreateVisitorMessage, protobuf::CreateVisitorRequest>( + [](const CreateVisitorMessage& src, protobuf::CreateVisitorRequest& dest) { + dest.set_visitor_library_name(src.getLibraryName().data(), src.getLibraryName().size()); + dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size()); + dest.set_control_destination(src.getControlDestination().data(), src.getControlDestination().size()); + dest.set_data_destination(src.getDataDestination().data(), src.getDataDestination().size()); + set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection()); + dest.set_max_pending_reply_count(src.getMaximumPendingReplyCount()); + + set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace()); + set_bucket_id_vector(*dest.mutable_buckets(), src.getBuckets()); + + dest.set_from_timestamp(src.getFromTimestamp()); + dest.set_to_timestamp(src.getToTimestamp()); + dest.set_visit_tombstones(src.visitRemoves()); + set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet()); + dest.set_visit_inconsistent_buckets(src.visitInconsistentBuckets()); + dest.set_max_buckets_per_visitor(src.getMaxBucketsPerVisitor()); + + set_visitor_params(*dest.mutable_parameters(), src.getParameters()); + }, + [](const protobuf::CreateVisitorRequest& src) { + auto msg = std::make_unique<CreateVisitorMessage>(); + msg->setLibraryName(src.visitor_library_name()); + msg->setInstanceId(src.instance_id()); + msg->setControlDestination(src.control_destination()); + msg->setDataDestination(src.data_destination()); + msg->setDocumentSelection(get_raw_selection(src.selection())); + msg->setMaximumPendingReplyCount(src.max_pending_reply_count()); + msg->setBucketSpace(get_bucket_space(src.bucket_space())); + msg->setBuckets(get_bucket_id_vector(src.buckets())); + msg->setFromTimestamp(src.from_timestamp()); + msg->setToTimestamp(src.to_timestamp()); + msg->setVisitRemoves(src.visit_tombstones()); + msg->setFieldSet(get_raw_field_set(src.field_set())); + msg->setVisitInconsistentBuckets(src.visit_inconsistent_buckets()); + msg->setMaxBucketsPerVisitor(src.max_buckets_per_visitor()); + msg->setVisitorDispatcherVersion(50); // Hard-coded; same as for v6 serialization + msg->setParameters(get_visitor_params(src.parameters())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_reply_factory() { + return make_codec<CreateVisitorReply, protobuf::CreateVisitorResponse>( + [](const CreateVisitorReply& src, protobuf::CreateVisitorResponse& dest) { + set_bucket_id(*dest.mutable_last_bucket(), src.getLastBucket()); + const auto& vs = src.getVisitorStatistics(); + auto* stats = dest.mutable_statistics(); + stats->set_buckets_visited(vs.getBucketsVisited()); + stats->set_documents_visited(vs.getDocumentsVisited()); + stats->set_bytes_visited(vs.getBytesVisited()); + stats->set_documents_returned(vs.getDocumentsReturned()); + stats->set_bytes_returned(vs.getBytesReturned()); + }, + [](const protobuf::CreateVisitorResponse& src) { + auto reply = std::make_unique<CreateVisitorReply>(DocumentProtocol::REPLY_CREATEVISITOR); + reply->setLastBucket(get_bucket_id(src.last_bucket())); + const auto& vs = src.statistics(); + vdslib::VisitorStatistics stats; + stats.setBucketsVisited(vs.buckets_visited()); + stats.setDocumentsVisited(vs.documents_visited()); + stats.setBytesVisited(vs.bytes_visited()); + stats.setDocumentsReturned(vs.documents_returned()); + stats.setBytesReturned(vs.bytes_returned()); + reply->setVisitorStatistics(stats); + return reply; + } + ); +} + +// --------------------------------------------- +// DestroyVisitor request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_message_factory() { + return make_codec<DestroyVisitorMessage, protobuf::DestroyVisitorRequest>( + [](const DestroyVisitorMessage& src, protobuf::DestroyVisitorRequest& dest) { + dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size()); + }, + [](const protobuf::DestroyVisitorRequest& src) { + auto msg = std::make_unique<DestroyVisitorMessage>(); + msg->setInstanceId(src.instance_id()); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_reply_factory() { + return make_codec<VisitorReply, protobuf::DestroyVisitorResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DestroyVisitorResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::DestroyVisitorResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DESTROYVISITOR); + } + ); +} + +// --------------------------------------------- +// MapVisitor request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_message_factory() { + return make_codec<MapVisitorMessage, protobuf::MapVisitorRequest>( + [](const MapVisitorMessage& src, protobuf::MapVisitorRequest& dest) { + set_visitor_params(*dest.mutable_data(), src.getData()); + }, + [](const protobuf::MapVisitorRequest& src) { + auto msg = std::make_unique<MapVisitorMessage>(); + msg->setData(get_visitor_params(src.data())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_reply_factory() { + return make_codec<VisitorReply, protobuf::MapVisitorResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::MapVisitorResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::MapVisitorResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_MAPVISITOR); + } + ); +} + +// --------------------------------------------- +// QueryResult request and response +// --------------------------------------------- + +namespace { + +void set_search_result(protobuf::SearchResult& dest, const vdslib::SearchResult& src) { + // We treat these as opaque blobs for now. Should ideally be protobuf as well. + vespalib::GrowableByteBuffer buf; + src.serialize(buf); + assert(buf.position() <= INT_MAX); + dest.set_payload(buf.getBuffer(), buf.position()); +} + +void set_document_summary(protobuf::DocumentSummary& dest, const vdslib::DocumentSummary& src) { + // We treat these as opaque blobs for now. Should ideally be protobuf as well. + vespalib::GrowableByteBuffer buf; + src.serialize(buf); + assert(buf.position() <= INT_MAX); + dest.set_payload(buf.getBuffer(), buf.position()); +} + +document::ByteBuffer wrap_as_buffer(std::string_view buf) { + assert(buf.size() <= UINT32_MAX); + return {buf.data(), static_cast<uint32_t>(buf.size())}; +} + +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_message_factory() { + return make_codec<QueryResultMessage, protobuf::QueryResultRequest>( + [](const QueryResultMessage& src, protobuf::QueryResultRequest& dest) { + set_search_result(*dest.mutable_search_result(), src.getSearchResult()); + set_document_summary(*dest.mutable_document_summary(), src.getDocumentSummary()); + }, + [](const protobuf::QueryResultRequest& src) { + auto msg = std::make_unique<QueryResultMessage>(); + // Explicitly enforce presence of result/summary fields, as our object is not necessarily + // well-defined if these have not been initialized. + if (!src.has_search_result() || !src.has_document_summary()) { + throw document::DeserializeException("Query result does not have all required fields set", VESPA_STRLOC); + } + { + auto buf_view = wrap_as_buffer(src.search_result().payload()); // Must be lvalue + msg->getSearchResult().deserialize(buf_view); + } + { + auto buf_view = wrap_as_buffer(src.document_summary().payload()); // Also lvalue + msg->getDocumentSummary().deserialize(buf_view); + } + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_reply_factory() { + return make_codec<VisitorReply, protobuf::QueryResultResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::QueryResultResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::QueryResultResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_QUERYRESULT); + } + ); +} + +// --------------------------------------------- +// VisitorInfo request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_message_factory() { + return make_codec<VisitorInfoMessage, protobuf::VisitorInfoRequest>( + [](const VisitorInfoMessage& src, protobuf::VisitorInfoRequest& dest) { + set_bucket_id_vector(*dest.mutable_finished_buckets(), src.getFinishedBuckets()); + dest.set_error_message(src.getErrorMessage()); + }, + [](const protobuf::VisitorInfoRequest& src) { + auto msg = std::make_unique<VisitorInfoMessage>(); + msg->setFinishedBuckets(get_bucket_id_vector(src.finished_buckets())); + msg->setErrorMessage(src.error_message()); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_reply_factory() { + return make_codec<VisitorReply, protobuf::VisitorInfoResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::VisitorInfoResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::VisitorInfoResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_VISITORINFO); + } + ); +} + +// --------------------------------------------- +// DocumentList request and response +// TODO deprecate +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> +RoutableFactories80::document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) { + return make_codec<DocumentListMessage, protobuf::DocumentListRequest>( + [](const DocumentListMessage& src, protobuf::DocumentListRequest& dest) { + set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId()); + for (const auto& doc : src.getDocuments()) { + auto* proto_entry = dest.add_entries(); + proto_entry->set_timestamp(doc.getTimestamp()); + proto_entry->set_is_tombstone(doc.isRemoveEntry()); + set_document(*proto_entry->mutable_document(), *doc.getDocument()); + } + }, + [type_repo = std::move(repo)](const protobuf::DocumentListRequest& src) { + auto msg = std::make_unique<DocumentListMessage>(); + msg->setBucketId(get_bucket_id(src.bucket_id())); + for (const auto& proto_entry : src.entries()) { + auto doc = get_document_or_throw(proto_entry.document(), *type_repo); + msg->getDocuments().emplace_back(proto_entry.timestamp(), std::move(doc), proto_entry.is_tombstone()); + } + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::document_list_reply_factory() { + return make_codec<VisitorReply, protobuf::DocumentListResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DocumentListResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::DocumentListResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTLIST); + } + ); +} + +// --------------------------------------------- +// EmptyBuckets request and response +// TODO this should be deprecated +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_message_factory() { + return make_codec<EmptyBucketsMessage, protobuf::EmptyBucketsRequest>( + [](const EmptyBucketsMessage& src, protobuf::EmptyBucketsRequest& dest) { + set_bucket_id_vector(*dest.mutable_bucket_ids(), src.getBucketIds()); + }, + [](const protobuf::EmptyBucketsRequest& src) { + auto msg = std::make_unique<EmptyBucketsMessage>(); + msg->setBucketIds(get_bucket_id_vector(src.bucket_ids())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_reply_factory() { + return make_codec<VisitorReply, protobuf::EmptyBucketsResponse>( + []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::EmptyBucketsResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::EmptyBucketsResponse& src) { + return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_EMPTYBUCKETS); // ugh + } + ); +} + +// --------------------------------------------- +// GetBucketList request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_message_factory() { + return make_codec<GetBucketListMessage, protobuf::GetBucketListRequest>( + [](const GetBucketListMessage& src, protobuf::GetBucketListRequest& dest) { + set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId()); + set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace()); + }, + [](const protobuf::GetBucketListRequest& src) { + auto msg = std::make_unique<GetBucketListMessage>(get_bucket_id(src.bucket_id())); + msg->setBucketSpace(get_bucket_space(src.bucket_space())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_reply_factory() { + return make_codec<GetBucketListReply, protobuf::GetBucketListResponse>( + [](const GetBucketListReply& src, protobuf::GetBucketListResponse& dest) { + auto* proto_info = dest.mutable_bucket_info(); + assert(src.getBuckets().size() <= INT_MAX); + proto_info->Reserve(static_cast<int>(src.getBuckets().size())); + for (const auto& info : src.getBuckets()) { + auto* entry = proto_info->Add(); + set_bucket_id(*entry->mutable_bucket_id(), info._bucket); + entry->set_info(info._bucketInformation.data(), info._bucketInformation.size()); + } + }, + [](const protobuf::GetBucketListResponse& src) { + auto reply = std::make_unique<GetBucketListReply>(); + reply->getBuckets().reserve(src.bucket_info_size()); + for (const auto& proto_info : src.bucket_info()) { + GetBucketListReply::BucketInfo info; + info._bucket = get_bucket_id(proto_info.bucket_id()); + info._bucketInformation = proto_info.info(); + reply->getBuckets().emplace_back(std::move(info)); + } + return reply; + } + ); +} + +// --------------------------------------------- +// GetBucketState request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_message_factory() { + return make_codec<GetBucketStateMessage, protobuf::GetBucketStateRequest>( + [](const GetBucketStateMessage& src, protobuf::GetBucketStateRequest& dest) { + // FIXME misses bucket space, but does not seem to be in use? + set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId()); + }, + [](const protobuf::GetBucketStateRequest& src) { + return std::make_unique<GetBucketStateMessage>(get_bucket_id(src.bucket_id())); + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_reply_factory() { + return make_codec<GetBucketStateReply, protobuf::GetBucketStateResponse>( + [](const GetBucketStateReply& src, protobuf::GetBucketStateResponse& dest) { + assert(src.getBucketState().size() <= INT_MAX); + auto* proto_states = dest.mutable_states(); + proto_states->Reserve(static_cast<int>(src.getBucketState().size())); + for (const auto& state : src.getBucketState()) { + auto* ps = proto_states->Add(); + if (state.getDocumentId()) { + set_document_id(*ps->mutable_document_id(), *state.getDocumentId()); + } else { + set_global_id(*ps->mutable_global_id(), state.getGlobalId()); + } + ps->set_timestamp(state.getTimestamp()); + ps->set_is_tombstone(state.isRemoveEntry()); + } + }, + [](const protobuf::GetBucketStateResponse& src) { + auto reply = std::make_unique<GetBucketStateReply>(); + reply->getBucketState().reserve(src.states_size()); + for (const auto& proto_state : src.states()) { + if (proto_state.has_document_id()) { + reply->getBucketState().emplace_back(get_document_id(proto_state.document_id()), + proto_state.timestamp(), proto_state.is_tombstone()); + } else { + reply->getBucketState().emplace_back(get_global_id(proto_state.global_id()), + proto_state.timestamp(), proto_state.is_tombstone()); + } + } + return reply; + } + ); +} + +// --------------------------------------------- +// StatBucket request and response +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_message_factory() { + return make_codec<StatBucketMessage, protobuf::StatBucketRequest>( + [](const StatBucketMessage& src, protobuf::StatBucketRequest& dest) { + set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId()); + set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection()); + set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace()); + }, + [](const protobuf::StatBucketRequest& src) { + auto msg = std::make_unique<StatBucketMessage>(); + msg->setBucketId(get_bucket_id(src.bucket_id())); + msg->setDocumentSelection(get_raw_selection(src.selection())); + msg->setBucketSpace(get_bucket_space(src.bucket_space())); + return msg; + } + ); +} + +std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_reply_factory() { + return make_codec<StatBucketReply, protobuf::StatBucketResponse>( + [](const StatBucketReply& src, protobuf::StatBucketResponse& dest) { + dest.set_results(src.getResults()); + }, + [](const protobuf::StatBucketResponse& src) { + auto reply = std::make_unique<StatBucketReply>(); + reply->setResults(src.results()); + return reply; + } + ); +} + +// --------------------------------------------- +// WrongDistribution response (no request type) +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::wrong_distribution_reply_factory() { + return make_codec<WrongDistributionReply, protobuf::WrongDistributionResponse>( + [](const WrongDistributionReply& src, protobuf::WrongDistributionResponse& dest) { + dest.mutable_cluster_state()->set_state_string(src.getSystemState()); + }, + [](const protobuf::WrongDistributionResponse& src) { + auto reply = std::make_unique<WrongDistributionReply>(); + reply->setSystemState(src.cluster_state().state_string()); + return reply; + } + ); +} + +// --------------------------------------------- +// DocumentIgnored response (no request type) +// --------------------------------------------- + +std::shared_ptr<IRoutableFactory> RoutableFactories80::document_ignored_reply_factory() { + return make_codec<DocumentIgnoredReply, protobuf::DocumentIgnoredResponse>( + []([[maybe_unused]] const DocumentIgnoredReply& src, [[maybe_unused]] protobuf::DocumentIgnoredResponse& dest) { + // no-op + }, + []([[maybe_unused]] const protobuf::DocumentIgnoredResponse& src) { + return std::make_unique<DocumentIgnoredReply>(); + } + ); +} + +} // documentapi::messagebus diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h new file mode 100644 index 00000000000..76da2f7dc9f --- /dev/null +++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h @@ -0,0 +1,72 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "iroutablefactory.h" + +namespace document { class DocumentTypeRepo; } + +namespace documentapi::messagebus { + +class RoutableFactories80 { +public: + RoutableFactories80() = delete; + + // CRUD messages + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_reply_factory(); + + // Visitor-related messages + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_reply_factory(); + + // Inspection-related messages + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_message_factory(); + [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_reply_factory(); + + // Polymorphic reply messages + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> wrong_distribution_reply_factory(); + + [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_ignored_reply_factory(); +}; + +} diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp index 54774f142ba..3e1ae07f7ca 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp @@ -14,11 +14,13 @@ RoutableRepository::VersionMap::VersionMap() : _factoryVersions() { } +RoutableRepository::VersionMap::~VersionMap() = default; + bool RoutableRepository::VersionMap::putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory) { bool ret = _factoryVersions.find(version) != _factoryVersions.end(); - _factoryVersions[version] = factory; + _factoryVersions[version] = std::move(factory); return ret; } @@ -28,14 +30,14 @@ RoutableRepository::VersionMap::getFactory(const vespalib::Version &version) con { const vespalib::VersionSpecification versionSpec{version.getMajor(), version.getMinor(), version.getMicro()}; - std::vector< std::pair<vespalib::VersionSpecification, IRoutableFactory::SP> > candidates; - for (auto & entry : _factoryVersions) { + std::vector<std::pair<vespalib::VersionSpecification, IRoutableFactory::SP>> candidates; + for (const auto & entry : _factoryVersions) { if (entry.first.compareTo(versionSpec) <= 0) { - candidates.push_back(std::make_pair(entry.first, entry.second)); + candidates.emplace_back(entry.first, entry.second); } } if (candidates.empty()) { - return IRoutableFactory::SP(); + return {}; } return std::max_element(candidates.begin(), candidates.end(), @@ -54,7 +56,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data) { if (data.size() == 0) { LOG(error, "Received empty byte array for deserialization."); - return mbus::Routable::UP(); + return {}; } document::ByteBuffer in(data.data(), data.size()); @@ -64,7 +66,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data) if (!factory) { LOG(error, "No routable factory found for routable type %d (version %s).", type, version.toString().c_str()); - return mbus::Routable::UP(); + return {}; } mbus::Routable::UP ret = factory->decode(in); if (!ret) { @@ -74,7 +76,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data) std::ostringstream ost; document::StringUtil::printAsHex(ost, data.data(), data.size()); LOG(error, "%s", ost.str().c_str()); - return mbus::Routable::UP(); + return {}; } return ret; } @@ -107,7 +109,7 @@ RoutableRepository::putFactory(const vespalib::VersionSpecification &version, uint32_t type, IRoutableFactory::SP factory) { std::lock_guard guard(_lock); - if (_factoryTypes[type].putFactory(version, factory)) { + if (_factoryTypes[type].putFactory(version, std::move(factory))) { _cache.clear(); } } @@ -117,17 +119,17 @@ RoutableRepository::getFactory(const vespalib::Version &version, uint32_t type) { std::lock_guard guard(_lock); CacheKey cacheKey(version, type); - FactoryCache::const_iterator cit = _cache.find(cacheKey); + auto cit = _cache.find(cacheKey); if (cit != _cache.end()) { return cit->second; } - TypeMap::const_iterator vit = _factoryTypes.find(type); + auto vit = _factoryTypes.find(type); if (vit == _factoryTypes.end()) { - return IRoutableFactory::SP(); + return {}; } - IRoutableFactory::SP factory = vit->second.getFactory(version); + auto factory = vit->second.getFactory(version); if (!factory) { - return IRoutableFactory::SP(); + return {}; } _cache[cacheKey] = factory; return factory; diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h index 5060d1b3817..62b5103d8f5 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h +++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h @@ -27,8 +27,9 @@ private: public: VersionMap(); - bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory); - IRoutableFactory::SP getFactory(const vespalib::Version &version) const; + ~VersionMap(); + [[nodiscard]] bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory); + [[nodiscard]] IRoutableFactory::SP getFactory(const vespalib::Version &version) const; }; using CacheKey = std::pair<vespalib::Version, uint32_t>; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 0373609e806..3ab62542ace 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -2,7 +2,6 @@ package com.yahoo.messagebus.network.rpc; import com.yahoo.component.Version; -import com.yahoo.component.Vtag; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.jrt.Acceptor; import com.yahoo.jrt.ListenFailedException; @@ -301,15 +300,34 @@ public class RPCNetwork implements Network, MethodHandler { return false; } + private static Version deriveSupportedProtocolVersion() { + // This is a very leaky abstraction, but since MessageBus only exchanges versions + // (and not a set of supported protocols), we have to do this workaround. + // Disallow-version MUST be lower than that used as a protocol lower bound in + // DocumentProtocol.java and the exact same as that used in C++ for the same purposes. + // ... Or else! + // TODO remove this glorious hack once protobuf protocol is enabled by default + var maybeEnvVal = System.getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF"); + if ("true".equals(maybeEnvVal) || "yes".equals(maybeEnvVal)) { + return new Version(8, 304); // _Allows_ new protobuf protocol + } + return new Version(8, 303); // _Disallows_ new protobuf protocol + } + + private static final Version REPORTED_VERSION = deriveSupportedProtocolVersion(); + /** - * Returns the version of this network. This gets called when the "mbus.getVersion" method is invoked on this - * network, and is separated into its own function so that unit tests can override it to simulate other versions - * than current. + * Returns the (protocol) version of this network. This gets called when the "mbus.getVersion" method is invoked + * on this network, and is separated into its own function so that unit tests can override it to simulate other + * versions than current. + * + * Note that this version reflects the highest supported <em>protocol</em> version, and is not necessarily + * 1-1 with the actual Vespa release version of the underlying binary. * * @return the version to claim to be */ protected Version getVersion() { - return Vtag.currentVersion; + return REPORTED_VERSION; } /** diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index cacd18430a7..f626e2c325b 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -18,6 +18,8 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> +#include <cstdlib> +#include <string_view> #include <thread> #include <vespa/log/log.h> @@ -25,6 +27,7 @@ LOG_SETUP(".rpcnetwork"); using vespalib::make_string; using namespace std::chrono_literals; +using namespace std::string_view_literals; namespace mbus { @@ -148,10 +151,26 @@ RPCNetwork::flushTargetPool() _targetPool->flushTargets(true); } +namespace { + +[[nodiscard]] vespalib::Version derive_supported_protocol_version() { + // TODO remove this hilariously leaky abstraction once protobuf protocol is the default :D + // Disallow-version MUST be lower than that used as a protocol lower bound in documentprotocol.cpp + // and the exact same as that used in Java for the same purposes. Or else! + const char* maybe_env_val = getenv("VESPA_MBUS_DOCUMENTAPI_USE_PROTOBUF"); + if (maybe_env_val && (("true"sv == maybe_env_val) || ("yes"sv == maybe_env_val))) { + return {8, 304}; // _Allows_ new protobuf protocol + } + return {8, 303}; // _Disallows_ new protobuf protocol +} + +} + const vespalib::Version & RPCNetwork::getVersion() const { - return vespalib::Vtag::currentVersion; + static vespalib::Version reported_version = derive_supported_protocol_version(); + return reported_version; } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 05ccaecb2c5..40590d4545f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -98,11 +98,15 @@ private: protected: /** - * Returns the version of this network. This gets called when the + * Returns the (protocol) version of this network. This gets called when the * "mbus.getVersion" method is invoked on this network, and is separated * into its own function so that unit tests can override it to simulate * other versions than current. * + * Note that this version reflects the highest supported protocol version, and + * is not necessarily 1-1 with the actual Vespa release version of the + * underlying binary. + * * @return The version to claim to be. */ virtual const vespalib::Version &getVersion() const; diff --git a/vdslib/src/vespa/vdslib/container/parameters.cpp b/vdslib/src/vespa/vdslib/container/parameters.cpp index 236b4970396..b5fbe96566d 100644 --- a/vdslib/src/vespa/vdslib/container/parameters.cpp +++ b/vdslib/src/vespa/vdslib/container/parameters.cpp @@ -19,6 +19,12 @@ Parameters::Parameters(document::ByteBuffer& buffer) deserialize(buffer); } +Parameters::Parameters(const Parameters&) = default; +Parameters& Parameters::operator=(const Parameters&) = default; + +Parameters::Parameters(Parameters&&) noexcept = default; +Parameters& Parameters::operator=(Parameters&&) noexcept = default; + Parameters::~Parameters() = default; size_t Parameters::getSerializedSize() const diff --git a/vdslib/src/vespa/vdslib/container/parameters.h b/vdslib/src/vespa/vdslib/container/parameters.h index d28e2cd9890..60ae3028719 100644 --- a/vdslib/src/vespa/vdslib/container/parameters.h +++ b/vdslib/src/vespa/vdslib/container/parameters.h @@ -46,6 +46,11 @@ public: explicit Parameters(document::ByteBuffer& buffer); ~Parameters() override; + Parameters(const Parameters&); + Parameters& operator=(const Parameters&); + Parameters(Parameters&&) noexcept; + Parameters& operator=(Parameters&&) noexcept; + bool operator==(const Parameters &other) const; size_t getSerializedSize() const; |