diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-04 11:44:21 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-05 11:27:32 +0000 |
commit | 0fd9e1693eccc7a8aea1db61871821121cf514d2 (patch) | |
tree | c6174d3bc63688135e8dba44692fb3f9f2c470dc /storageapi | |
parent | 7c13c34df95c0a5d66f6d4c91bfab508b2425117 (diff) |
Implement NotifyBucketChange, SplitBucket and JoinBuckets
Diffstat (limited to 'storageapi')
3 files changed, 158 insertions, 2 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto index e7dcd5f3457..19362bf40db 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto @@ -286,3 +286,36 @@ message BucketAndBucketInfo { message RequestBucketInfoResponse { repeated BucketAndBucketInfo bucket_infos = 1; } + +message NotifyBucketChangeRequest { + Bucket bucket = 1; + BucketInfo bucket_info = 2; +} + +message NotifyBucketChangeResponse { + // Currently empty +} + +message SplitBucketRequest { + Bucket bucket = 1; + uint32 min_split_bits = 2; + uint32 max_split_bits = 3; + uint32 min_byte_size = 4; + uint32 min_doc_count = 5; +} + +message SplitBucketResponse { + BucketId remapped_bucket_id = 1; + repeated BucketAndBucketInfo split_info = 2; +} + +message JoinBucketsRequest { + Bucket bucket = 1; + repeated BucketId source_buckets = 2; + uint32 min_join_bits = 3; +} + +message JoinBucketsResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 9578f88830e..0f9f5908ca9 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -8,6 +8,7 @@ #include "protocolserialization7.h" #include "serializationhelper.h" #include "storageapi.pb.h" +#include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/util/bufferexceptions.h> @@ -721,7 +722,7 @@ void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffComma api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) { api::GetBucketDiffCommand::Entry e; e._timestamp = src.timestamp(); - e._gid = get_global_id(src.gid()); // TODO need presence check? + e._gid = get_global_id(src.gid()); e._headerSize = src.header_size(); e._bodySize = src.body_size(); e._flags = src.flags(); @@ -893,7 +894,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCo } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const { - encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](protobuf::RequestBucketInfoResponse& res) { + encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](auto& res) { auto* proto_info = res.mutable_bucket_infos(); proto_info->Reserve(msg.getBucketInfo().size()); for (const auto& entry : msg.getBucketInfo()) { @@ -941,6 +942,109 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con }); } +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const { + encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) { + set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const { + encode_response<protobuf::NotifyBucketChangeResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, [&](auto& req, auto& bucket) { + auto bucket_info = get_bucket_info(req.bucket_info()); + return std::make_unique<api::NotifyBucketChangeCommand>(bucket, bucket_info); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::NotifyBucketChangeResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const { + encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) { + req.set_min_split_bits(msg.getMinSplitBits()); + req.set_max_split_bits(msg.getMaxSplitBits()); + req.set_min_byte_size(msg.getMinByteSize()); + req.set_min_doc_count(msg.getMinDocCount()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const { + encode_bucket_response<protobuf::SplitBucketResponse>(buf, msg, [&](auto& res) { + for (const auto& split_info : msg.getSplitInfo()) { + auto* proto_info = res.add_split_info(); + proto_info->set_raw_bucket_id(split_info.first.getRawId()); + set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second); + } + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::SplitBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::SplitBucketCommand>(bucket); + cmd->setMinSplitBits(static_cast<uint8_t>(req.min_split_bits())); + cmd->setMaxSplitBits(static_cast<uint8_t>(req.max_split_bits())); + cmd->setMinByteSize(req.min_byte_size()); + cmd->setMinDocCount(req.min_doc_count()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::SplitBucketResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd)); + auto& dest_info = reply->getSplitInfo(); + dest_info.reserve(res.split_info_size()); + for (const auto& proto_info : res.split_info()) { + dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()), + get_bucket_info(proto_info.bucket_info())); + } + return reply; + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { + encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](protobuf::JoinBucketsRequest& req) { + for (const auto& source : msg.getSourceBuckets()) { + req.add_source_buckets()->set_raw_id(source.getRawId()); + } + req.set_min_join_bits(msg.getMinJoinBits()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const { + encode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::JoinBucketsRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket); + auto& entries = cmd->getSourceBuckets(); + for (const auto& proto_bucket : req.source_buckets()) { + entries.emplace_back(proto_bucket.raw_id()); + } + cmd->setMinJoinBits(req.min_join_bits()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd)); + }); +} + /* // ----------------------------------------------------------------- @@ -971,6 +1075,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBu * TODO extend testing of: * - bucket info in responses * - bucket remapping in responses + * - presence of fields */ } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h index 66fd3b110ed..5c527f8d032 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -85,6 +85,24 @@ public: SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override; SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override; + // NotifyBucketChange + void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override; + void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override; + SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override; + SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override; + + // SplitBucket + void onEncode(GBBuf&, const api::SplitBucketCommand&) const override; + void onEncode(GBBuf&, const api::SplitBucketReply&) const override; + SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override; + SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override; + + // JoinBuckets + void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override; + void onEncode(GBBuf&, const api::JoinBucketsReply&) const override; + SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override; + SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override; + private: template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; |