summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-08 13:01:55 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-08 13:01:55 +0000
commit227106198e235b52549d8c48e38f01a438eef1a0 (patch)
treedcaa30d2d06443adf3dd6922e6573458e3b463e8 /storageapi
parentf26cb7110330be17582c7e23812e43f608626392 (diff)
Factor out shared protobuf code
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto18
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp323
3 files changed, 186 insertions, 157 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index 48d482adc0c..66af749b69e 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -79,7 +79,7 @@ message GetResponse {
BucketId remapped_bucket_id = 4;
}
-// TODO consider deprecation/removal if this is not used in practice.
+// Next tag to use: 3
message RevertRequest {
Bucket bucket = 1;
repeated uint64 revert_tokens = 2;
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 847586a0248..f5fc94980b5 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -15,8 +15,8 @@ message DeleteBucketRequest {
// Next tag to use: 3
message DeleteBucketResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
}
// Next tag to use: 3
@@ -52,12 +52,12 @@ message MergeBucketResponse {
}
message MetaDiffEntry {
- uint64 timestamp = 1;
- GlobalId gid = 2;
- uint32 header_size = 3; // TODO one of these can be removed...!
- uint32 body_size = 4;
- uint32 flags = 5;
- uint32 has_mask = 6;
+ uint64 timestamp = 1;
+ GlobalId gid = 2;
+ uint32 header_size = 3;
+ uint32 body_size = 4;
+ uint32 flags = 5;
+ uint32 presence_mask = 6;
}
message GetBucketDiffRequest {
@@ -75,7 +75,7 @@ message GetBucketDiffResponse {
message ApplyDiffEntry {
MetaDiffEntry entry_meta = 1;
bytes document_id = 2;
- bytes header_blob = 3; // TODO use singular blob
+ bytes header_blob = 3;
bytes body_blob = 4;
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 68b504fb6e2..23ec5ab86e7 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -81,6 +81,22 @@ std::shared_ptr<document::Document> get_document(const protobuf::Document& src_d
return std::shared_ptr<document::Document>();
}
+void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) {
+ vespalib::nbostream stream;
+ src.serializeHEAD(stream);
+ dest.set_payload(stream.peek(), stream.size());
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src.payload().empty()) {
+ return document::DocumentUpdate::createHEAD(
+ type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
+ }
+ return std::shared_ptr<document::DocumentUpdate>();
+}
+
void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) {
protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages
hdr.set_message_id(cmd.getMsgId());
@@ -231,16 +247,6 @@ public:
};
template <typename ProtobufType>
-void transfer_bucket_info_response_fields_from_proto_to_msg(api::BucketInfoReply& dest, const ProtobufType& src) {
- if (src.has_bucket_info()) {
- dest.setBucketInfo(get_bucket_info(src.bucket_info()));
- }
- if (src.has_remapped_bucket_id()) {
- dest.remapBucketId(document::BucketId(src.remapped_bucket_id().raw_id()));
- }
-}
-
-template <typename ProtobufType>
class ResponseDecoder {
protobuf::ResponseHeader _hdr;
::google::protobuf::Arena _arena;
@@ -378,6 +384,8 @@ void set_document_if_present(protobuf::Document& target_doc, const document::Doc
}
// -----------------------------------------------------------------
+// Put
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const {
encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) {
@@ -415,15 +423,14 @@ api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd,
}
// -----------------------------------------------------------------
+// Update
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const {
encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) {
auto* update = msg.getUpdate().get();
if (update) {
- // TODO move out
- vespalib::nbostream stream;
- update->serializeHEAD(stream);
- req.mutable_update()->set_payload(stream.peek(), stream.size());
+ set_update(*req.mutable_update(), *update);
}
req.set_new_timestamp(msg.getTimestamp());
req.set_expected_old_timestamp(msg.getOldTimestamp());
@@ -441,12 +448,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) c
api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) {
- // TODO move out
- std::shared_ptr<document::DocumentUpdate> update;
- if (req.has_update() && !req.update().payload().empty()) {
- update = document::DocumentUpdate::createHEAD(type_repo(), vespalib::nbostream(
- req.update().payload().data(), req.update().payload().size()));
- }
+ auto update = get_update(req.update(), type_repo());
auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp());
cmd->setOldTimestamp(req.expected_old_timestamp());
if (req.has_condition()) {
@@ -464,6 +466,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cm
}
// -----------------------------------------------------------------
+// Remove
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const {
encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) {
@@ -501,6 +505,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cm
}
// -----------------------------------------------------------------
+// Get
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const {
encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) {
@@ -545,6 +551,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd,
}
// -----------------------------------------------------------------
+// Revert
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const {
encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) {
@@ -579,6 +587,34 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cm
}
// -----------------------------------------------------------------
+// RemoveLocation
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
+ encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
+ req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
+ encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
+ return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// DeleteBucket
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
@@ -607,6 +643,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SC
}
// -----------------------------------------------------------------
+// CreateBucket
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const {
encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) {
@@ -633,15 +671,39 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SC
}
// -----------------------------------------------------------------
+// MergeBucket
+// -----------------------------------------------------------------
+
+namespace {
+
+void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& dest,
+ const std::vector<api::MergeBucketCommand::Node>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& src_node : src) {
+ auto* dest_node = dest.Add();
+ dest_node->set_index(src_node.index);
+ dest_node->set_source_only(src_node.sourceOnly);
+ }
+}
+
+std::vector<api::MergeBucketCommand::Node> get_merge_nodes(
+ const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src)
+{
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes;
+ nodes.reserve(src.size());
+ for (const auto& node : src) {
+ nodes.emplace_back(node.index(), node.source_only());
+ }
+ return nodes;
+}
+
+}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const {
encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) {
- // TODO dedupe
- for (const auto& src_node : msg.getNodes()) {
- auto* dest_node = req.add_nodes();
- dest_node->set_index(src_node.index);
- dest_node->set_source_only(src_node.sourceOnly);
- }
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
for (uint16_t chain_node : msg.getChain()) {
@@ -656,12 +718,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& m
api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) {
- using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.reserve(req.nodes_size());
- for (const auto& node : req.nodes()) {
- nodes.emplace_back(node.index(), node.source_only());
- }
+ auto nodes = get_merge_nodes(req.nodes());
std::vector<uint16_t> chain;
chain.reserve(req.node_chain_size());
for (uint16_t node : req.node_chain()) {
@@ -682,6 +739,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCm
}
// -----------------------------------------------------------------
+// GetBucketDiff
+// -----------------------------------------------------------------
namespace {
@@ -712,7 +771,7 @@ void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffComma
dest.set_header_size(src._headerSize);
dest.set_body_size(src._bodySize);
dest.set_flags(src._flags);
- dest.set_has_mask(src._hasMask);
+ dest.set_presence_mask(src._hasMask);
}
api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) {
@@ -722,50 +781,49 @@ api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& s
e._headerSize = src.header_size();
e._bodySize = src.body_size();
e._flags = src.flags();
- e._hasMask = src.has_mask(); // TODO rename, ambiguous :I
+ e._hasMask = src.presence_mask();
return e;
}
-} // anynomous namespace
+void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& dest,
+ const std::vector<api::GetBucketDiffCommand::Entry>& src) {
+ for (const auto& diff_entry : src) {
+ set_diff_entry(*dest.Add(), diff_entry);
+ }
+}
+
+void fill_api_meta_diff(std::vector<api::GetBucketDiffCommand::Entry>& dest,
+ const ::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& src) {
+ // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason
+ // TODO verify this isn't actually used anywhere and remove this "feature".
+ dest.clear();
+ dest.reserve(src.size());
+ for (const auto& diff_entry : src) {
+ dest.emplace_back(get_diff_entry(diff_entry));
+ }
+}
+
+} // anonymous namespace
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const {
encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) {
- // TODO dedupe
- for (const auto& src_node : msg.getNodes()) {
- auto* dest_node = req.add_nodes();
- dest_node->set_index(src_node.index);
- dest_node->set_source_only(src_node.sourceOnly);
- }
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
req.set_max_timestamp(msg.getMaxTimestamp());
- for (const auto& diff_entry : msg.getDiff()) {
- set_diff_entry(*req.add_diff(), diff_entry);
- }
+ fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff());
});
}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const {
encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) {
- for (const auto& diff_entry : msg.getDiff()) {
- set_diff_entry(*res.add_diff(), diff_entry);
- }
+ fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff());
});
}
api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
- // TODO dedupe
- using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.reserve(req.nodes_size());
- for (const auto& node : req.nodes()) {
- nodes.emplace_back(node.index(), node.source_only());
- }
+ auto nodes = get_merge_nodes(req.nodes());
auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp());
- auto& diff = cmd->getDiff(); // TODO refactor
- diff.reserve(req.diff_size());
- for (const auto& diff_entry : req.diff()) {
- diff.emplace_back(get_diff_entry(diff_entry));
- }
+ fill_api_meta_diff(cmd->getDiff(), req.diff());
return cmd;
});
}
@@ -773,76 +831,70 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBu
api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) {
auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd));
- // TODO dedupe
- auto& diff = reply->getDiff(); // TODO refactor
- // FIXME why does the ctor copy the diff from the command? remove entirely?
- diff.clear();
- diff.reserve(res.diff_size());
- for (const auto& diff_entry : res.diff()) {
- diff.emplace_back(get_diff_entry(diff_entry));
- }
+ fill_api_meta_diff(reply->getDiff(), res.diff());
return reply;
});
}
// -----------------------------------------------------------------
+// ApplyBucketDiff
+// -----------------------------------------------------------------
+
+namespace {
+
+void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ const ::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& src)
+{
+ // We use the same approach as the legacy protocols here in that we pre-reserve and
+ // directly write into the vector. This avoids having to ensure all buffer management is movable.
+ size_t n_entries = src.size();
+ diff.resize(n_entries);
+ for (size_t i = 0; i < n_entries; ++i) {
+ auto& proto_entry = src.Get(i);
+ auto& dest = diff[i];
+ dest._entry = get_diff_entry(proto_entry.entry_meta());
+ dest._docName = proto_entry.document_id();
+ dest._headerBlob.resize(proto_entry.header_blob().size());
+ memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
+ dest._bodyBlob.resize(proto_entry.body_blob().size());
+ memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
+ }
+}
+
+void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& dest,
+ const std::vector<api::ApplyBucketDiffCommand::Entry>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& entry : src) {
+ auto* proto_entry = dest.Add();
+ set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
+ proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
+ proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
+ proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
+ }
+}
+
+} // anonymous namespace
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
- // TODO dedupe
- for (const auto& src_node : msg.getNodes()) {
- auto* dest_node = req.add_nodes();
- dest_node->set_index(src_node.index);
- dest_node->set_source_only(src_node.sourceOnly);
- }
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
req.set_max_buffer_size(msg.getMaxBufferSize());
- // TODO dedupe
- for (const auto& entry : msg.getDiff()) {
- auto* proto_entry = req.add_entries();
- set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
- proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
- proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
- proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
- }
+ fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff());
});
}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const {
encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) {
- for (const auto& entry : msg.getDiff()) {
- auto* proto_entry = res.add_entries();
- set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
- proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
- proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
- proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
- }
+ fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff());
});
}
api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
- // TODO dedupe
- using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.reserve(req.nodes_size());
- for (const auto& node : req.nodes()) {
- nodes.emplace_back(node.index(), node.source_only());
- }
+ auto nodes = get_merge_nodes(req.nodes());
auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
- auto& diff = cmd->getDiff(); // TODO refactor
- // TODO refactor, dedupe
- size_t n_entries = req.entries_size();
- diff.resize(n_entries);
- for (size_t i = 0; i < n_entries; ++i) {
- auto& proto_entry = req.entries(i);
- auto& dest = diff[i];
- dest._entry = get_diff_entry(proto_entry.entry_meta());
- dest._docName = proto_entry.document_id();
- dest._headerBlob.resize(proto_entry.header_blob().size());
- memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
- dest._bodyBlob.resize(proto_entry.body_blob().size());
- memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
- }
+ fill_api_apply_diff_vector(cmd->getDiff(), req.entries());
return cmd;
});
}
@@ -850,25 +902,14 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(B
api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) {
auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd));
- auto& diff = reply->getDiff(); // TODO refactor
- // TODO refactor, dedupe
- size_t n_entries = res.entries_size();
- diff.resize(n_entries);
- for (size_t i = 0; i < n_entries; ++i) {
- auto& proto_entry = res.entries(i);
- auto& dest = diff[i];
- dest._entry = get_diff_entry(proto_entry.entry_meta());
- dest._docName = proto_entry.document_id();
- dest._headerBlob.resize(proto_entry.header_blob().size());
- memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
- dest._bodyBlob.resize(proto_entry.body_blob().size());
- memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
- }
+ fill_api_apply_diff_vector(reply->getDiff(), res.entries());
return reply;
});
}
// -----------------------------------------------------------------
+// RequestBucketInfo
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const {
encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) {
@@ -939,6 +980,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
}
// -----------------------------------------------------------------
+// NotifyBucketChange
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const {
encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) {
@@ -964,6 +1007,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(co
}
// -----------------------------------------------------------------
+// SplitBucket
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const {
encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) {
@@ -1009,6 +1054,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm
}
// -----------------------------------------------------------------
+// JoinBuckets
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) {
@@ -1042,6 +1089,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCm
}
// -----------------------------------------------------------------
+// SetBucketState
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const {
encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) {
@@ -1075,6 +1124,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const
}
// -----------------------------------------------------------------
+// CreateVisitor
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const {
encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) {
@@ -1173,6 +1224,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const S
}
// -----------------------------------------------------------------
+// DestroyVisitor
+// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const {
encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) {
@@ -1196,30 +1249,6 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const
});
}
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
- encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
- req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
- encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
- return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
- });
-}
-
/*
* TODO extend testing of:
* - bucket info in responses