diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-01 13:35:19 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-05 11:27:32 +0000 |
commit | 8d48dfa4f6f13aff4dcc81217d0ddba5fda6c4bc (patch) | |
tree | 3dcd13ae1e339d801bd5c3ec1559d0c90b878590 /storageapi | |
parent | fe0cf4f343fc150055ba4c5c64bf9faf8fb29526 (diff) |
Add start of protobuf serialization protocol implementation
Diffstat (limited to 'storageapi')
11 files changed, 814 insertions, 5 deletions
diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt index eeb135e9f7a..ddc43c70004 100644 --- a/storageapi/src/tests/CMakeLists.txt +++ b/storageapi/src/tests/CMakeLists.txt @@ -24,6 +24,7 @@ vespa_add_executable(storageapi_testrunner_app TEST DEPENDS storageapi_testmessageapi storageapi + vdstestlib ) vespa_add_test( diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index c03fb0bbbf4..a0018578238 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -46,6 +46,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> { vespalib::Version _version5_1{5, 1, 0}; vespalib::Version _version5_2{5, 93, 30}; vespalib::Version _version6_0{6, 240, 0}; + vespalib::Version _version7_0{7, 0, 0}; // FIXME documentapi::LoadTypeSet _loadTypes; mbusprot::StorageProtocol _protocol; static std::vector<std::string> _nonVerboseMessageStrings; @@ -89,7 +90,8 @@ std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) { // TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest, - Values(vespalib::Version(6, 240, 0)), + Values(vespalib::Version(6, 240, 0), + vespalib::Version(7, 0, 0)), // TODO proper 7 version version_as_gtest_string); std::vector<std::string> StorageProtocolTest::_nonVerboseMessageStrings; @@ -196,11 +198,13 @@ TEST_P(StorageProtocolTest, testPut) { auto reply = std::make_shared<PutReply>(*cmd2); ASSERT_TRUE(reply->hasDocument()); EXPECT_EQ(*_testDoc, *reply->getDocument()); + reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48)); auto reply2 = copyReply(reply); ASSERT_TRUE(reply2->hasDocument()); EXPECT_EQ(*_testDoc, *reply->getDocument()); EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); + EXPECT_EQ(BucketInfo(1,2,3,4,5, true, 
false, 48), reply2->getBucketInfo()); recordOutput(*cmd2); recordOutput(*reply2); @@ -228,10 +232,12 @@ TEST_P(StorageProtocolTest, testUpdate) { EXPECT_EQ(*update, *cmd2->getUpdate()); auto reply = std::make_shared<UpdateReply>(*cmd2, 8); + reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48)); auto reply2 = copyReply(reply); EXPECT_EQ(_testDocId, reply2->getDocumentId()); EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); EXPECT_EQ(Timestamp(8), reply->getOldTimestamp()); + EXPECT_EQ(BucketInfo(1,2,3,4,5, true, false, 48), reply2->getBucketInfo()); recordOutput(*cmd2); recordOutput(*reply2); @@ -246,6 +252,7 @@ TEST_P(StorageProtocolTest, testGet) { EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); auto reply = std::make_shared<GetReply>(*cmd2, _testDoc, 100); + reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48)); auto reply2 = copyReply(reply); ASSERT_TRUE(reply2.get() != nullptr); ASSERT_TRUE(reply2->getDocument().get() != nullptr); @@ -253,6 +260,7 @@ TEST_P(StorageProtocolTest, testGet) { EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp()); EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp()); + EXPECT_EQ(BucketInfo(1,2,3,4,5, true, false, 48), reply2->getBucketInfo()); recordOutput(*cmd2); recordOutput(*reply2); @@ -742,7 +750,12 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag EXPECT_EQ(50u, cmd->getApproxByteSize()); auto cmd2 = copyCommand(cmd); - EXPECT_EQ(181u, cmd2->getApproxByteSize()); + auto version = GetParam(); + if (version.getMajor() == 7) { // Protobuf encoding + EXPECT_EQ(158u, cmd2->getApproxByteSize()); + } else { // Legacy encoding + EXPECT_EQ(181u, cmd2->getApproxByteSize()); + } } } // storage::api diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt index c08dcbc2419..90eb6dd9eca 100644 --- a/storageapi/src/vespa/storageapi/CMakeLists.txt +++ 
b/storageapi/src/vespa/storageapi/CMakeLists.txt @@ -1,4 +1,5 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + vespa_add_library(storageapi SOURCES $<TARGET_OBJECTS:storageapi_message> @@ -8,3 +9,6 @@ vespa_add_library(storageapi INSTALL lib64 DEPENDS ) + +vespa_add_target_package_dependency(storageapi Protobuf) + diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore index 526f91c6668..8e91fe9cab0 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore +++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore @@ -5,3 +5,6 @@ .deps .libs Makefile +*.pb.h +*.pb.cc + diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index d5952d7cb91..b82bf8381e4 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -1,4 +1,12 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +find_package(Protobuf REQUIRED) +PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf/storageapi.proto) + +# protoc-generated files emit compiler warnings that we normally treat as errors. +# Instead of rolling our own compiler plugin we'll pragmatically disable the noise. 
+set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override") + vespa_add_library(storageapi_mbusprot OBJECT SOURCES storagemessage.cpp @@ -11,5 +19,7 @@ vespa_add_library(storageapi_mbusprot OBJECT protocolserialization5_1.cpp protocolserialization5_2.cpp protocolserialization6_0.cpp + protocolserialization7.cpp + ${storageapi_PROTOBUF_SRCS} DEPENDS ) diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto new file mode 100644 index 00000000000..bac38f107b4 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto @@ -0,0 +1,158 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +// Note: we use a *Request/*Response naming convention rather than *Command/*Reply, +// as the former is the gRPC convention and that's where we intend to move. + +// Next tag to use: 2 +message BucketSpace { + uint64 space_id = 1; +} + +// Next tag to use: 2 +message BucketId { + fixed64 raw_id = 1; +} + +// Next tag to use: 3 +message Bucket { + uint64 space_id = 1; + fixed64 raw_bucket_id = 2; +} + +// Next tag to use: 9 +message BucketInfoV1 { + uint64 last_modified_timestamp = 1; + // TODO version the checksum instead? + fixed32 checksum = 2; + uint32 doc_count = 3; + uint32 total_doc_size = 4; + uint32 meta_count = 5; + uint32 used_file_size = 6; + bool ready = 7; + bool active = 8; +} + +// Next tag to use: 10 +message BucketInfoV2 { + uint64 last_modified_timestamp = 1; + // TODO version the checksum instead? 
+ fixed64 checksum_lo = 2; + fixed64 checksum_hi = 3; + uint32 doc_count = 4; + uint32 total_doc_size = 5; + uint32 meta_count = 6; + uint32 used_file_size = 7; + bool ready = 8; + bool active = 9; +} + +// Next tag to use: 3 +message BucketInfo { + BucketInfoV1 info_v1 = 1; + BucketInfoV2 info_v2 = 2; +} + +// TODO these should ideally be gRPC headers.. +message RequestHeader { + uint64 message_id = 1; + uint32 priority = 2; // Always in range [0, 255] + uint32 source_index = 3; // Always in range [0, 65535] + fixed32 loadtype_id = 4; +} + +// TODO these should ideally be gRPC headers.. +message ResponseHeader { + // TODO this should ideally be gRPC Status... + uint32 return_code_id = 1; + bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold... + uint64 message_id = 3; + uint32 priority = 4; // Always in range [0, 255] +} + +// Next tag to use: 3 +message DeleteBucketRequest { + Bucket bucket = 1; + BucketInfo expected_bucket_info = 2; +} + +// Next tag to use: 3 +message DeleteBucketResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} + +message Document { + bytes payload = 1; +} + +message DocumentId { + bytes id = 1; +} + +message TestAndSetCondition { + bytes selection = 1; +} + +message PutRequest { + Bucket bucket = 1; + Document document = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; +} + +message PutResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + bool was_found = 3; +} + +message Update { + bytes payload = 1; +} + +message UpdateRequest { + Bucket bucket = 1; + Update update = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. 
+ TestAndSetCondition condition = 5; +} + +message UpdateResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + uint64 updated_timestamp = 3; +} + +message RemoveRequest { + Bucket bucket = 1; + bytes document_id = 2; + uint64 new_timestamp = 3; + TestAndSetCondition condition = 4; +} + +message RemoveResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + uint64 removed_timestamp = 3; +} + +message GetRequest { + Bucket bucket = 1; + bytes document_id = 2; + bytes field_set = 3; + uint64 before_timestamp = 4; +} + +message GetResponse { + Document document = 1; + uint64 last_modified_timestamp = 2; + BucketInfo bucket_info = 3; + BucketId remapped_bucket_id = 4; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h index 042ec7850ef..67f02aa2d2a 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h @@ -73,6 +73,9 @@ public: SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override; void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override; void onDecodeReply(BBuf&, api::StorageReply&) const override; + +protected: + const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; }; }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp new file mode 100644 index 00000000000..c64fabb1c81 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -0,0 +1,549 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +// Disable warnings emitted by protoc generated +// TODO move into own forwarding header file +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsuggest-override" + +#include "protocolserialization7.h" +#include "serializationhelper.h" +#include "storageapi.pb.h" +#include <vespa/document/update/documentupdate.h> +#include <vespa/document/util/bufferexceptions.h> + +#pragma GCC diagnostic pop + +namespace storage::mbusprot { + +ProtocolSerialization7::ProtocolSerialization7(const std::shared_ptr<const document::DocumentTypeRepo>& repo, + const documentapi::LoadTypeSet& loadTypes) + : ProtocolSerialization6_0(repo, loadTypes) +{ +} + +namespace { + +void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) { + dest.set_raw_bucket_id(src.getBucketId().getRawId()); + dest.set_space_id(src.getBucketSpace().getId()); +} + +void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) { + auto* info = dest.mutable_info_v1(); + info->set_last_modified_timestamp(src.getLastModified()); + info->set_checksum(src.getChecksum()); + info->set_doc_count(src.getDocumentCount()); + info->set_total_doc_size(src.getTotalDocumentSize()); + info->set_meta_count(src.getMetaCount()); + info->set_used_file_size(src.getUsedFileSize()); + info->set_active(src.isActive()); + info->set_ready(src.isReady()); +} + +document::Bucket get_bucket(const protobuf::Bucket& src) { + return document::Bucket(document::BucketSpace(src.space_id()), + document::BucketId(src.raw_bucket_id())); +} + +api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) { + if (!src.has_info_v1()) { + return {}; + } + api::BucketInfo info; + const auto& s = src.info_v1(); + info.setLastModified(s.last_modified_timestamp()); + info.setChecksum(s.checksum()); + info.setDocumentCount(s.doc_count()); + info.setTotalDocumentSize(s.total_doc_size()); + info.setMetaCount(s.meta_count()); + info.setUsedFileSize(s.used_file_size()); + info.setActive(s.active()); + 
info.setReady(s.ready()); + return info; +} + +documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) { + return documentapi::TestAndSetCondition(src.selection()); +} + +void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) { + dest.set_selection(src.getSelection().data(), src.getSelection().size()); +} + +// TODO add test with unset doc field in root proto +std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc, + const document::DocumentTypeRepo& type_repo) +{ + if (!src_doc.payload().empty()) { + document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size()); + return std::make_shared<document::Document>(type_repo, doc_buf); + } + return std::shared_ptr<document::Document>(); +} + +void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { + protobuf::RequestHeader hdr; + hdr.set_message_id(cmd.getMsgId()); + hdr.set_priority(cmd.getPriority()); + hdr.set_source_index(cmd.getSourceIndex()); + hdr.set_loadtype_id(cmd.getLoadType().getId()); + + char dest[128]; // Only primitive fields, should be plenty large enough. + auto encoded_size = static_cast<uint32_t>(hdr.ByteSizeLong()); + bool ok = hdr.SerializeToArray(dest, sizeof(dest)); + assert(ok); // TODO + buf.putInt(encoded_size); + buf.putBytes(dest, encoded_size); +} + +void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) { + protobuf::ResponseHeader hdr; + const auto& result = reply.getResult(); + hdr.set_return_code_id(static_cast<uint32_t>(result.getResult())); + if (!result.getMessage().empty()) { + hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size()); + } + hdr.set_message_id(reply.getMsgId()); + hdr.set_priority(reply.getPriority()); + + std::string encoded; // TODO wrap in zero copy buffers! 
+ bool ok = hdr.SerializeToString(&encoded); + assert(ok); // TODO + buf.putInt(static_cast<uint32_t>(encoded.size())); + buf.putBytes(encoded.data(), static_cast<uint32_t>(encoded.size())); +} + +void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) { + auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf)); + if (hdr_len > buf.getRemaining()) { + throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); + } + bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed protobuf request header"); + } + buf.incPos(hdr_len); +} + +void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) { + auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf)); + if (hdr_len > buf.getRemaining()) { + throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); + } + bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed protobuf response header"); + } + buf.incPos(hdr_len); +} + +} // anonymous namespace + +template <typename ProtobufType> +class BaseEncoder { + vespalib::GrowableByteBuffer& _out_buf; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; +public: + explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf) + : _out_buf(out_buf), + _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) + { + } + + void encode() { + assert(_proto_obj != nullptr); + std::string encoded; // TODO wrap in zero copy buffers! 
+ bool ok = _proto_obj->SerializeToString(&encoded); + assert(ok); // TODO + _out_buf.putBytes(encoded.data(), encoded.size()); + _proto_obj = nullptr; + } +protected: + vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; } + + // Precondition: encode() is not called + ProtobufType& proto_obj() noexcept { return *_proto_obj; } + const ProtobufType& proto_obj() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType> +class RequestEncoder : public BaseEncoder<ProtobufType> { +public: + RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd) + : BaseEncoder<ProtobufType>(out_buf) + { + write_request_header(out_buf, cmd); + } + + // Precondition: encode() is not called + ProtobufType& request() noexcept { return this->proto_obj(); } + const ProtobufType& request() const noexcept { return this->proto_obj(); } +}; + +template <typename ProtobufType> +class ResponseEncoder : public BaseEncoder<ProtobufType> { +public: + ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply) + : BaseEncoder<ProtobufType>(out_buf) + { + write_response_header(out_buf, reply); + } + + // Precondition: encode() is not called + ProtobufType& response() noexcept { return this->proto_obj(); } + const ProtobufType& response() const noexcept { return this->proto_obj(); } +}; + +template <typename ProtobufType> +class RequestDecoder { + protobuf::RequestHeader _hdr; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; + const documentapi::LoadTypeSet& _load_types; +public: + RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types) + : _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)), + _load_types(load_types) + { + decode_request_header(in_buf, _hdr); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed 
protobuf request payload"); + } + } + + void transfer_meta_information_to(api::StorageCommand& dest) { + dest.forceMsgId(_hdr.message_id()); + dest.setPriority(static_cast<uint8_t>(_hdr.priority())); + dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index())); + dest.setLoadType(_load_types[_hdr.loadtype_id()]); + } + + ProtobufType& request() noexcept { return *_proto_obj; } + const ProtobufType& request() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType> +void transfer_bucket_info_response_fields_from_proto_to_msg(api::BucketInfoReply& dest, const ProtobufType& src) { + if (src.has_bucket_info()) { + dest.setBucketInfo(get_bucket_info(src.bucket_info())); + } + if (src.has_remapped_bucket_id()) { + dest.remapBucketId(document::BucketId(src.remapped_bucket_id().raw_id())); + } +} + +template <typename ProtobufType> +class ResponseDecoder { + protobuf::ResponseHeader _hdr; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; +public: + explicit ResponseDecoder(document::ByteBuffer& in_buf) + : _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) + { + decode_response_header(in_buf, _hdr); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed protobuf response payload"); + } + } + + ProtobufType& response() noexcept { return *_proto_obj; } + const ProtobufType& response() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType, typename Func> +void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) { + RequestEncoder<ProtobufType> enc(out_buf, msg); + set_bucket(*enc.request().mutable_bucket(), msg.getBucket()); + f(enc.request()); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& 
reply, Func&& f) { + ResponseEncoder<ProtobufType> enc(out_buf, reply); + auto& res = enc.response(); + if (reply.hasBeenRemapped()) { + res.mutable_remapped_bucket_id()->set_raw_id(reply.getBucketId().getRawId()); + } + set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo()); + f(res); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageCommand> +ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const { + RequestDecoder<ProtobufType> dec(in_buf, loadTypes()); + const auto& req = dec.request(); + if (!req.has_bucket()) { + throw vespalib::IllegalArgumentException("Malformed protocol buffer request; no bucket"); // TODO proto type name? + } + const auto bucket = get_bucket(req.bucket()); + auto cmd = f(req, bucket); + dec.transfer_meta_information_to(*cmd); + return cmd; +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageReply> +ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const { + ResponseDecoder<ProtobufType> dec(in_buf); + const auto& res = dec.response(); + auto reply = f(res); + transfer_bucket_info_response_fields_from_proto_to_msg(*reply, res); + return reply; +} + +// TODO document protobuf ducktyping assumptions + +namespace { +// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway. +void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) { + // nothing to do here. 
+} + +void set_document_if_present(protobuf::Document& target_doc, const document::Document* src_doc) { + if (src_doc) { + vespalib::nbostream stream; + src_doc->serialize(stream); + target_doc.set_payload(stream.peek(), stream.size()); + } +} + +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const { + encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) { + set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const { + encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket); + if (req.has_expected_bucket_info()) { + cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const { + encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) { + req.set_new_timestamp(msg.getTimestamp()); + req.set_expected_old_timestamp(msg.getUpdateTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + set_document_if_present(*req.mutable_document(), 
msg.getDocument().get()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const { + encode_bucket_info_response<protobuf::PutResponse>(buf, msg, [&](auto& res) { + res.set_was_found(msg.wasFound()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::PutRequest>(buf, [&](auto& req, auto& bucket) { + auto document = get_document(req.document(), getTypeRepo()); + auto cmd = std::make_unique<api::PutCommand>(bucket, std::move(document), req.new_timestamp()); + cmd->setUpdateTimestamp(req.expected_old_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::PutResponse>(buf, [&](auto& res) { + return std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), res.was_found()); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const { + encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) { + auto* update = msg.getUpdate().get(); + if (update) { + // TODO move out + vespalib::nbostream stream; + update->serializeHEAD(stream); + req.mutable_update()->set_payload(stream.peek(), stream.size()); + } + req.set_new_timestamp(msg.getTimestamp()); + req.set_expected_old_timestamp(msg.getOldTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const { + encode_bucket_info_response<protobuf::UpdateResponse>(buf, msg, [&](auto& res) { + res.set_updated_timestamp(msg.getOldTimestamp()); + }); +} + +api::StorageCommand::UP 
ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) { + // TODO move out + std::shared_ptr<document::DocumentUpdate> update; + if (req.has_update() && !req.update().payload().empty()) { + update = document::DocumentUpdate::createHEAD(getTypeRepo(), vespalib::nbostream( + req.update().payload().data(), req.update().payload().size())); + } + auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp()); + cmd->setOldTimestamp(req.expected_old_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::UpdateResponse>(buf, [&](auto& res) { + return std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd), + res.updated_timestamp()); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const { + encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) { + auto doc_id_str = msg.getDocumentId().toString(); + req.set_document_id(doc_id_str.data(), doc_id_str.size()); + req.set_new_timestamp(msg.getTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const { + encode_bucket_info_response<protobuf::RemoveResponse>(buf, msg, [&](auto& res) { + res.set_removed_timestamp(msg.getOldTimestamp()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RemoveRequest>(buf, [&](auto& req, auto& bucket) { + document::DocumentId 
doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); + auto cmd = std::make_unique<api::RemoveCommand>(bucket, doc_id, req.new_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RemoveResponse>(buf, [&](auto& res) { + return std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd), + res.removed_timestamp()); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { + encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) { + auto doc_id = msg.getDocumentId().toString(); + req.set_document_id(doc_id.data(), doc_id.size()); + req.set_before_timestamp(msg.getBeforeTimestamp()); + if (!msg.getFieldSet().empty()) { + req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const { + encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) { + // FIXME this will always create an empty document field! 
+ set_document_if_present(*res.mutable_document(), msg.getDocument().get()); + res.set_last_modified_timestamp(msg.getLastModifiedTimestamp()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) { + document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); + return std::make_unique<api::GetCommand>(bucket, std::move(doc_id), + req.field_set(), req.before_timestamp()); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::GetResponse>(buf, [&](auto& res) { + try { + auto document = get_document(res.document(), getTypeRepo()); + return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), + std::move(document), res.last_modified_timestamp()); + } catch (std::exception& e) { + auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), + std::shared_ptr<document::Document>(), 0u); + reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what())); + return reply; + } + }); +} + +/* + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Command& msg) const { + (void)buf; + (void)msg; +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Reply& msg) const { + (void)buf; + (void)msg; +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeCommand(BBuf& buf) const { + (void)buf; + return api::StorageCommand::UP(); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBuf& buf) const { + (void)cmd; + (void)buf; + return api::StorageReply::UP(); +} + */ + +/* + * TODO extend testing of: + * - bucket info in responses + * - bucket remapping in responses + */ + +} diff --git 
a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
new file mode 100644
index 00000000000..d6da89023bf
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -0,0 +1,60 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "protocolserialization6_0.h"
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+
+namespace storage {
+namespace mbusprot {
+
+/**
+ * Protocol serialization version that uses Protocol Buffers for its binary encoding and
+ * decoding. For now only DeleteBucket, Put, Update, Remove and Get are overridden here;
+ * all other message types fall through to what is inherited from ProtocolSerialization6_0.
+ * TODO stop inheriting from _versioned_ protocol impl once all methods are implemented here.
+ */
+class ProtocolSerialization7 : public ProtocolSerialization6_0 {
+public:
+    ProtocolSerialization7(const std::shared_ptr<const document::DocumentTypeRepo> &repo,
+                           const documentapi::LoadTypeSet &loadTypes);
+
+    // DeleteBucket: encoders for command and reply, then the matching decoders
+    void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
+    void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
+    SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
+    SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
+
+    // Put: same four overrides as for DeleteBucket
+    void onEncode(GBBuf&, const api::PutCommand&) const override;
+    void onEncode(GBBuf&, const api::PutReply&) const override;
+    SCmd::UP onDecodePutCommand(BBuf&) const override;
+    SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override;
+
+    // Update: same four overrides as for DeleteBucket
+    void onEncode(GBBuf&, const api::UpdateCommand&) const override;
+    void onEncode(GBBuf&, const api::UpdateReply&) const override;
+    SCmd::UP onDecodeUpdateCommand(BBuf&) const override;
+    SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override;
+
+    // Remove: same four overrides as for DeleteBucket
+    void onEncode(GBBuf&, const api::RemoveCommand&) const override;
+    void onEncode(GBBuf&, const api::RemoveReply&) const override;
+    SCmd::UP onDecodeRemoveCommand(BBuf&) const override;
+    SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override;
+
+    // Get: same four overrides; the reply decoder builds the reply from the passed-in command
+    void onEncode(GBBuf&, const api::GetCommand&) const override;
+    void onEncode(GBBuf&, const api::GetReply&) const override;
+    SCmd::UP onDecodeGetCommand(BBuf&) const override;
+    SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
+
+private: // decode helpers used by the Get decoders (presumably by the other onDecode* overrides too)
+    template <typename ProtobufType, typename Func> // f(request, bucket) yields the command, cf. onDecodeGetCommand
+    std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
+    template <typename ProtobufType, typename Func> // f(response) yields the reply, cf. onDecodeGetReply
+    std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const;
+};
+
+} // namespace mbusprot
+} // namespace storage
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp index 7e6be0a84f5..6825be4afba 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp @@ -20,7 +20,8 @@ StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentT : _serializer5_0(repo, loadTypes), _serializer5_1(repo, loadTypes), _serializer5_2(repo, loadTypes), - _serializer6_0(repo, loadTypes) + _serializer6_0(repo, loadTypes), + _serializer7_0(repo, loadTypes) { } @@ -33,6 +34,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const } namespace { + vespalib::Version version7_0(7, 0, 0); // FIXME vespalib::Version version6_0(6, 240, 0); vespalib::Version version5_2(5, 93, 30); vespalib::Version version5_1(5, 1, 0); @@ -106,8 +108,10 @@ StorageProtocol::encode(const vespalib::Version& version, } else { if (version < version6_0) { return encodeMessage(_serializer5_2, routable, message, version5_2, version); - } else { + } else if (version < version7_0) { return encodeMessage(_serializer6_0, routable, message, version6_0, version); + } else { + return encodeMessage(_serializer7_0, routable, message, version7_0, version); } } @@ 
-180,8 +184,10 @@ StorageProtocol::decode(const vespalib::Version & version, } else { if (version < version6_0) { return decodeMessage(_serializer5_2, data, type, version5_2, version); - } else { + } else if (version < version7_0) { return decodeMessage(_serializer6_0, data, type, version6_0, version); + } else { + return decodeMessage(_serializer7_0, data, type, version7_0, version); } } } catch (std::exception & e) { diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h index 1acd7c9675f..67ea121c340 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h @@ -3,6 +3,7 @@ #include "protocolserialization5_2.h" #include "protocolserialization6_0.h" +#include "protocolserialization7.h" #include <vespa/messagebus/iprotocol.h> namespace storage::mbusprot { @@ -28,6 +29,7 @@ private: ProtocolSerialization5_1 _serializer5_1; ProtocolSerialization5_2 _serializer5_2; ProtocolSerialization6_0 _serializer6_0; + ProtocolSerialization7 _serializer7_0; }; } |