summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-01 13:35:19 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-05 11:27:32 +0000
commit8d48dfa4f6f13aff4dcc81217d0ddba5fda6c4bc (patch)
tree3dcd13ae1e339d801bd5c3ec1559d0c90b878590 /storageapi
parentfe0cf4f343fc150055ba4c5c64bf9faf8fb29526 (diff)
Add start of protobuf serialization protocol implementation
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/CMakeLists.txt1
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp17
-rw-r--r--storageapi/src/vespa/storageapi/CMakeLists.txt4
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/.gitignore3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt10
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto158
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp549
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h60
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp12
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h2
11 files changed, 814 insertions, 5 deletions
diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt
index eeb135e9f7a..ddc43c70004 100644
--- a/storageapi/src/tests/CMakeLists.txt
+++ b/storageapi/src/tests/CMakeLists.txt
@@ -24,6 +24,7 @@ vespa_add_executable(storageapi_testrunner_app TEST
DEPENDS
storageapi_testmessageapi
storageapi
+ vdstestlib
)
vespa_add_test(
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index c03fb0bbbf4..a0018578238 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -46,6 +46,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> {
vespalib::Version _version5_1{5, 1, 0};
vespalib::Version _version5_2{5, 93, 30};
vespalib::Version _version6_0{6, 240, 0};
+ vespalib::Version _version7_0{7, 0, 0}; // FIXME
documentapi::LoadTypeSet _loadTypes;
mbusprot::StorageProtocol _protocol;
static std::vector<std::string> _nonVerboseMessageStrings;
@@ -89,7 +90,8 @@ std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) {
// TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions
INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest,
- Values(vespalib::Version(6, 240, 0)),
+ Values(vespalib::Version(6, 240, 0),
+ vespalib::Version(7, 0, 0)), // TODO proper 7 version
version_as_gtest_string);
std::vector<std::string> StorageProtocolTest::_nonVerboseMessageStrings;
@@ -196,11 +198,13 @@ TEST_P(StorageProtocolTest, testPut) {
auto reply = std::make_shared<PutReply>(*cmd2);
ASSERT_TRUE(reply->hasDocument());
EXPECT_EQ(*_testDoc, *reply->getDocument());
+ reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48));
auto reply2 = copyReply(reply);
ASSERT_TRUE(reply2->hasDocument());
EXPECT_EQ(*_testDoc, *reply->getDocument());
EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
+ EXPECT_EQ(BucketInfo(1,2,3,4,5, true, false, 48), reply2->getBucketInfo());
recordOutput(*cmd2);
recordOutput(*reply2);
@@ -228,10 +232,12 @@ TEST_P(StorageProtocolTest, testUpdate) {
EXPECT_EQ(*update, *cmd2->getUpdate());
auto reply = std::make_shared<UpdateReply>(*cmd2, 8);
+ reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48));
auto reply2 = copyReply(reply);
EXPECT_EQ(_testDocId, reply2->getDocumentId());
EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
EXPECT_EQ(Timestamp(8), reply->getOldTimestamp());
+ EXPECT_EQ(BucketInfo(1,2,3,4,5, true, false, 48), reply2->getBucketInfo());
recordOutput(*cmd2);
recordOutput(*reply2);
@@ -246,6 +252,7 @@ TEST_P(StorageProtocolTest, testGet) {
EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
auto reply = std::make_shared<GetReply>(*cmd2, _testDoc, 100);
+ reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48));
auto reply2 = copyReply(reply);
ASSERT_TRUE(reply2.get() != nullptr);
ASSERT_TRUE(reply2->getDocument().get() != nullptr);
@@ -253,6 +260,7 @@ TEST_P(StorageProtocolTest, testGet) {
EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp());
EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp());
+ EXPECT_EQ(BucketInfo(1,2,3,4,5, true, false, 48), reply2->getBucketInfo());
recordOutput(*cmd2);
recordOutput(*reply2);
@@ -742,7 +750,12 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag
EXPECT_EQ(50u, cmd->getApproxByteSize());
auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(181u, cmd2->getApproxByteSize());
+ auto version = GetParam();
+ if (version.getMajor() == 7) { // Protobuf encoding
+ EXPECT_EQ(158u, cmd2->getApproxByteSize());
+ } else { // Legacy encoding
+ EXPECT_EQ(181u, cmd2->getApproxByteSize());
+ }
}
} // storage::api
diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt
index c08dcbc2419..90eb6dd9eca 100644
--- a/storageapi/src/vespa/storageapi/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/CMakeLists.txt
@@ -1,4 +1,5 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
vespa_add_library(storageapi
SOURCES
$<TARGET_OBJECTS:storageapi_message>
@@ -8,3 +9,6 @@ vespa_add_library(storageapi
INSTALL lib64
DEPENDS
)
+
+vespa_add_target_package_dependency(storageapi Protobuf)
+
diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
index 526f91c6668..8e91fe9cab0 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore
+++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
@@ -5,3 +5,6 @@
.deps
.libs
Makefile
+*.pb.h
+*.pb.cc
+
diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
index d5952d7cb91..b82bf8381e4 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
@@ -1,4 +1,12 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf/storageapi.proto)
+
+# protoc-generated files emit compiler warnings that we normally treat as errors.
+# Instead of rolling our own compiler plugin we'll pragmatically disable the noise.
+set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override")
+
vespa_add_library(storageapi_mbusprot OBJECT
SOURCES
storagemessage.cpp
@@ -11,5 +19,7 @@ vespa_add_library(storageapi_mbusprot OBJECT
protocolserialization5_1.cpp
protocolserialization5_2.cpp
protocolserialization6_0.cpp
+ protocolserialization7.cpp
+ ${storageapi_PROTOBUF_SRCS}
DEPENDS
)
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
new file mode 100644
index 00000000000..bac38f107b4
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
@@ -0,0 +1,158 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.mbusprot.protobuf;
+
+// Note: we use a *Request/*Response naming convention rather than *Command/*Reply,
+// as the former is the gRPC convention and that's where we intend to move.
+
+// Next tag to use: 2
+message BucketSpace {
+ uint64 space_id = 1;
+}
+
+// Next tag to use: 2
+message BucketId {
+ fixed64 raw_id = 1;
+}
+
+// Next tag to use: 3
+message Bucket {
+ uint64 space_id = 1;
+ fixed64 raw_bucket_id = 2;
+}
+
+// Next tag to use: 9
+message BucketInfoV1 {
+ uint64 last_modified_timestamp = 1;
+ // TODO version the checksum instead?
+ fixed32 checksum = 2;
+ uint32 doc_count = 3;
+ uint32 total_doc_size = 4;
+ uint32 meta_count = 5;
+ uint32 used_file_size = 6;
+ bool ready = 7;
+ bool active = 8;
+}
+
+// Next tag to use: 10
+message BucketInfoV2 {
+ uint64 last_modified_timestamp = 1;
+ // TODO version the checksum instead?
+ fixed64 checksum_lo = 2;
+ fixed64 checksum_hi = 3;
+ uint32 doc_count = 4;
+ uint32 total_doc_size = 5;
+ uint32 meta_count = 6;
+ uint32 used_file_size = 7;
+ bool ready = 8;
+ bool active = 9;
+}
+
+// Next tag to use: 3
+message BucketInfo {
+ BucketInfoV1 info_v1 = 1;
+ BucketInfoV2 info_v2 = 2;
+}
+
+// TODO these should ideally be gRPC headers..
+message RequestHeader {
+ uint64 message_id = 1;
+ uint32 priority = 2; // Always in range [0, 255]
+ uint32 source_index = 3; // Always in range [0, 65535]
+ fixed32 loadtype_id = 4;
+}
+
+// TODO these should ideally be gRPC headers..
+message ResponseHeader {
+ // TODO this should ideally be gRPC Status...
+ uint32 return_code_id = 1;
+ bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold...
+ uint64 message_id = 3;
+ uint32 priority = 4; // Always in range [0, 255]
+}
+
+// Next tag to use: 3
+message DeleteBucketRequest {
+ Bucket bucket = 1;
+ BucketInfo expected_bucket_info = 2;
+}
+
+// Next tag to use: 3
+message DeleteBucketResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+message Document {
+ bytes payload = 1;
+}
+
+message DocumentId {
+ bytes id = 1;
+}
+
+message TestAndSetCondition {
+ bytes selection = 1;
+}
+
+message PutRequest {
+ Bucket bucket = 1;
+ Document document = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+}
+
+message PutResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ bool was_found = 3;
+}
+
+message Update {
+ bytes payload = 1;
+}
+
+message UpdateRequest {
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+}
+
+message UpdateResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 updated_timestamp = 3;
+}
+
+message RemoveRequest {
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ uint64 new_timestamp = 3;
+ TestAndSetCondition condition = 4;
+}
+
+message RemoveResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 removed_timestamp = 3;
+}
+
+message GetRequest {
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ bytes field_set = 3;
+ uint64 before_timestamp = 4;
+}
+
+message GetResponse {
+ Document document = 1;
+ uint64 last_modified_timestamp = 2;
+ BucketInfo bucket_info = 3;
+ BucketId remapped_bucket_id = 4;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
index 042ec7850ef..67f02aa2d2a 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
@@ -73,6 +73,9 @@ public:
SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override;
void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override;
void onDecodeReply(BBuf&, api::StorageReply&) const override;
+
+protected:
+ const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; };
};
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
new file mode 100644
index 00000000000..c64fabb1c81
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -0,0 +1,549 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+// Disable warnings emitted by protoc generated
+// TODO move into own forwarding header file
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wsuggest-override"
+
+#include "protocolserialization7.h"
+#include "serializationhelper.h"
+#include "storageapi.pb.h"
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/util/bufferexceptions.h>
+
+#pragma GCC diagnostic pop
+
+namespace storage::mbusprot {
+
+ProtocolSerialization7::ProtocolSerialization7(const std::shared_ptr<const document::DocumentTypeRepo>& repo,
+ const documentapi::LoadTypeSet& loadTypes)
+ : ProtocolSerialization6_0(repo, loadTypes)
+{
+}
+
+namespace {
+
+void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) {
+ dest.set_raw_bucket_id(src.getBucketId().getRawId());
+ dest.set_space_id(src.getBucketSpace().getId());
+}
+
+void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) {
+ auto* info = dest.mutable_info_v1();
+ info->set_last_modified_timestamp(src.getLastModified());
+ info->set_checksum(src.getChecksum());
+ info->set_doc_count(src.getDocumentCount());
+ info->set_total_doc_size(src.getTotalDocumentSize());
+ info->set_meta_count(src.getMetaCount());
+ info->set_used_file_size(src.getUsedFileSize());
+ info->set_active(src.isActive());
+ info->set_ready(src.isReady());
+}
+
+document::Bucket get_bucket(const protobuf::Bucket& src) {
+ return document::Bucket(document::BucketSpace(src.space_id()),
+ document::BucketId(src.raw_bucket_id()));
+}
+
+api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) {
+ if (!src.has_info_v1()) {
+ return {};
+ }
+ api::BucketInfo info;
+ const auto& s = src.info_v1();
+ info.setLastModified(s.last_modified_timestamp());
+ info.setChecksum(s.checksum());
+ info.setDocumentCount(s.doc_count());
+ info.setTotalDocumentSize(s.total_doc_size());
+ info.setMetaCount(s.meta_count());
+ info.setUsedFileSize(s.used_file_size());
+ info.setActive(s.active());
+ info.setReady(s.ready());
+ return info;
+}
+
+documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
+ return documentapi::TestAndSetCondition(src.selection());
+}
+
+void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
+ dest.set_selection(src.getSelection().data(), src.getSelection().size());
+}
+
+// TODO add test with unset doc field in root proto
+std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src_doc.payload().empty()) {
+ document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size());
+ return std::make_shared<document::Document>(type_repo, doc_buf);
+ }
+ return std::shared_ptr<document::Document>();
+}
+
+void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) {
+ protobuf::RequestHeader hdr;
+ hdr.set_message_id(cmd.getMsgId());
+ hdr.set_priority(cmd.getPriority());
+ hdr.set_source_index(cmd.getSourceIndex());
+ hdr.set_loadtype_id(cmd.getLoadType().getId());
+
+ char dest[128]; // Only primitive fields, should be plenty large enough.
+ auto encoded_size = static_cast<uint32_t>(hdr.ByteSizeLong());
+ bool ok = hdr.SerializeToArray(dest, sizeof(dest));
+ assert(ok); // TODO
+ buf.putInt(encoded_size);
+ buf.putBytes(dest, encoded_size);
+}
+
+void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) {
+ protobuf::ResponseHeader hdr;
+ const auto& result = reply.getResult();
+ hdr.set_return_code_id(static_cast<uint32_t>(result.getResult()));
+ if (!result.getMessage().empty()) {
+ hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size());
+ }
+ hdr.set_message_id(reply.getMsgId());
+ hdr.set_priority(reply.getPriority());
+
+ std::string encoded; // TODO wrap in zero copy buffers!
+ bool ok = hdr.SerializeToString(&encoded);
+ assert(ok); // TODO
+ buf.putInt(static_cast<uint32_t>(encoded.size()));
+ buf.putBytes(encoded.data(), static_cast<uint32_t>(encoded.size()));
+}
+
+void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) {
+ auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
+ if (hdr_len > buf.getRemaining()) {
+ throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
+ }
+ bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf request header");
+ }
+ buf.incPos(hdr_len);
+}
+
+void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) {
+ auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
+ if (hdr_len > buf.getRemaining()) {
+ throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
+ }
+ bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf response header");
+ }
+ buf.incPos(hdr_len);
+}
+
+} // anonymous namespace
+
+template <typename ProtobufType>
+class BaseEncoder {
+ vespalib::GrowableByteBuffer& _out_buf;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+public:
+ explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf)
+ : _out_buf(out_buf),
+ _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
+ {
+ }
+
+ void encode() {
+ assert(_proto_obj != nullptr);
+ std::string encoded; // TODO wrap in zero copy buffers!
+ bool ok = _proto_obj->SerializeToString(&encoded);
+ assert(ok); // TODO
+ _out_buf.putBytes(encoded.data(), encoded.size());
+ _proto_obj = nullptr;
+ }
+protected:
+ vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; }
+
+ // Precondition: encode() is not called
+ ProtobufType& proto_obj() noexcept { return *_proto_obj; }
+ const ProtobufType& proto_obj() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType>
+class RequestEncoder : public BaseEncoder<ProtobufType> {
+public:
+ RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd)
+ : BaseEncoder<ProtobufType>(out_buf)
+ {
+ write_request_header(out_buf, cmd);
+ }
+
+ // Precondition: encode() is not called
+ ProtobufType& request() noexcept { return this->proto_obj(); }
+ const ProtobufType& request() const noexcept { return this->proto_obj(); }
+};
+
+template <typename ProtobufType>
+class ResponseEncoder : public BaseEncoder<ProtobufType> {
+public:
+ ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply)
+ : BaseEncoder<ProtobufType>(out_buf)
+ {
+ write_response_header(out_buf, reply);
+ }
+
+ // Precondition: encode() is not called
+ ProtobufType& response() noexcept { return this->proto_obj(); }
+ const ProtobufType& response() const noexcept { return this->proto_obj(); }
+};
+
+template <typename ProtobufType>
+class RequestDecoder {
+ protobuf::RequestHeader _hdr;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+ const documentapi::LoadTypeSet& _load_types;
+public:
+ RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types)
+ : _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)),
+ _load_types(load_types)
+ {
+ decode_request_header(in_buf, _hdr);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf request payload");
+ }
+ }
+
+ void transfer_meta_information_to(api::StorageCommand& dest) {
+ dest.forceMsgId(_hdr.message_id());
+ dest.setPriority(static_cast<uint8_t>(_hdr.priority()));
+ dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index()));
+ dest.setLoadType(_load_types[_hdr.loadtype_id()]);
+ }
+
+ ProtobufType& request() noexcept { return *_proto_obj; }
+ const ProtobufType& request() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType>
+void transfer_bucket_info_response_fields_from_proto_to_msg(api::BucketInfoReply& dest, const ProtobufType& src) {
+ if (src.has_bucket_info()) {
+ dest.setBucketInfo(get_bucket_info(src.bucket_info()));
+ }
+ if (src.has_remapped_bucket_id()) {
+ dest.remapBucketId(document::BucketId(src.remapped_bucket_id().raw_id()));
+ }
+}
+
+template <typename ProtobufType>
+class ResponseDecoder {
+ protobuf::ResponseHeader _hdr;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+public:
+ explicit ResponseDecoder(document::ByteBuffer& in_buf)
+ : _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
+ {
+ decode_response_header(in_buf, _hdr);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf response payload");
+ }
+ }
+
+ ProtobufType& response() noexcept { return *_proto_obj; }
+ const ProtobufType& response() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType, typename Func>
+void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) {
+ RequestEncoder<ProtobufType> enc(out_buf, msg);
+ set_bucket(*enc.request().mutable_bucket(), msg.getBucket());
+ f(enc.request());
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) {
+ ResponseEncoder<ProtobufType> enc(out_buf, reply);
+ auto& res = enc.response();
+ if (reply.hasBeenRemapped()) {
+ res.mutable_remapped_bucket_id()->set_raw_id(reply.getBucketId().getRawId());
+ }
+ set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo());
+ f(res);
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageCommand>
+ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const {
+ RequestDecoder<ProtobufType> dec(in_buf, loadTypes());
+ const auto& req = dec.request();
+ if (!req.has_bucket()) {
+ throw vespalib::IllegalArgumentException("Malformed protocol buffer request; no bucket"); // TODO proto type name?
+ }
+ const auto bucket = get_bucket(req.bucket());
+ auto cmd = f(req, bucket);
+ dec.transfer_meta_information_to(*cmd);
+ return cmd;
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const {
+ ResponseDecoder<ProtobufType> dec(in_buf);
+ const auto& res = dec.response();
+ auto reply = f(res);
+ transfer_bucket_info_response_fields_from_proto_to_msg(*reply, res);
+ return reply;
+}
+
+// TODO document protobuf ducktyping assumptions
+
+namespace {
+// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway.
+void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) {
+ // nothing to do here.
+}
+
+void set_document_if_present(protobuf::Document& target_doc, const document::Document* src_doc) {
+ if (src_doc) {
+ vespalib::nbostream stream;
+ src_doc->serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
+ }
+}
+
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
+ encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
+ set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const {
+ encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket);
+ if (req.has_expected_bucket_info()) {
+ cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const {
+ encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) {
+ req.set_new_timestamp(msg.getTimestamp());
+ req.set_expected_old_timestamp(msg.getUpdateTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ set_document_if_present(*req.mutable_document(), msg.getDocument().get());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const {
+ encode_bucket_info_response<protobuf::PutResponse>(buf, msg, [&](auto& res) {
+ res.set_was_found(msg.wasFound());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::PutRequest>(buf, [&](auto& req, auto& bucket) {
+ auto document = get_document(req.document(), getTypeRepo());
+ auto cmd = std::make_unique<api::PutCommand>(bucket, std::move(document), req.new_timestamp());
+ cmd->setUpdateTimestamp(req.expected_old_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::PutResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), res.was_found());
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const {
+ encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) {
+ auto* update = msg.getUpdate().get();
+ if (update) {
+ // TODO move out
+ vespalib::nbostream stream;
+ update->serializeHEAD(stream);
+ req.mutable_update()->set_payload(stream.peek(), stream.size());
+ }
+ req.set_new_timestamp(msg.getTimestamp());
+ req.set_expected_old_timestamp(msg.getOldTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const {
+ encode_bucket_info_response<protobuf::UpdateResponse>(buf, msg, [&](auto& res) {
+ res.set_updated_timestamp(msg.getOldTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) {
+ // TODO move out
+ std::shared_ptr<document::DocumentUpdate> update;
+ if (req.has_update() && !req.update().payload().empty()) {
+ update = document::DocumentUpdate::createHEAD(getTypeRepo(), vespalib::nbostream(
+ req.update().payload().data(), req.update().payload().size()));
+ }
+ auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp());
+ cmd->setOldTimestamp(req.expected_old_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::UpdateResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd),
+ res.updated_timestamp());
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const {
+ encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) {
+ auto doc_id_str = msg.getDocumentId().toString();
+ req.set_document_id(doc_id_str.data(), doc_id_str.size());
+ req.set_new_timestamp(msg.getTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const {
+ encode_bucket_info_response<protobuf::RemoveResponse>(buf, msg, [&](auto& res) {
+ res.set_removed_timestamp(msg.getOldTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RemoveRequest>(buf, [&](auto& req, auto& bucket) {
+ document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
+ auto cmd = std::make_unique<api::RemoveCommand>(bucket, doc_id, req.new_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RemoveResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd),
+ res.removed_timestamp());
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const {
+ encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) {
+ auto doc_id = msg.getDocumentId().toString();
+ req.set_document_id(doc_id.data(), doc_id.size());
+ req.set_before_timestamp(msg.getBeforeTimestamp());
+ if (!msg.getFieldSet().empty()) {
+ req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const {
+ encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) {
+ // FIXME this will always create an empty document field!
+ set_document_if_present(*res.mutable_document(), msg.getDocument().get());
+ res.set_last_modified_timestamp(msg.getLastModifiedTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) {
+ document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
+ return std::make_unique<api::GetCommand>(bucket, std::move(doc_id),
+ req.field_set(), req.before_timestamp());
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::GetResponse>(buf, [&](auto& res) {
+ try {
+ auto document = get_document(res.document(), getTypeRepo());
+ return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
+ std::move(document), res.last_modified_timestamp());
+ } catch (std::exception& e) {
+ auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
+ std::shared_ptr<document::Document>(), 0u);
+ reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what()));
+ return reply;
+ }
+ });
+}
+
+/*
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Command& msg) const {
+ (void)buf;
+ (void)msg;
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Reply& msg) const {
+ (void)buf;
+ (void)msg;
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeCommand(BBuf& buf) const {
+ (void)buf;
+ return api::StorageCommand::UP();
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBuf& buf) const {
+ (void)cmd;
+ (void)buf;
+ return api::StorageReply::UP();
+}
+ */
+
+/*
+ * TODO extend testing of:
+ * - bucket info in responses
+ * - bucket remapping in responses
+ */
+
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
new file mode 100644
index 00000000000..d6da89023bf
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -0,0 +1,60 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "protocolserialization6_0.h"
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+
+namespace storage {
+namespace mbusprot {
+
+/**
+ * Protocol serialization version that uses Protocol Buffers for all its binary
+ * encoding and decoding.
+ *
+ * TODO stop inheriting from _versioned_ protocol impl once all methods are implemented here.
+ */
+class ProtocolSerialization7 : public ProtocolSerialization6_0 {
+public:
+ ProtocolSerialization7(const std::shared_ptr<const document::DocumentTypeRepo> &repo,
+ const documentapi::LoadTypeSet &loadTypes);
+
+ // DeleteBucket
+ void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
+ SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
+
+ // Put
+ void onEncode(GBBuf&, const api::PutCommand&) const override;
+ void onEncode(GBBuf&, const api::PutReply&) const override;
+ SCmd::UP onDecodePutCommand(BBuf&) const override;
+ SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override;
+
+ // Update
+ void onEncode(GBBuf&, const api::UpdateCommand&) const override;
+ void onEncode(GBBuf&, const api::UpdateReply&) const override;
+ SCmd::UP onDecodeUpdateCommand(BBuf&) const override;
+ SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override;
+
+ // Remove
+ void onEncode(GBBuf&, const api::RemoveCommand&) const override;
+ void onEncode(GBBuf&, const api::RemoveReply&) const override;
+ SCmd::UP onDecodeRemoveCommand(BBuf&) const override;
+ SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override;
+
+ // Get
+ void onEncode(GBBuf&, const api::GetCommand&) const override;
+ void onEncode(GBBuf&, const api::GetReply&) const override;
+ SCmd::UP onDecodeGetCommand(BBuf&) const override;
+ SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
+
+private:
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const;
+};
+
+}
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
index 7e6be0a84f5..6825be4afba 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
@@ -20,7 +20,8 @@ StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentT
: _serializer5_0(repo, loadTypes),
_serializer5_1(repo, loadTypes),
_serializer5_2(repo, loadTypes),
- _serializer6_0(repo, loadTypes)
+ _serializer6_0(repo, loadTypes),
+ _serializer7_0(repo, loadTypes)
{
}
@@ -33,6 +34,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const
}
namespace {
+ vespalib::Version version7_0(7, 0, 0); // FIXME
vespalib::Version version6_0(6, 240, 0);
vespalib::Version version5_2(5, 93, 30);
vespalib::Version version5_1(5, 1, 0);
@@ -106,8 +108,10 @@ StorageProtocol::encode(const vespalib::Version& version,
} else {
if (version < version6_0) {
return encodeMessage(_serializer5_2, routable, message, version5_2, version);
- } else {
+ } else if (version < version7_0) {
return encodeMessage(_serializer6_0, routable, message, version6_0, version);
+ } else {
+ return encodeMessage(_serializer7_0, routable, message, version7_0, version);
}
}
@@ -180,8 +184,10 @@ StorageProtocol::decode(const vespalib::Version & version,
} else {
if (version < version6_0) {
return decodeMessage(_serializer5_2, data, type, version5_2, version);
- } else {
+ } else if (version < version7_0) {
return decodeMessage(_serializer6_0, data, type, version6_0, version);
+ } else {
+ return decodeMessage(_serializer7_0, data, type, version7_0, version);
}
}
} catch (std::exception & e) {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
index 1acd7c9675f..67ea121c340 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -3,6 +3,7 @@
#include "protocolserialization5_2.h"
#include "protocolserialization6_0.h"
+#include "protocolserialization7.h"
#include <vespa/messagebus/iprotocol.h>
namespace storage::mbusprot {
@@ -28,6 +29,7 @@ private:
ProtocolSerialization5_1 _serializer5_1;
ProtocolSerialization5_2 _serializer5_2;
ProtocolSerialization6_0 _serializer6_0;
+ ProtocolSerialization7 _serializer7_0;
};
}