diff options
3 files changed, 186 insertions, 157 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 48d482adc0c..66af749b69e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -79,7 +79,7 @@ message GetResponse { BucketId remapped_bucket_id = 4; } -// TODO consider deprecation/removal if this is not used in practice. +// Next tag to use: 3 message RevertRequest { Bucket bucket = 1; repeated uint64 revert_tokens = 2; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 847586a0248..f5fc94980b5 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -15,8 +15,8 @@ message DeleteBucketRequest { // Next tag to use: 3 message DeleteBucketResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; } // Next tag to use: 3 @@ -52,12 +52,12 @@ message MergeBucketResponse { } message MetaDiffEntry { - uint64 timestamp = 1; - GlobalId gid = 2; - uint32 header_size = 3; // TODO one of these can be removed...! - uint32 body_size = 4; - uint32 flags = 5; - uint32 has_mask = 6; + uint64 timestamp = 1; + GlobalId gid = 2; + uint32 header_size = 3; + uint32 body_size = 4; + uint32 flags = 5; + uint32 presence_mask = 6; } message GetBucketDiffRequest { @@ -75,7 +75,7 @@ message GetBucketDiffResponse { message ApplyDiffEntry { MetaDiffEntry entry_meta = 1; bytes document_id = 2; - bytes header_blob = 3; // TODO use singular blob + bytes header_blob = 3; bytes body_blob = 4; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 68b504fb6e2..23ec5ab86e7 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -81,6 +81,22 @@ std::shared_ptr<document::Document> get_document(const protobuf::Document& src_d return std::shared_ptr<document::Document>(); } +void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) { + vespalib::nbostream stream; + src.serializeHEAD(stream); + dest.set_payload(stream.peek(), stream.size()); +} + +std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src, + const document::DocumentTypeRepo& type_repo) +{ + if (!src.payload().empty()) { + return document::DocumentUpdate::createHEAD( + type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); + } + return std::shared_ptr<document::DocumentUpdate>(); +} + void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages hdr.set_message_id(cmd.getMsgId()); @@ -231,16 +247,6 @@ public: }; template <typename ProtobufType> -void transfer_bucket_info_response_fields_from_proto_to_msg(api::BucketInfoReply& dest, const ProtobufType& src) { - if (src.has_bucket_info()) { - dest.setBucketInfo(get_bucket_info(src.bucket_info())); - } - if (src.has_remapped_bucket_id()) { - dest.remapBucketId(document::BucketId(src.remapped_bucket_id().raw_id())); - } -} - -template <typename ProtobufType> class ResponseDecoder { protobuf::ResponseHeader _hdr; ::google::protobuf::Arena _arena; @@ -378,6 +384,8 @@ void set_document_if_present(protobuf::Document& target_doc, const document::Doc } // ----------------------------------------------------------------- +// Put +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const { encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) { @@ -415,15 +423,14 @@ api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, } // ----------------------------------------------------------------- +// Update +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const { encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) { auto* update = msg.getUpdate().get(); if (update) { - // TODO move out - vespalib::nbostream stream; - update->serializeHEAD(stream); - req.mutable_update()->set_payload(stream.peek(), stream.size()); + set_update(*req.mutable_update(), *update); } req.set_new_timestamp(msg.getTimestamp()); req.set_expected_old_timestamp(msg.getOldTimestamp()); @@ -441,12 +448,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) c api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const { return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) { - // TODO move out - std::shared_ptr<document::DocumentUpdate> update; - if (req.has_update() && !req.update().payload().empty()) { - update = document::DocumentUpdate::createHEAD(type_repo(), vespalib::nbostream( - req.update().payload().data(), req.update().payload().size())); - } + auto update = get_update(req.update(), type_repo()); auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp()); cmd->setOldTimestamp(req.expected_old_timestamp()); if (req.has_condition()) { @@ -464,6 +466,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cm } // ----------------------------------------------------------------- +// Remove +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const { encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) { @@ -501,6 +505,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cm } // ----------------------------------------------------------------- +// Get +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) { @@ -545,6 +551,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, } // ----------------------------------------------------------------- +// Revert +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const { encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) { @@ -579,6 +587,34 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cm } // ----------------------------------------------------------------- +// RemoveLocation +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { + encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) { + req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { + encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) { + return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// DeleteBucket +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const { encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) { @@ -607,6 +643,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SC } // ----------------------------------------------------------------- +// CreateBucket +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const { encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) { @@ -633,15 +671,39 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SC } // ----------------------------------------------------------------- +// MergeBucket +// ----------------------------------------------------------------- + +namespace { + +void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& dest, + const std::vector<api::MergeBucketCommand::Node>& src) +{ + dest.Reserve(src.size()); + for (const auto& src_node : src) { + auto* dest_node = dest.Add(); + dest_node->set_index(src_node.index); + dest_node->set_source_only(src_node.sourceOnly); + } +} + +std::vector<api::MergeBucketCommand::Node> get_merge_nodes( + const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src) +{ + using Node = api::MergeBucketCommand::Node; + std::vector<Node> nodes; + nodes.reserve(src.size()); + for (const auto& node : src) { + nodes.emplace_back(node.index(), node.source_only()); + } + return nodes; +} + +} void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const { encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) { - // TODO dedupe - for (const auto& src_node : msg.getNodes()) { - auto* dest_node = req.add_nodes(); - dest_node->set_index(src_node.index); - dest_node->set_source_only(src_node.sourceOnly); - } + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_timestamp(msg.getMaxTimestamp()); req.set_cluster_state_version(msg.getClusterStateVersion()); for (uint16_t chain_node : msg.getChain()) { @@ -656,12 +718,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& m api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const { return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) { - using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; - nodes.reserve(req.nodes_size()); - for (const auto& node : req.nodes()) { - nodes.emplace_back(node.index(), node.source_only()); - } + auto nodes = get_merge_nodes(req.nodes()); std::vector<uint16_t> chain; chain.reserve(req.node_chain_size()); for (uint16_t node : req.node_chain()) { @@ -682,6 +739,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCm } // ----------------------------------------------------------------- +// GetBucketDiff +// ----------------------------------------------------------------- namespace { @@ -712,7 +771,7 @@ void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffComma dest.set_header_size(src._headerSize); dest.set_body_size(src._bodySize); dest.set_flags(src._flags); - dest.set_has_mask(src._hasMask); + dest.set_presence_mask(src._hasMask); } api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) { @@ -722,50 +781,49 @@ api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& s e._headerSize = src.header_size(); e._bodySize = src.body_size(); e._flags = src.flags(); - e._hasMask = src.has_mask(); // TODO rename, ambiguous :I + e._hasMask = src.presence_mask(); return e; } -} // anynomous namespace +void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& dest, + const std::vector<api::GetBucketDiffCommand::Entry>& src) { + for (const auto& diff_entry : src) { + set_diff_entry(*dest.Add(), diff_entry); + } +} + +void fill_api_meta_diff(std::vector<api::GetBucketDiffCommand::Entry>& dest, + const ::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& src) { + // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason + // TODO verify this isn't actually used anywhere and remove this "feature". + dest.clear(); + dest.reserve(src.size()); + for (const auto& diff_entry : src) { + dest.emplace_back(get_diff_entry(diff_entry)); + } +} + +} // anonymous namespace void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const { encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) { - // TODO dedupe - for (const auto& src_node : msg.getNodes()) { - auto* dest_node = req.add_nodes(); - dest_node->set_index(src_node.index); - dest_node->set_source_only(src_node.sourceOnly); - } + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_timestamp(msg.getMaxTimestamp()); - for (const auto& diff_entry : msg.getDiff()) { - set_diff_entry(*req.add_diff(), diff_entry); - } + fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const { encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) { - for (const auto& diff_entry : msg.getDiff()) { - set_diff_entry(*res.add_diff(), diff_entry); - } + fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const { return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) { - // TODO dedupe - using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; - nodes.reserve(req.nodes_size()); - for (const auto& node : req.nodes()) { - nodes.emplace_back(node.index(), node.source_only()); - } + auto nodes = get_merge_nodes(req.nodes()); auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp()); - auto& diff = cmd->getDiff(); // TODO refactor - diff.reserve(req.diff_size()); - for (const auto& diff_entry : req.diff()) { - diff.emplace_back(get_diff_entry(diff_entry)); - } + fill_api_meta_diff(cmd->getDiff(), req.diff()); return cmd; }); } @@ -773,76 +831,70 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBu api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) { auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd)); - // TODO dedupe - auto& diff = reply->getDiff(); // TODO refactor - // FIXME why does the ctor copy the diff from the command? remove entirely? - diff.clear(); - diff.reserve(res.diff_size()); - for (const auto& diff_entry : res.diff()) { - diff.emplace_back(get_diff_entry(diff_entry)); - } + fill_api_meta_diff(reply->getDiff(), res.diff()); return reply; }); } // ----------------------------------------------------------------- +// ApplyBucketDiff +// ----------------------------------------------------------------- + +namespace { + +void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + const ::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& src) +{ + // We use the same approach as the legacy protocols here in that we pre-reserve and + // directly write into the vector. This avoids having to ensure all buffer management is movable. + size_t n_entries = src.size(); + diff.resize(n_entries); + for (size_t i = 0; i < n_entries; ++i) { + auto& proto_entry = src.Get(i); + auto& dest = diff[i]; + dest._entry = get_diff_entry(proto_entry.entry_meta()); + dest._docName = proto_entry.document_id(); + dest._headerBlob.resize(proto_entry.header_blob().size()); + memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); + dest._bodyBlob.resize(proto_entry.body_blob().size()); + memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); + } +} + +void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& dest, + const std::vector<api::ApplyBucketDiffCommand::Entry>& src) +{ + dest.Reserve(src.size()); + for (const auto& entry : src) { + auto* proto_entry = dest.Add(); + set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); + proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); + proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); + proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); + } +} + +} // anonymous namespace void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const { encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) { - // TODO dedupe - for (const auto& src_node : msg.getNodes()) { - auto* dest_node = req.add_nodes(); - dest_node->set_index(src_node.index); - dest_node->set_source_only(src_node.sourceOnly); - } + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_buffer_size(msg.getMaxBufferSize()); - // TODO dedupe - for (const auto& entry : msg.getDiff()) { - auto* proto_entry = req.add_entries(); - set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); - proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); - proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); - proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); - } + fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const { encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) { - for (const auto& entry : msg.getDiff()) { - auto* proto_entry = res.add_entries(); - set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); - proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); - proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); - proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); - } + fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const { return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) { - // TODO dedupe - using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; - nodes.reserve(req.nodes_size()); - for (const auto& node : req.nodes()) { - nodes.emplace_back(node.index(), node.source_only()); - } + auto nodes = get_merge_nodes(req.nodes()); auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size()); - auto& diff = cmd->getDiff(); // TODO refactor - // TODO refactor, dedupe - size_t n_entries = req.entries_size(); - diff.resize(n_entries); - for (size_t i = 0; i < n_entries; ++i) { - auto& proto_entry = req.entries(i); - auto& dest = diff[i]; - dest._entry = get_diff_entry(proto_entry.entry_meta()); - dest._docName = proto_entry.document_id(); - dest._headerBlob.resize(proto_entry.header_blob().size()); - memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); - dest._bodyBlob.resize(proto_entry.body_blob().size()); - memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); - } + fill_api_apply_diff_vector(cmd->getDiff(), req.entries()); return cmd; }); } @@ -850,25 +902,14 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(B api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) { auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd)); - auto& diff = reply->getDiff(); // TODO refactor - // TODO refactor, dedupe - size_t n_entries = res.entries_size(); - diff.resize(n_entries); - for (size_t i = 0; i < n_entries; ++i) { - auto& proto_entry = res.entries(i); - auto& dest = diff[i]; - dest._entry = get_diff_entry(proto_entry.entry_meta()); - dest._docName = proto_entry.document_id(); - dest._headerBlob.resize(proto_entry.header_blob().size()); - memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); - dest._bodyBlob.resize(proto_entry.body_blob().size()); - memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); - } + fill_api_apply_diff_vector(reply->getDiff(), res.entries()); return reply; }); } // ----------------------------------------------------------------- +// RequestBucketInfo +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) { @@ -939,6 +980,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con } // ----------------------------------------------------------------- +// NotifyBucketChange +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const { encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) { @@ -964,6 +1007,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(co } // ----------------------------------------------------------------- +// SplitBucket +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const { encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) { @@ -1009,6 +1054,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm } // ----------------------------------------------------------------- +// JoinBuckets +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) { @@ -1042,6 +1089,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCm } // ----------------------------------------------------------------- +// SetBucketState +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const { encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) { @@ -1075,6 +1124,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const } // ----------------------------------------------------------------- +// CreateVisitor +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) { @@ -1173,6 +1224,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const S } // ----------------------------------------------------------------- +// DestroyVisitor +// ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const { encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) { @@ -1196,30 +1249,6 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const }); } -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { - encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) { - req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { - encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { - return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) { - return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd)); - }); -} - /* * TODO extend testing of: * - bucket info in responses |