aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-02-01 15:25:45 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-02-16 13:42:49 +0000
commite0195ce27f47717ad5ba59ea59ab027de31d703f (patch)
tree44a13aa38fcbf95b23df91e8051c1d2b8bb2f688 /documentapi/src/vespa
parent42b1512d4913778dde06ebe0b1a08257ead3155a (diff)
Add new Protobuf-based MessageBus DocumentAPI protocol
This adds an entirely new implementation of the internal MessageBus DocumentAPI protocol, which shall be functionally 1-to-1 compatible with the existing legacy protocol. New protobuf schemas have been added to the top-level documentapi module, which are separated into different domains of responsibility: * CRUD messages * Visiting messages * Data inspection messages As well as a schema for shared, common message types. Both C++ and Java protocol implementations separate serialization and deserialization into a codec abstraction per message type, which hides the boilerplate required for Protobuf buffer management. The Java version is a tad more verbose due to generics type-erasure. This protocol does _not_ currently support lazy (de-)serialization in Java, as the existing mechanisms for doing so are inherently tied to the legacy protocol version. Performance tests will decide if we need to introduce such functionality to the new protocol version. To avoid having the new protocol go live in production, this commit changes the semantics of how MessageBus version reporting works (at least for the near future); instead of reporting the current Vespa _release_ version, it reports the highest supported _protocol_ version. This lets us conditionally enable the new protocol by reporting a MessageBus version greater than or equal to the protocol version _iff_ the protocol should be active. The new protocol is disabled by default. Other changes: * Protocol tests have been moved up one package directory level to be aligned with the actual package of the classes they test. This allows for using package-protected constructors in the serialization tests. * `DocumentDeserializer` now exposes the underlying document type repo/manager. This is done to detangle `Document`/`DocumentUpdate` deserialization from the underlying wire buffer management. * `RemoveLocationMessage` at long last contains a bucket space, which was forgotten when we initially added this concept to the other messages, and where the pain of adding it in later was too big (not so anymore!). Unit tests for both C++ and Java have been hoisted from the legacy test suite, cleaned up and extended with additional cases. The C++ tests use the old unit test kit and should receive a good follow-up washing and GTest-rewrite. **Important**: due to how MessageBus protocol versioning works, the final protocol version is _not_ yet decided, as setting it requires syncing against our build systems. A follow-up commit will assign the final version as well as include all required binary test files.
Diffstat (limited to 'documentapi/src/vespa')
-rw-r--r--documentapi/src/vespa/documentapi/CMakeLists.txt2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/.gitignore2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt23
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp134
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.h4
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h17
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h10
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp7
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h8
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/messages/visitor.h14
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp883
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h72
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp30
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routablerepository.h5
16 files changed, 1127 insertions, 88 deletions
diff --git a/documentapi/src/vespa/documentapi/CMakeLists.txt b/documentapi/src/vespa/documentapi/CMakeLists.txt
index a3a2815cc4f..1d0b3784a9d 100644
--- a/documentapi/src/vespa/documentapi/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/CMakeLists.txt
@@ -7,3 +7,5 @@ vespa_add_library(documentapi
INSTALL lib64
DEPENDS
)
+
+vespa_add_target_package_dependency(documentapi Protobuf)
diff --git a/documentapi/src/vespa/documentapi/messagebus/.gitignore b/documentapi/src/vespa/documentapi/messagebus/.gitignore
index d58390943e2..488f2e6355d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/.gitignore
+++ b/documentapi/src/vespa/documentapi/messagebus/.gitignore
@@ -2,3 +2,5 @@
Makefile
config-*.cpp
config-*.h
+*.pb.cc
+*.pb.h
diff --git a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
index c3198ba7b2b..d59fd56037d 100644
--- a/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/CMakeLists.txt
@@ -1,12 +1,35 @@
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+
+# .proto files are in a higher-level directory as they are shared across languages
+set(documentapi_messagebus_PROTOBUF_REL_PATH "../../../protobuf")
+PROTOBUF_GENERATE_CPP(documentapi_messagebus_PROTOBUF_SRCS documentapi_messagebus_PROTOBUF_HDRS
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_common.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_feed.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_inspect.proto"
+ "${documentapi_messagebus_PROTOBUF_REL_PATH}/docapi_visiting.proto")
+
+vespa_add_source_target(protobufgen_documentapi_messagebus DEPENDS
+ ${documentapi_messagebus_PROTOBUF_SRCS}
+ ${documentapi_messagebus_PROTOBUF_HDRS})
+
+vespa_suppress_warnings_for_protobuf_sources(SOURCES ${documentapi_messagebus_PROTOBUF_SRCS})
+
+# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
+# the header file grows over a certain size.
+set_source_files_properties(routable_factories_8.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
+
vespa_add_library(documentapi_documentapimessagebus OBJECT
SOURCES
documentprotocol.cpp
replymerger.cpp
+ routable_factories_8.cpp
routablefactories60.cpp
routablerepository.cpp
routingpolicyfactories.cpp
routingpolicyrepository.cpp
+ ${documentapi_messagebus_PROTOBUF_SRCS}
DEPENDS
documentapi_documentapipolicies
)
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index f16f63029ee..f53594c6d32 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -1,16 +1,17 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "replymerger.h"
+#include "routable_factories_8.h"
#include "routablefactories60.h"
-#include "routingpolicyfactories.h"
#include "routablerepository.h"
+#include "routingpolicyfactories.h"
#include "routingpolicyrepository.h"
-#include "replymerger.h"
#include <vespa/document/util/stringutil.h>
#include <vespa/documentapi/documentapi.h>
-#include <vespa/vespalib/util/exceptions.h>
#include <vespa/messagebus/error.h>
-#include <sstream>
+#include <vespa/vespalib/util/exceptions.h>
#include <cassert>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".documentprotocol");
@@ -40,47 +41,98 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo,
putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>());
putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>());
+ add_legacy_v6_factories();
+ add_v8_factories();
+}
+
+DocumentProtocol::~DocumentProtocol() = default;
+
+void
+DocumentProtocol::add_legacy_v6_factories()
+{
// Prepare version specifications to use when adding routable factories.
vespalib::VersionSpecification version6(6, 221);
-
std::vector<vespalib::VersionSpecification> from6 = { version6 };
// Add 6.x serialization
- putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
- putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
- putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
- putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
- putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
- putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
- putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
- putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
- putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
- putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
- putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
- putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
+ putRoutableFactory(MESSAGE_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketMessageFactory>(), from6);
+ putRoutableFactory(MESSAGE_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentMessageFactory>(*_repo), from6);
+ putRoutableFactory(MESSAGE_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoMessageFactory>(), from6);
+ putRoutableFactory(REPLY_CREATEVISITOR, std::make_shared<RoutableFactories60::CreateVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DESTROYVISITOR, std::make_shared<RoutableFactories60::DestroyVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTIGNORED, std::make_shared<RoutableFactories60::DocumentIgnoredReplyFactory>(), from6);
+ putRoutableFactory(REPLY_DOCUMENTLIST, std::make_shared<RoutableFactories60::DocumentListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_EMPTYBUCKETS, std::make_shared<RoutableFactories60::EmptyBucketsReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETLIST, std::make_shared<RoutableFactories60::GetBucketListReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETBUCKETSTATE, std::make_shared<RoutableFactories60::GetBucketStateReplyFactory>(), from6);
+ putRoutableFactory(REPLY_GETDOCUMENT, std::make_shared<RoutableFactories60::GetDocumentReplyFactory>(*_repo), from6);
+ putRoutableFactory(REPLY_MAPVISITOR, std::make_shared<RoutableFactories60::MapVisitorReplyFactory>(), from6);
+ putRoutableFactory(REPLY_PUTDOCUMENT, std::make_shared<RoutableFactories60::PutDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_QUERYRESULT, std::make_shared<RoutableFactories60::QueryResultReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVEDOCUMENT, std::make_shared<RoutableFactories60::RemoveDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_REMOVELOCATION, std::make_shared<RoutableFactories60::RemoveLocationReplyFactory>(), from6);
+ putRoutableFactory(REPLY_STATBUCKET, std::make_shared<RoutableFactories60::StatBucketReplyFactory>(), from6);
+ putRoutableFactory(REPLY_UPDATEDOCUMENT, std::make_shared<RoutableFactories60::UpdateDocumentReplyFactory>(), from6);
+ putRoutableFactory(REPLY_VISITORINFO, std::make_shared<RoutableFactories60::VisitorInfoReplyFactory>(), from6);
putRoutableFactory(REPLY_WRONGDISTRIBUTION, std::make_shared<RoutableFactories60::WrongDistributionReplyFactory>(), from6);
}
-DocumentProtocol::~DocumentProtocol() = default;
+void
+DocumentProtocol::add_v8_factories()
+{
+ vespalib::VersionSpecification version8(8, 304);
+ std::vector<vespalib::VersionSpecification> from8 = { version8 };
+
+ using RF8 = messagebus::RoutableFactories80;
+ auto put_v8_factory = [&](auto msg_id, auto factory) {
+ putRoutableFactory(msg_id, std::move(factory), from8);
+ };
+
+ put_v8_factory(MESSAGE_CREATEVISITOR, RF8::create_visitor_message_factory());
+ put_v8_factory(MESSAGE_DESTROYVISITOR, RF8::destroy_visitor_message_factory());
+ put_v8_factory(MESSAGE_DOCUMENTLIST, RF8::document_list_message_factory(_repo));
+ put_v8_factory(MESSAGE_EMPTYBUCKETS, RF8::empty_buckets_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETLIST, RF8::get_bucket_list_message_factory());
+ put_v8_factory(MESSAGE_GETBUCKETSTATE, RF8::get_bucket_state_message_factory());
+ put_v8_factory(MESSAGE_GETDOCUMENT, RF8::get_document_message_factory());
+ put_v8_factory(MESSAGE_MAPVISITOR, RF8::map_visitor_message_factory());
+ put_v8_factory(MESSAGE_PUTDOCUMENT, RF8::put_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_QUERYRESULT, RF8::query_result_message_factory());
+ put_v8_factory(MESSAGE_REMOVEDOCUMENT, RF8::remove_document_message_factory());
+ put_v8_factory(MESSAGE_REMOVELOCATION, RF8::remove_location_message_factory(_repo));
+ put_v8_factory(MESSAGE_STATBUCKET, RF8::stat_bucket_message_factory());
+ put_v8_factory(MESSAGE_UPDATEDOCUMENT, RF8::update_document_message_factory(_repo));
+ put_v8_factory(MESSAGE_VISITORINFO, RF8::visitor_info_message_factory());
+ put_v8_factory(REPLY_CREATEVISITOR, RF8::create_visitor_reply_factory());
+ put_v8_factory(REPLY_DESTROYVISITOR, RF8::destroy_visitor_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTIGNORED, RF8::document_ignored_reply_factory());
+ put_v8_factory(REPLY_DOCUMENTLIST, RF8::document_list_reply_factory());
+ put_v8_factory(REPLY_EMPTYBUCKETS, RF8::empty_buckets_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETLIST, RF8::get_bucket_list_reply_factory());
+ put_v8_factory(REPLY_GETBUCKETSTATE, RF8::get_bucket_state_reply_factory());
+ put_v8_factory(REPLY_GETDOCUMENT, RF8::get_document_reply_factory(_repo));
+ put_v8_factory(REPLY_MAPVISITOR, RF8::map_visitor_reply_factory());
+ put_v8_factory(REPLY_PUTDOCUMENT, RF8::put_document_reply_factory());
+ put_v8_factory(REPLY_QUERYRESULT, RF8::query_result_reply_factory());
+ put_v8_factory(REPLY_REMOVEDOCUMENT, RF8::remove_document_reply_factory());
+ put_v8_factory(REPLY_REMOVELOCATION, RF8::remove_location_reply_factory());
+ put_v8_factory(REPLY_STATBUCKET, RF8::stat_bucket_reply_factory());
+ put_v8_factory(REPLY_UPDATEDOCUMENT, RF8::update_document_reply_factory());
+ put_v8_factory(REPLY_VISITORINFO, RF8::visitor_info_reply_factory());
+ put_v8_factory(REPLY_WRONGDISTRIBUTION, RF8::wrong_distribution_reply_factory());
+}
mbus::IRoutingPolicy::UP
DocumentProtocol::createPolicy(const mbus::string &name, const mbus::string &param) const
@@ -99,13 +151,9 @@ mbus::Blob
DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable &routable) const
{
mbus::Blob blob(_routableRepository->encode(version, routable));
- // When valgrind reports errors of uninitialized data being written to
- // the network, it is useful to be able to see the serialized data to
- // try to identify what bits are uninitialized.
if (LOG_WOULD_LOG(spam)) {
std::ostringstream message;
- document::StringUtil::printAsHex(
- message, blob.data(), blob.size());
+ document::StringUtil::printAsHex(message, blob.data(), blob.size());
LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s",
routable.getProtocol().c_str(), routable.getType(),
version.toString().c_str(), message.str().c_str());
@@ -120,7 +168,7 @@ DocumentProtocol::decode(const vespalib::Version &version, mbus::BlobRef data) c
return _routableRepository->decode(version, data);
} catch (vespalib::Exception &e) {
LOG(warning, "%s", e.getMessage().c_str());
- return mbus::Routable::UP();
+ return {};
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
index c771e86031d..b9658750ada 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
@@ -294,6 +294,10 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string &param) const override;
mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override;
mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override;
+
+private:
+ void add_legacy_v6_factories();
+ void add_v8_factories();
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
index bd2f245ad15..12c6a50fa24 100644
--- a/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
+++ b/documentapi/src/vespa/documentapi/messagebus/iroutablefactory.h
@@ -22,18 +22,13 @@ class IRoutableFactory {
protected:
IRoutableFactory() = default;
public:
- /**
- * Convenience typedefs.
- */
using UP = std::unique_ptr<IRoutableFactory>;
using SP = std::shared_ptr<IRoutableFactory>;
IRoutableFactory(const IRoutableFactory &) = delete;
IRoutableFactory & operator = (const IRoutableFactory &) = delete;
- /**
- * Virtual destructor required for inheritance.
- */
- virtual ~IRoutableFactory() { }
+
+ virtual ~IRoutableFactory() = default;
/**
* This method encodes the content of the given routable into a byte buffer that can later be decoded
@@ -45,11 +40,11 @@ public:
* @param out The buffer to write into.
* @return True if the routable could be encoded.
*/
- virtual bool encode(const mbus::Routable &obj,
- vespalib::GrowableByteBuffer &out) const = 0;
+ [[nodiscard]] virtual bool encode(const mbus::Routable &obj,
+ vespalib::GrowableByteBuffer &out) const = 0;
/**
- * This method decodes the given byte bufer to a routable.
+ * This method decodes the given byte buffer to a routable.
*
* This method is NOT exception safe. Return null to signal failure.
*
@@ -57,7 +52,7 @@ public:
* @param loadTypes The set of configured load types.
* @return The decoded routable.
*/
- virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
+ [[nodiscard]] virtual mbus::Routable::UP decode(document::ByteBuffer &in) const = 0;
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
index 3d713c95da8..c47c2421fcb 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/documentmessage.h
@@ -18,7 +18,7 @@ protected:
*
* @return A document reply that corresponds to this message.
*/
- virtual DocumentReply::UP doCreateReply() const = 0;
+ [[nodiscard]] virtual DocumentReply::UP doCreateReply() const = 0;
public:
/**
@@ -37,16 +37,16 @@ public:
*
* @return The created reply.
*/
- std::unique_ptr<mbus::Reply> createReply() const;
+ [[nodiscard]] std::unique_ptr<mbus::Reply> createReply() const;
/**
* Returns the priority of this message.
*
* @return The priority.
*/
- Priority::Value getPriority() const { return _priority; };
+ [[nodiscard]] Priority::Value getPriority() const { return _priority; };
- uint8_t priority() const override { return (uint8_t)_priority; }
+ [[nodiscard]] uint8_t priority() const override { return (uint8_t)_priority; }
/**
* Sets the priority tag for this message.
@@ -55,7 +55,7 @@ public:
*/
void setPriority(Priority::Value p) { _priority = p; };
- uint32_t getApproxSize() const override;
+ [[nodiscard]] uint32_t getApproxSize() const override;
void setApproxSize(uint32_t approxSize) {
_approxSize = approxSize;
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
index 755e523c065..2c5833beb20 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.cpp
@@ -14,13 +14,12 @@ EmptyBucketsMessage::EmptyBucketsMessage(const std::vector<document::BucketId> &
{
}
-EmptyBucketsMessage::~EmptyBucketsMessage() {
-}
+EmptyBucketsMessage::~EmptyBucketsMessage() = default;
void
-EmptyBucketsMessage::setBucketIds(const std::vector<document::BucketId> &bucketIds)
+EmptyBucketsMessage::setBucketIds(std::vector<document::BucketId> bucketIds)
{
- _bucketIds = bucketIds;
+ _bucketIds = std::move(bucketIds);
}
void
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
index 7cecd1e1a2b..7078efc778c 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/emptybucketsmessage.h
@@ -27,7 +27,7 @@ public:
std::vector<document::BucketId> &getBucketIds() { return _bucketIds; }
const std::vector<document::BucketId> &getBucketIds() const { return _bucketIds; }
- void setBucketIds(const std::vector<document::BucketId> &bucketIds);
+ void setBucketIds(std::vector<document::BucketId> bucketIds);
void resize(uint32_t size);
uint32_t getType() const override;
string toString() const override { return "emptybucketsmessage"; }
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
index dbab28e3172..1ebc867156e 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/putdocumentmessage.h
@@ -30,8 +30,8 @@ public:
*
* @param document The document to put.
*/
- PutDocumentMessage(DocumentSP document);
- ~PutDocumentMessage();
+ explicit PutDocumentMessage(DocumentSP document);
+ ~PutDocumentMessage() override;
/**
* Returns the document to put.
@@ -39,7 +39,7 @@ public:
* @return The document.
*/
const DocumentSP & getDocumentSP() const { return _document; }
- DocumentSP stealDocument() { return std::move(_document); }
+ [[nodiscard]] DocumentSP stealDocument() { return std::move(_document); }
const document::Document & getDocument() const { return *_document; }
/**
@@ -68,7 +68,7 @@ public:
string toString() const override { return "putdocumentmessage"; }
void set_create_if_non_existent(bool value) noexcept { _create_if_non_existent = value; }
- bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
+ [[nodiscard]] bool get_create_if_non_existent() const noexcept { return _create_if_non_existent; }
};
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
index 87c3456d62c..4608350f8a6 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/removelocationmessage.h
@@ -10,7 +10,7 @@ namespace document { class BucketIdFactory; }
namespace documentapi {
/**
- * Message (VDS only) to remove an entire location for users using user or group schemes for their documents.
+ * Message to remove an entire location for users using user or group schemes for their documents.
* A location in this context is either a user id or a group name.
*/
class RemoveLocationMessage : public DocumentMessage {
diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
index cd0cb5e1d3a..da3219d999f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
+++ b/documentapi/src/vespa/documentapi/messagebus/messages/visitor.h
@@ -74,12 +74,16 @@ public:
const vdslib::Parameters& getParameters() const { return _params; }
vdslib::Parameters& getParameters() { return _params; }
void setParameters(const vdslib::Parameters& params) { _params = params; }
+ void setParameters(vdslib::Parameters&& params) noexcept { _params = std::move(params); }
uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
std::vector<document::BucketId>& getBuckets() { return _buckets; }
+ void setBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _buckets = std::move(buckets);
+ }
const document::BucketId getBucketId() const { return *_buckets.begin(); }
@@ -196,6 +200,9 @@ public:
std::vector<document::BucketId>& getFinishedBuckets() { return _finishedBuckets; }
const std::vector<document::BucketId>& getFinishedBuckets() const { return _finishedBuckets; }
+ void setFinishedBuckets(std::vector<document::BucketId> buckets) noexcept {
+ _finishedBuckets = std::move(buckets);
+ }
const string& getErrorMessage() const { return _errorMessage; }
void setErrorMessage(const string& errorMessage) { _errorMessage = errorMessage; };
@@ -224,6 +231,7 @@ public:
vdslib::Parameters& getData() { return _data; };
const vdslib::Parameters& getData() const { return _data; };
+ void setData(vdslib::Parameters&& data) noexcept { _data = std::move(data); }
uint32_t getApproxSize() const override;
uint32_t getType() const override;
@@ -246,9 +254,9 @@ public:
Entry(const Entry& other);
Entry(const document::DocumentTypeRepo &repo, document::ByteBuffer& buf);
- int64_t getTimestamp() { return _timestamp; }
- const DocumentSP & getDocument() { return _document; }
- bool isRemoveEntry() { return _removeEntry; }
+ int64_t getTimestamp() const noexcept { return _timestamp; }
+ const DocumentSP & getDocument() const noexcept { return _document; }
+ bool isRemoveEntry() const noexcept { return _removeEntry; }
void serialize(vespalib::GrowableByteBuffer& buf) const;
private:
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
new file mode 100644
index 00000000000..399801526ba
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.cpp
@@ -0,0 +1,883 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "routable_factories_8.h"
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/select/parser.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/util/serializableexceptions.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/documentapi/messagebus/docapi_common.pb.h>
+#include <vespa/documentapi/messagebus/docapi_feed.pb.h>
+#include <vespa/documentapi/messagebus/docapi_visiting.pb.h>
+#include <vespa/documentapi/messagebus/docapi_inspect.pb.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+namespace documentapi::messagebus {
+
+namespace {
+
+// Protobuf codec helpers for common types
+
+void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
+ dest.set_raw_id(src.getRawId());
+}
+
+document::BucketId get_bucket_id(const protobuf::BucketId& src) {
+ return document::BucketId(src.raw_id());
+}
+
+void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) {
+ auto doc_id = src.toString();
+ dest.set_id(doc_id.data(), doc_id.size());
+}
+
+document::DocumentId get_document_id(const protobuf::DocumentId& src) {
+ return document::DocumentId(src.id());
+}
+
+// TODO DocumentAPI should be extended to use actual document::FieldSet enums instead of always passing strings.
+void set_raw_field_set(protobuf::FieldSet& dest, vespalib::stringref src) {
+ dest.set_spec(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_field_set(const protobuf::FieldSet& src) noexcept {
+ return src.spec();
+}
+
+void set_raw_selection(protobuf::DocumentSelection& dest, vespalib::stringref src) {
+ dest.set_selection(src.data(), src.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_raw_selection(const protobuf::DocumentSelection& src) noexcept {
+ return src.selection();
+}
+
+void set_bucket_space(protobuf::BucketSpace& dest, vespalib::stringref space_name) {
+ dest.set_name(space_name.data(), space_name.size());
+}
+
+// Note: returns by ref
+vespalib::stringref get_bucket_space(const protobuf::BucketSpace& src) noexcept {
+ return src.name();
+}
+
+void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
+ char tmp[document::GlobalId::LENGTH];
+ memcpy(tmp, src.get(), document::GlobalId::LENGTH);
+ dest.set_raw_gid(tmp, document::GlobalId::LENGTH);
+}
+
+document::GlobalId get_global_id(const protobuf::GlobalId& src) {
+ if (src.raw_gid().size() != document::GlobalId::LENGTH) {
+ throw document::DeserializeException("Unexpected serialized protobuf GlobalId size");
+ }
+ return document::GlobalId(src.raw_gid().data()); // By copy
+}
+
+documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
+ return documentapi::TestAndSetCondition(src.selection());
+}
+
+void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
+ dest.set_selection(src.getSelection().data(), src.getSelection().size());
+}
+
+std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src_doc.payload().empty()) {
+ vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size());
+ return std::make_shared<document::Document>(type_repo, doc_buf);
+ }
+ return {};
+}
+
+std::shared_ptr<document::Document> get_document_or_throw(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto doc = get_document(src_doc, type_repo);
+ if (!doc) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document object", VESPA_STRLOC);
+ }
+ return doc;
+}
+
+void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
+ vespalib::nbostream stream;
+ src_doc.serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
+}
+
+void set_update(protobuf::DocumentUpdate& dest, const document::DocumentUpdate& src) {
+ vespalib::nbostream stream;
+ src.serializeHEAD(stream);
+ dest.set_payload(stream.peek(), stream.size());
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src.payload().empty()) {
+ return document::DocumentUpdate::createHEAD(
+ type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
+ }
+ return {};
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update_or_throw(const protobuf::DocumentUpdate& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ auto upd = get_update(src, type_repo);
+ if (!upd) [[unlikely]] {
+ throw document::DeserializeException("Message does not contain a required document update object", VESPA_STRLOC);
+ }
+ return upd;
+}
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+requires std::is_invocable_r_v<void, EncodeFn, const DocApiType&, ProtobufType&> &&
+ std::is_invocable_r_v<std::unique_ptr<DocApiType>, DecodeFn, const ProtobufType&>
+class ProtobufRoutableFactory final : public IRoutableFactory {
+ EncodeFn _encode_fn;
+ DecodeFn _decode_fn;
+public:
+ template <typename EncFn, typename DecFn>
+ ProtobufRoutableFactory(EncFn&& enc_fn, DecFn&& dec_fn) noexcept
+ : _encode_fn(std::forward<EncFn>(enc_fn)),
+ _decode_fn(std::forward<DecFn>(dec_fn))
+ {}
+ ~ProtobufRoutableFactory() override = default;
+
+ bool encode(const mbus::Routable& obj, vespalib::GrowableByteBuffer& out) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+
+ _encode_fn(dynamic_cast<const DocApiType&>(obj), *proto_obj);
+
+ const auto sz = proto_obj->ByteSizeLong();
+ assert(sz <= INT32_MAX);
+ auto* buf = reinterpret_cast<uint8_t*>(out.allocate(sz));
+ return proto_obj->SerializeWithCachedSizesToArray(buf);
+ }
+
+ mbus::Routable::UP decode(document::ByteBuffer& in) const override {
+ ::google::protobuf::Arena arena;
+ auto* proto_obj = ::google::protobuf::Arena::Create<ProtobufType>(&arena);
+ const auto buf_size = in.getRemaining();
+ assert(buf_size <= INT_MAX);
+ bool ok = proto_obj->ParseFromArray(in.getBufferAtPos(), buf_size);
+ if (!ok) {
+ return {}; // Malformed protobuf payload
+ }
+ auto msg = _decode_fn(*proto_obj);
+ if constexpr (std::is_base_of_v<DocumentMessage, DocApiType>) {
+ msg->setApproxSize(buf_size); // Wire size is a proxy for in-memory size
+ }
+ return msg;
+ }
+};
+
+template <typename DocApiType, typename ProtobufType, typename EncodeFn, typename DecodeFn>
+auto make_codec(EncodeFn&& enc_fn, DecodeFn&& dec_fn) {
+ return std::make_shared<ProtobufRoutableFactory<DocApiType, ProtobufType, EncodeFn, DecodeFn>>(
+ std::forward<EncodeFn>(enc_fn), std::forward<DecodeFn>(dec_fn));
+}
+
+} // anon ns
+
+// ---------------------------------------------
+// Get request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_message_factory() {
+ return make_codec<GetDocumentMessage, protobuf::GetDocumentRequest>(
+ [](const GetDocumentMessage& src, protobuf::GetDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ },
+ [](const protobuf::GetDocumentRequest& src) {
+ return std::make_unique<GetDocumentMessage>(get_document_id(src.document_id()), get_raw_field_set(src.field_set()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<GetDocumentReply, protobuf::GetDocumentResponse>(
+ [](const GetDocumentReply& src, protobuf::GetDocumentResponse& dest) {
+ if (src.hasDocument()) {
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_last_modified(src.getLastModified());
+ },
+ [type_repo = std::move(repo)](const protobuf::GetDocumentResponse& src) {
+ auto msg = std::make_unique<GetDocumentReply>();
+ if (src.has_document()) {
+ auto doc = get_document(src.document(), *type_repo);
+ doc->setLastModified(static_cast<int64_t>(src.last_modified()));
+ msg->setDocument(std::move(doc));
+ }
+ msg->setLastModified(src.last_modified());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Put request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<PutDocumentMessage, protobuf::PutDocumentRequest>(
+ [](const PutDocumentMessage& src, protobuf::PutDocumentRequest& dest) {
+ dest.set_force_assign_timestamp(src.getTimestamp());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ if (src.getDocumentSP()) { // This should always be present in practice
+ set_document(*dest.mutable_document(), src.getDocument());
+ }
+ dest.set_create_if_missing(src.get_create_if_non_existent());
+ },
+ [type_repo = std::move(repo)](const protobuf::PutDocumentRequest& src) {
+ auto msg = std::make_unique<PutDocumentMessage>();
+ msg->setDocument(get_document_or_throw(src.document(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setTimestamp(src.force_assign_timestamp());
+ msg->set_create_if_non_existent(src.create_if_missing());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::put_document_reply_factory() {
+ return make_codec<WriteDocumentReply, protobuf::PutDocumentResponse>(
+ [](const WriteDocumentReply& src, protobuf::PutDocumentResponse& dest) {
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::PutDocumentResponse& src) {
+ auto msg = std::make_unique<WriteDocumentReply>(DocumentProtocol::REPLY_PUTDOCUMENT);
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Update request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<UpdateDocumentMessage, protobuf::UpdateDocumentRequest>(
+ [](const UpdateDocumentMessage& src, protobuf::UpdateDocumentRequest& dest) {
+ set_update(*dest.mutable_update(), src.getDocumentUpdate());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ dest.set_expected_old_timestamp(src.getOldTimestamp());
+ dest.set_force_assign_timestamp(src.getNewTimestamp());
+ },
+ [type_repo = std::move(repo)](const protobuf::UpdateDocumentRequest& src) {
+ auto msg = std::make_unique<UpdateDocumentMessage>();
+ msg->setDocumentUpdate(get_update_or_throw(src.update(), *type_repo));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ msg->setOldTimestamp(src.expected_old_timestamp());
+ msg->setNewTimestamp(src.force_assign_timestamp());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::update_document_reply_factory() {
+ return make_codec<UpdateDocumentReply, protobuf::UpdateDocumentResponse>(
+ [](const UpdateDocumentReply& src, protobuf::UpdateDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::UpdateDocumentResponse& src) {
+ auto msg = std::make_unique<UpdateDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// Remove request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_message_factory() {
+ return make_codec<RemoveDocumentMessage, protobuf::RemoveDocumentRequest>(
+ [](const RemoveDocumentMessage& src, protobuf::RemoveDocumentRequest& dest) {
+ set_document_id(*dest.mutable_document_id(), src.getDocumentId());
+ if (src.getCondition().isPresent()) {
+ set_tas_condition(*dest.mutable_condition(), src.getCondition());
+ }
+ },
+ [](const protobuf::RemoveDocumentRequest& src) {
+ auto msg = std::make_unique<RemoveDocumentMessage>();
+ msg->setDocumentId(get_document_id(src.document_id()));
+ if (src.has_condition()) {
+ msg->setCondition(get_tas_condition(src.condition()));
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_document_reply_factory() {
+ return make_codec<RemoveDocumentReply, protobuf::RemoveDocumentResponse>(
+ [](const RemoveDocumentReply& src, protobuf::RemoveDocumentResponse& dest) {
+ dest.set_was_found(src.wasFound());
+ dest.set_modification_timestamp(src.getHighestModificationTimestamp());
+ },
+ [](const protobuf::RemoveDocumentResponse& src) {
+ auto msg = std::make_unique<RemoveDocumentReply>();
+ msg->setWasFound(src.was_found());
+ msg->setHighestModificationTimestamp(src.modification_timestamp());
+ return msg;
+ }
+ );
+}
+
+// ---------------------------------------------
+// RemoveLocation request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<RemoveLocationMessage, protobuf::RemoveLocationRequest>(
+ [](const RemoveLocationMessage& src, protobuf::RemoveLocationRequest& dest) {
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [type_repo = std::move(repo)](const protobuf::RemoveLocationRequest& src) {
+ document::BucketIdFactory factory;
+ document::select::Parser parser(*type_repo, factory);
+ auto msg = std::make_unique<RemoveLocationMessage>(factory, parser, get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::remove_location_reply_factory() {
+ return make_codec<DocumentReply, protobuf::RemoveLocationResponse>(
+ []([[maybe_unused]] const DocumentReply& src, [[maybe_unused]] protobuf::RemoveLocationResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::RemoveLocationResponse& src) {
+ // The lack of 1-1 type mapping is pretty awkward :I
+ return std::make_unique<DocumentReply>(DocumentProtocol::REPLY_REMOVELOCATION);
+ }
+ );
+}
+
+// ---------------------------------------------
+// CreateVisitor request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_bucket_id_vector(::google::protobuf::RepeatedPtrField<protobuf::BucketId>& dest,
+ const std::vector<document::BucketId>& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& bucket_id : src) {
+ set_bucket_id(*dest.Add(), bucket_id);
+ }
+}
+
+std::vector<document::BucketId> get_bucket_id_vector(const ::google::protobuf::RepeatedPtrField<protobuf::BucketId>& src) {
+ std::vector<document::BucketId> ids;
+ ids.reserve(src.size());
+ for (const auto& proto_bucket : src) {
+ ids.emplace_back(proto_bucket.raw_id());
+ }
+ return ids;
+}
+
+void set_visitor_params(::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& dest,
+ const vdslib::Parameters& src)
+{
+ assert(src.size() <= INT_MAX);
+ dest.Reserve(static_cast<int>(src.size()));
+ for (const auto& kv : src) {
+ auto* proto_kv = dest.Add();
+ proto_kv->set_key(kv.first.data(), kv.first.size());
+ proto_kv->set_value(kv.second.data(), kv.second.size());
+ }
+}
+
+vdslib::Parameters get_visitor_params(const ::google::protobuf::RepeatedPtrField<protobuf::VisitorParameter>& src) {
+ vdslib::Parameters params;
+ for (const auto& proto_kv : src) {
+ params.set(proto_kv.key(), proto_kv.value());
+ }
+ return params;
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_message_factory() {
+ return make_codec<CreateVisitorMessage, protobuf::CreateVisitorRequest>(
+ [](const CreateVisitorMessage& src, protobuf::CreateVisitorRequest& dest) {
+ dest.set_visitor_library_name(src.getLibraryName().data(), src.getLibraryName().size());
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ dest.set_control_destination(src.getControlDestination().data(), src.getControlDestination().size());
+ dest.set_data_destination(src.getDataDestination().data(), src.getDataDestination().size());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ dest.set_max_pending_reply_count(src.getMaximumPendingReplyCount());
+
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ set_bucket_id_vector(*dest.mutable_buckets(), src.getBuckets());
+
+ dest.set_from_timestamp(src.getFromTimestamp());
+ dest.set_to_timestamp(src.getToTimestamp());
+ dest.set_visit_tombstones(src.visitRemoves());
+ set_raw_field_set(*dest.mutable_field_set(), src.getFieldSet());
+ dest.set_visit_inconsistent_buckets(src.visitInconsistentBuckets());
+ dest.set_max_buckets_per_visitor(src.getMaxBucketsPerVisitor());
+
+ set_visitor_params(*dest.mutable_parameters(), src.getParameters());
+ },
+ [](const protobuf::CreateVisitorRequest& src) {
+ auto msg = std::make_unique<CreateVisitorMessage>();
+ msg->setLibraryName(src.visitor_library_name());
+ msg->setInstanceId(src.instance_id());
+ msg->setControlDestination(src.control_destination());
+ msg->setDataDestination(src.data_destination());
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setMaximumPendingReplyCount(src.max_pending_reply_count());
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ msg->setBuckets(get_bucket_id_vector(src.buckets()));
+ msg->setFromTimestamp(src.from_timestamp());
+ msg->setToTimestamp(src.to_timestamp());
+ msg->setVisitRemoves(src.visit_tombstones());
+ msg->setFieldSet(get_raw_field_set(src.field_set()));
+ msg->setVisitInconsistentBuckets(src.visit_inconsistent_buckets());
+ msg->setMaxBucketsPerVisitor(src.max_buckets_per_visitor());
+ msg->setVisitorDispatcherVersion(50); // Hard-coded; same as for v6 serialization
+ msg->setParameters(get_visitor_params(src.parameters()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::create_visitor_reply_factory() {
+ return make_codec<CreateVisitorReply, protobuf::CreateVisitorResponse>(
+ [](const CreateVisitorReply& src, protobuf::CreateVisitorResponse& dest) {
+ set_bucket_id(*dest.mutable_last_bucket(), src.getLastBucket());
+ const auto& vs = src.getVisitorStatistics();
+ auto* stats = dest.mutable_statistics();
+ stats->set_buckets_visited(vs.getBucketsVisited());
+ stats->set_documents_visited(vs.getDocumentsVisited());
+ stats->set_bytes_visited(vs.getBytesVisited());
+ stats->set_documents_returned(vs.getDocumentsReturned());
+ stats->set_bytes_returned(vs.getBytesReturned());
+ },
+ [](const protobuf::CreateVisitorResponse& src) {
+ auto reply = std::make_unique<CreateVisitorReply>(DocumentProtocol::REPLY_CREATEVISITOR);
+ reply->setLastBucket(get_bucket_id(src.last_bucket()));
+ const auto& vs = src.statistics();
+ vdslib::VisitorStatistics stats;
+ stats.setBucketsVisited(vs.buckets_visited());
+ stats.setDocumentsVisited(vs.documents_visited());
+ stats.setBytesVisited(vs.bytes_visited());
+ stats.setDocumentsReturned(vs.documents_returned());
+ stats.setBytesReturned(vs.bytes_returned());
+ reply->setVisitorStatistics(stats);
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DestroyVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_message_factory() {
+ return make_codec<DestroyVisitorMessage, protobuf::DestroyVisitorRequest>(
+ [](const DestroyVisitorMessage& src, protobuf::DestroyVisitorRequest& dest) {
+ dest.set_instance_id(src.getInstanceId().data(), src.getInstanceId().size());
+ },
+ [](const protobuf::DestroyVisitorRequest& src) {
+ auto msg = std::make_unique<DestroyVisitorMessage>();
+ msg->setInstanceId(src.instance_id());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::destroy_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DestroyVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DestroyVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DestroyVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DESTROYVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// MapVisitor request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_message_factory() {
+ return make_codec<MapVisitorMessage, protobuf::MapVisitorRequest>(
+ [](const MapVisitorMessage& src, protobuf::MapVisitorRequest& dest) {
+ set_visitor_params(*dest.mutable_data(), src.getData());
+ },
+ [](const protobuf::MapVisitorRequest& src) {
+ auto msg = std::make_unique<MapVisitorMessage>();
+ msg->setData(get_visitor_params(src.data()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::map_visitor_reply_factory() {
+ return make_codec<VisitorReply, protobuf::MapVisitorResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::MapVisitorResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::MapVisitorResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_MAPVISITOR);
+ }
+ );
+}
+
+// ---------------------------------------------
+// QueryResult request and response
+// ---------------------------------------------
+
+namespace {
+
+void set_search_result(protobuf::SearchResult& dest, const vdslib::SearchResult& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+void set_document_summary(protobuf::DocumentSummary& dest, const vdslib::DocumentSummary& src) {
+ // We treat these as opaque blobs for now. Should ideally be protobuf as well.
+ vespalib::GrowableByteBuffer buf;
+ src.serialize(buf);
+ assert(buf.position() <= INT_MAX);
+ dest.set_payload(buf.getBuffer(), buf.position());
+}
+
+document::ByteBuffer wrap_as_buffer(std::string_view buf) {
+ assert(buf.size() <= UINT32_MAX);
+ return {buf.data(), static_cast<uint32_t>(buf.size())};
+}
+
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_message_factory() {
+ return make_codec<QueryResultMessage, protobuf::QueryResultRequest>(
+ [](const QueryResultMessage& src, protobuf::QueryResultRequest& dest) {
+ set_search_result(*dest.mutable_search_result(), src.getSearchResult());
+ set_document_summary(*dest.mutable_document_summary(), src.getDocumentSummary());
+ },
+ [](const protobuf::QueryResultRequest& src) {
+ auto msg = std::make_unique<QueryResultMessage>();
+ // Explicitly enforce presence of result/summary fields, as our object is not necessarily
+ // well-defined if these have not been initialized.
+ if (!src.has_search_result() || !src.has_document_summary()) {
+ throw document::DeserializeException("Query result does not have all required fields set", VESPA_STRLOC);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.search_result().payload()); // Must be lvalue
+ msg->getSearchResult().deserialize(buf_view);
+ }
+ {
+ auto buf_view = wrap_as_buffer(src.document_summary().payload()); // Also lvalue
+ msg->getDocumentSummary().deserialize(buf_view);
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::query_result_reply_factory() {
+ return make_codec<VisitorReply, protobuf::QueryResultResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::QueryResultResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::QueryResultResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_QUERYRESULT);
+ }
+ );
+}
+
+// ---------------------------------------------
+// VisitorInfo request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_message_factory() {
+ return make_codec<VisitorInfoMessage, protobuf::VisitorInfoRequest>(
+ [](const VisitorInfoMessage& src, protobuf::VisitorInfoRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_finished_buckets(), src.getFinishedBuckets());
+ dest.set_error_message(src.getErrorMessage());
+ },
+ [](const protobuf::VisitorInfoRequest& src) {
+ auto msg = std::make_unique<VisitorInfoMessage>();
+ msg->setFinishedBuckets(get_bucket_id_vector(src.finished_buckets()));
+ msg->setErrorMessage(src.error_message());
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::visitor_info_reply_factory() {
+ return make_codec<VisitorReply, protobuf::VisitorInfoResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::VisitorInfoResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::VisitorInfoResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_VISITORINFO);
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentList request and response
+// TODO deprecate
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory>
+RoutableFactories80::document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo) {
+ return make_codec<DocumentListMessage, protobuf::DocumentListRequest>(
+ [](const DocumentListMessage& src, protobuf::DocumentListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ for (const auto& doc : src.getDocuments()) {
+ auto* proto_entry = dest.add_entries();
+ proto_entry->set_timestamp(doc.getTimestamp());
+ proto_entry->set_is_tombstone(doc.isRemoveEntry());
+ set_document(*proto_entry->mutable_document(), *doc.getDocument());
+ }
+ },
+ [type_repo = std::move(repo)](const protobuf::DocumentListRequest& src) {
+ auto msg = std::make_unique<DocumentListMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ for (const auto& proto_entry : src.entries()) {
+ auto doc = get_document_or_throw(proto_entry.document(), *type_repo);
+ msg->getDocuments().emplace_back(proto_entry.timestamp(), std::move(doc), proto_entry.is_tombstone());
+ }
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_list_reply_factory() {
+ return make_codec<VisitorReply, protobuf::DocumentListResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::DocumentListResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentListResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_DOCUMENTLIST);
+ }
+ );
+}
+
+// ---------------------------------------------
+// EmptyBuckets request and response
+// TODO this should be deprecated
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_message_factory() {
+ return make_codec<EmptyBucketsMessage, protobuf::EmptyBucketsRequest>(
+ [](const EmptyBucketsMessage& src, protobuf::EmptyBucketsRequest& dest) {
+ set_bucket_id_vector(*dest.mutable_bucket_ids(), src.getBucketIds());
+ },
+ [](const protobuf::EmptyBucketsRequest& src) {
+ auto msg = std::make_unique<EmptyBucketsMessage>();
+ msg->setBucketIds(get_bucket_id_vector(src.bucket_ids()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::empty_buckets_reply_factory() {
+ return make_codec<VisitorReply, protobuf::EmptyBucketsResponse>(
+ []([[maybe_unused]] const VisitorReply& src, [[maybe_unused]] protobuf::EmptyBucketsResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::EmptyBucketsResponse& src) {
+ return std::make_unique<VisitorReply>(DocumentProtocol::REPLY_EMPTYBUCKETS); // ugh
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketList request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_message_factory() {
+ return make_codec<GetBucketListMessage, protobuf::GetBucketListRequest>(
+ [](const GetBucketListMessage& src, protobuf::GetBucketListRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::GetBucketListRequest& src) {
+ auto msg = std::make_unique<GetBucketListMessage>(get_bucket_id(src.bucket_id()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_list_reply_factory() {
+ return make_codec<GetBucketListReply, protobuf::GetBucketListResponse>(
+ [](const GetBucketListReply& src, protobuf::GetBucketListResponse& dest) {
+ auto* proto_info = dest.mutable_bucket_info();
+ assert(src.getBuckets().size() <= INT_MAX);
+ proto_info->Reserve(static_cast<int>(src.getBuckets().size()));
+ for (const auto& info : src.getBuckets()) {
+ auto* entry = proto_info->Add();
+ set_bucket_id(*entry->mutable_bucket_id(), info._bucket);
+ entry->set_info(info._bucketInformation.data(), info._bucketInformation.size());
+ }
+ },
+ [](const protobuf::GetBucketListResponse& src) {
+ auto reply = std::make_unique<GetBucketListReply>();
+ reply->getBuckets().reserve(src.bucket_info_size());
+ for (const auto& proto_info : src.bucket_info()) {
+ GetBucketListReply::BucketInfo info;
+ info._bucket = get_bucket_id(proto_info.bucket_id());
+ info._bucketInformation = proto_info.info();
+ reply->getBuckets().emplace_back(std::move(info));
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// GetBucketState request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_message_factory() {
+ return make_codec<GetBucketStateMessage, protobuf::GetBucketStateRequest>(
+ [](const GetBucketStateMessage& src, protobuf::GetBucketStateRequest& dest) {
+ // FIXME misses bucket space, but does not seem to be in use?
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ },
+ [](const protobuf::GetBucketStateRequest& src) {
+ return std::make_unique<GetBucketStateMessage>(get_bucket_id(src.bucket_id()));
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::get_bucket_state_reply_factory() {
+ return make_codec<GetBucketStateReply, protobuf::GetBucketStateResponse>(
+ [](const GetBucketStateReply& src, protobuf::GetBucketStateResponse& dest) {
+ assert(src.getBucketState().size() <= INT_MAX);
+ auto* proto_states = dest.mutable_states();
+ proto_states->Reserve(static_cast<int>(src.getBucketState().size()));
+ for (const auto& state : src.getBucketState()) {
+ auto* ps = proto_states->Add();
+ if (state.getDocumentId()) {
+ set_document_id(*ps->mutable_document_id(), *state.getDocumentId());
+ } else {
+ set_global_id(*ps->mutable_global_id(), state.getGlobalId());
+ }
+ ps->set_timestamp(state.getTimestamp());
+ ps->set_is_tombstone(state.isRemoveEntry());
+ }
+ },
+ [](const protobuf::GetBucketStateResponse& src) {
+ auto reply = std::make_unique<GetBucketStateReply>();
+ reply->getBucketState().reserve(src.states_size());
+ for (const auto& proto_state : src.states()) {
+ if (proto_state.has_document_id()) {
+ reply->getBucketState().emplace_back(get_document_id(proto_state.document_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ } else {
+ reply->getBucketState().emplace_back(get_global_id(proto_state.global_id()),
+ proto_state.timestamp(), proto_state.is_tombstone());
+ }
+ }
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// StatBucket request and response
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_message_factory() {
+ return make_codec<StatBucketMessage, protobuf::StatBucketRequest>(
+ [](const StatBucketMessage& src, protobuf::StatBucketRequest& dest) {
+ set_bucket_id(*dest.mutable_bucket_id(), src.getBucketId());
+ set_raw_selection(*dest.mutable_selection(), src.getDocumentSelection());
+ set_bucket_space(*dest.mutable_bucket_space(), src.getBucketSpace());
+ },
+ [](const protobuf::StatBucketRequest& src) {
+ auto msg = std::make_unique<StatBucketMessage>();
+ msg->setBucketId(get_bucket_id(src.bucket_id()));
+ msg->setDocumentSelection(get_raw_selection(src.selection()));
+ msg->setBucketSpace(get_bucket_space(src.bucket_space()));
+ return msg;
+ }
+ );
+}
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::stat_bucket_reply_factory() {
+ return make_codec<StatBucketReply, protobuf::StatBucketResponse>(
+ [](const StatBucketReply& src, protobuf::StatBucketResponse& dest) {
+ dest.set_results(src.getResults());
+ },
+ [](const protobuf::StatBucketResponse& src) {
+ auto reply = std::make_unique<StatBucketReply>();
+ reply->setResults(src.results());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// WrongDistribution response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::wrong_distribution_reply_factory() {
+ return make_codec<WrongDistributionReply, protobuf::WrongDistributionResponse>(
+ [](const WrongDistributionReply& src, protobuf::WrongDistributionResponse& dest) {
+ dest.mutable_cluster_state()->set_state_string(src.getSystemState());
+ },
+ [](const protobuf::WrongDistributionResponse& src) {
+ auto reply = std::make_unique<WrongDistributionReply>();
+ reply->setSystemState(src.cluster_state().state_string());
+ return reply;
+ }
+ );
+}
+
+// ---------------------------------------------
+// DocumentIgnored response (no request type)
+// ---------------------------------------------
+
+std::shared_ptr<IRoutableFactory> RoutableFactories80::document_ignored_reply_factory() {
+ return make_codec<DocumentIgnoredReply, protobuf::DocumentIgnoredResponse>(
+ []([[maybe_unused]] const DocumentIgnoredReply& src, [[maybe_unused]] protobuf::DocumentIgnoredResponse& dest) {
+ // no-op
+ },
+ []([[maybe_unused]] const protobuf::DocumentIgnoredResponse& src) {
+ return std::make_unique<DocumentIgnoredReply>();
+ }
+ );
+}
+
+} // documentapi::messagebus
diff --git a/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
new file mode 100644
index 00000000000..76da2f7dc9f
--- /dev/null
+++ b/documentapi/src/vespa/documentapi/messagebus/routable_factories_8.h
@@ -0,0 +1,72 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "iroutablefactory.h"
+
+namespace document { class DocumentTypeRepo; }
+
+namespace documentapi::messagebus {
+
+class RoutableFactories80 {
+public:
+ RoutableFactories80() = delete;
+
+ // CRUD messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> put_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_document_reply_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> update_document_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> remove_location_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_message_factory(std::shared_ptr<const document::DocumentTypeRepo> repo);
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_list_reply_factory();
+
+ // Visitor-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> create_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> destroy_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> empty_buckets_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> map_visitor_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> query_result_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> visitor_info_reply_factory();
+
+ // Inspection-related messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_list_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> get_bucket_state_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_message_factory();
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> stat_bucket_reply_factory();
+
+ // Polymorphic reply messages
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> wrong_distribution_reply_factory();
+
+ [[nodiscard]] static std::shared_ptr<IRoutableFactory> document_ignored_reply_factory();
+};
+
+}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
index 54774f142ba..3e1ae07f7ca 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.cpp
@@ -14,11 +14,13 @@ RoutableRepository::VersionMap::VersionMap() :
_factoryVersions()
{ }
+RoutableRepository::VersionMap::~VersionMap() = default;
+
bool
RoutableRepository::VersionMap::putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory)
{
bool ret = _factoryVersions.find(version) != _factoryVersions.end();
- _factoryVersions[version] = factory;
+ _factoryVersions[version] = std::move(factory);
return ret;
}
@@ -28,14 +30,14 @@ RoutableRepository::VersionMap::getFactory(const vespalib::Version &version) con
{
const vespalib::VersionSpecification versionSpec{version.getMajor(), version.getMinor(), version.getMicro()};
- std::vector< std::pair<vespalib::VersionSpecification, IRoutableFactory::SP> > candidates;
- for (auto & entry : _factoryVersions) {
+ std::vector<std::pair<vespalib::VersionSpecification, IRoutableFactory::SP>> candidates;
+ for (const auto & entry : _factoryVersions) {
if (entry.first.compareTo(versionSpec) <= 0) {
- candidates.push_back(std::make_pair(entry.first, entry.second));
+ candidates.emplace_back(entry.first, entry.second);
}
}
if (candidates.empty()) {
- return IRoutableFactory::SP();
+ return {};
}
return std::max_element(candidates.begin(), candidates.end(),
@@ -54,7 +56,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
{
if (data.size() == 0) {
LOG(error, "Received empty byte array for deserialization.");
- return mbus::Routable::UP();
+ return {};
}
document::ByteBuffer in(data.data(), data.size());
@@ -64,7 +66,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
if (!factory) {
LOG(error, "No routable factory found for routable type %d (version %s).",
type, version.toString().c_str());
- return mbus::Routable::UP();
+ return {};
}
mbus::Routable::UP ret = factory->decode(in);
if (!ret) {
@@ -74,7 +76,7 @@ RoutableRepository::decode(const vespalib::Version &version, mbus::BlobRef data)
std::ostringstream ost;
document::StringUtil::printAsHex(ost, data.data(), data.size());
LOG(error, "%s", ost.str().c_str());
- return mbus::Routable::UP();
+ return {};
}
return ret;
}
@@ -107,7 +109,7 @@ RoutableRepository::putFactory(const vespalib::VersionSpecification &version,
uint32_t type, IRoutableFactory::SP factory)
{
std::lock_guard guard(_lock);
- if (_factoryTypes[type].putFactory(version, factory)) {
+ if (_factoryTypes[type].putFactory(version, std::move(factory))) {
_cache.clear();
}
}
@@ -117,17 +119,17 @@ RoutableRepository::getFactory(const vespalib::Version &version, uint32_t type)
{
std::lock_guard guard(_lock);
CacheKey cacheKey(version, type);
- FactoryCache::const_iterator cit = _cache.find(cacheKey);
+ auto cit = _cache.find(cacheKey);
if (cit != _cache.end()) {
return cit->second;
}
- TypeMap::const_iterator vit = _factoryTypes.find(type);
+ auto vit = _factoryTypes.find(type);
if (vit == _factoryTypes.end()) {
- return IRoutableFactory::SP();
+ return {};
}
- IRoutableFactory::SP factory = vit->second.getFactory(version);
+ auto factory = vit->second.getFactory(version);
if (!factory) {
- return IRoutableFactory::SP();
+ return {};
}
_cache[cacheKey] = factory;
return factory;
diff --git a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
index 5060d1b3817..62b5103d8f5 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routablerepository.h
@@ -27,8 +27,9 @@ private:
public:
VersionMap();
- bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
- IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
+ ~VersionMap();
+ [[nodiscard]] bool putFactory(const vespalib::VersionSpecification &version, IRoutableFactory::SP factory);
+ [[nodiscard]] IRoutableFactory::SP getFactory(const vespalib::Version &version) const;
};
using CacheKey = std::pair<vespalib::Version, uint32_t>;