diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-03 15:38:03 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-05 11:27:32 +0000 |
commit | 7c13c34df95c0a5d66f6d4c91bfab508b2425117 (patch) | |
tree | e2437259e1cd6a533fca630892e6badd9b066b5f /storageapi | |
parent | 149a22e94b08c874298eb0bb0b53d53be453cc46 (diff) |
Add support for ApplyBucketDiff and RequestBucketInfo
Diffstat (limited to 'storageapi')
4 files changed, 256 insertions, 1 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index b82bf8381e4..36fb168d5f1 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -5,7 +5,10 @@ PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf # protoc-generated files emit compiler warnings that we normally treat as errors. # Instead of rolling our own compiler plugin we'll pragmatically disable the noise. -set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override") +set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline") +# protoc explicitly annotates methods with inline, which triggers -Werror=inline when +# the header file grows over a certain size. +set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline") vespa_add_library(storageapi_mbusprot OBJECT SOURCES diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto index 7bdda0e54d5..e7dcd5f3457 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto @@ -239,3 +239,50 @@ message GetBucketDiffResponse { repeated MetaDiffEntry diff = 2; } +message ApplyDiffEntry { + MetaDiffEntry entry_meta = 1; + bytes document_id = 2; + bytes header_blob = 3; // TODO use singular blob + bytes body_blob = 4; +} + +message ApplyBucketDiffRequest { + Bucket bucket = 1; + repeated MergeNode nodes = 2; + uint32 max_buffer_size = 3; + repeated ApplyDiffEntry entries = 4; +} + +message ApplyBucketDiffResponse { + BucketId remapped_bucket_id = 1; + repeated ApplyDiffEntry entries = 4; +} + +message ExplicitBucketSet { + // `Bucket` is not needed, as the space is inferred from the owning message. + repeated BucketId bucket_ids = 2; +} + +message AllBuckets { + uint32 distributor_index = 1; + bytes cluster_state = 2; + bytes distribution_hash = 3; +} + +message RequestBucketInfoRequest { + BucketSpace bucket_space = 1; + oneof request_for { + ExplicitBucketSet explicit_bucket_set = 2; + AllBuckets all_buckets = 3; + } + // TODO bucket info version requested +} + +message BucketAndBucketInfo { + fixed64 raw_bucket_id = 1; + BucketInfo bucket_info = 2; +} + +message RequestBucketInfoResponse { + repeated BucketAndBucketInfo bucket_infos = 1; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 9da60ca39f8..9578f88830e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -258,6 +258,42 @@ public: }; template <typename ProtobufType, typename Func> +void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) { + RequestEncoder<ProtobufType> enc(out_buf, msg); + f(enc.request()); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) { + ResponseEncoder<ProtobufType> enc(out_buf, reply); + auto& res = enc.response(); + f(res); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageCommand> +ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const { + RequestDecoder<ProtobufType> dec(in_buf, loadTypes()); + const auto& req = dec.request(); + auto cmd = f(req); + dec.transfer_meta_information_to(*cmd); + return cmd; +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageReply> +ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const { + ResponseDecoder<ProtobufType> dec(in_buf); + const auto& res = dec.response(); + auto reply = f(res); + return reply; +} + +// TODO encode in terms of encode_request/response + +template <typename ProtobufType, typename Func> void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) { RequestEncoder<ProtobufType> enc(out_buf, msg); set_bucket(*enc.request().mutable_bucket(), msg.getBucket()); @@ -752,6 +788,159 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const S }); } +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const { + encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) { + // TODO dedupe + for (const auto& src_node : msg.getNodes()) { + auto* dest_node = req.add_nodes(); + dest_node->set_index(src_node.index); + dest_node->set_source_only(src_node.sourceOnly); + } + req.set_max_buffer_size(msg.getMaxBufferSize()); + // TODO dedupe + for (const auto& entry : msg.getDiff()) { + auto* proto_entry = req.add_entries(); + set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); + proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); + proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); + proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const { + encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) { + for (const auto& entry : msg.getDiff()) { + auto* proto_entry = res.add_entries(); + set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); + proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); + proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); + proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); + } + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) { + // TODO dedupe + using Node = api::MergeBucketCommand::Node; + std::vector<Node> nodes; + nodes.reserve(req.nodes_size()); + for (const auto& node : req.nodes()) { + nodes.emplace_back(node.index(), node.source_only()); + } + auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size()); + auto& diff = cmd->getDiff(); // TODO refactor + // TODO refactor, dedupe + size_t n_entries = req.entries_size(); + diff.resize(n_entries); + for (size_t i = 0; i < n_entries; ++i) { + auto& proto_entry = req.entries(i); + auto& dest = diff[i]; + dest._entry = get_diff_entry(proto_entry.entry_meta()); + dest._docName = proto_entry.document_id(); + dest._headerBlob.resize(proto_entry.header_blob().size()); + memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); + dest._bodyBlob.resize(proto_entry.body_blob().size()); + memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd)); + auto& diff = reply->getDiff(); // TODO refactor + // TODO refactor, dedupe + size_t n_entries = res.entries_size(); + diff.resize(n_entries); + for (size_t i = 0; i < n_entries; ++i) { + auto& proto_entry = res.entries(i); + auto& dest = diff[i]; + dest._entry = get_diff_entry(proto_entry.entry_meta()); + dest._docName = proto_entry.document_id(); + dest._headerBlob.resize(proto_entry.header_blob().size()); + memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); + dest._bodyBlob.resize(proto_entry.body_blob().size()); + memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); + } + return reply; + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { + encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) { + req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId()); + auto& buckets = msg.getBuckets(); + if (!buckets.empty()) { + auto* proto_buckets = req.mutable_explicit_bucket_set(); + for (const auto& b : buckets) { + proto_buckets->add_bucket_ids()->set_raw_id(b.getRawId()); + } + } else { + auto* all_buckets = req.mutable_all_buckets(); + auto cluster_state = msg.getSystemState().toString(); + all_buckets->set_distributor_index(msg.getDistributor()); + all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size()); + all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const { + encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](protobuf::RequestBucketInfoResponse& res) { + auto* proto_info = res.mutable_bucket_infos(); + proto_info->Reserve(msg.getBucketInfo().size()); + for (const auto& entry : msg.getBucketInfo()) { + auto* bucket_and_info = proto_info->Add(); + bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId()); + set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info); + } + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const { + return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) { + document::BucketSpace bucket_space(req.bucket_space().space_id()); + if (req.has_explicit_bucket_set()) { + const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size(); + std::vector<document::BucketId> buckets(n_buckets); + const auto& proto_buckets = req.explicit_bucket_set().bucket_ids(); + for (uint32_t i = 0; i < n_buckets; ++i) { + buckets[i] = document::BucketId(proto_buckets.Get(i).raw_id()); + } + return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets)); + } else if (req.has_all_buckets()) { + auto& all_req = req.all_buckets(); + return std::make_unique<api::RequestBucketInfoCommand>( + bucket_space, all_req.distributor_index(), + lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash()); + } else { + throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set"); + } + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::RequestBucketInfoResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd)); + auto& dest_entries = reply->getBucketInfo(); + uint32_t n_entries = res.bucket_infos_size(); + dest_entries.resize(n_entries); + for (uint32_t i = 0; i < n_entries; ++i) { + auto& proto_entry = res.bucket_infos(i); + dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); + dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); + } + return reply; + }); +} + /* // ----------------------------------------------------------------- diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h index 55d921d8431..66fd3b110ed 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -73,8 +73,24 @@ public: SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override; SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override; + // ApplyBucketDiff + void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override; + void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override; + SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override; + SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override; + + // RequestBucketInfo + void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override; + void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override; + SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override; + SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override; + private: template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageReply> decode_response(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const; template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const; |