summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-03 08:51:18 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-05 11:27:32 +0000
commit149a22e94b08c874298eb0bb0b53d53be453cc46 (patch)
treea2a12bbe45d4afdde72405aaadf907b9ba3a7df7 /storageapi
parent8d48dfa4f6f13aff4dcc81217d0ddba5fda6c4bc (diff)
Add support for DeleteBucket, Merge and GetBucketDiff
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto165
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp294
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h38
3 files changed, 422 insertions, 75 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
index bac38f107b4..7bdda0e54d5 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
@@ -57,6 +57,12 @@ message BucketInfo {
BucketInfoV2 info_v2 = 2;
}
+message GlobalId {
+ // 96 bits of GID data in _little_ endian.
+ fixed64 lo_64 = 1;
+ fixed32 hi_32 = 2;
+}
+
// TODO these should ideally be gRPC headers..
message RequestHeader {
uint64 message_id = 1;
@@ -68,23 +74,13 @@ message RequestHeader {
// TODO these should ideally be gRPC headers..
message ResponseHeader {
// TODO this should ideally be gRPC Status...
- uint32 return_code_id = 1;
+ uint32 return_code_id = 1;
bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold...
- uint64 message_id = 3;
- uint32 priority = 4; // Always in range [0, 255]
-}
-
-// Next tag to use: 3
-message DeleteBucketRequest {
- Bucket bucket = 1;
- BucketInfo expected_bucket_info = 2;
+ uint64 message_id = 3;
+ uint32 priority = 4; // Always in range [0, 255]
}
-// Next tag to use: 3
-message DeleteBucketResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
+// TODO extract bucket info response fields to own message!
message Document {
bytes payload = 1;
@@ -98,61 +94,148 @@ message TestAndSetCondition {
bytes selection = 1;
}
+// Next tag to use: 6
message PutRequest {
- Bucket bucket = 1;
- Document document = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
+ Bucket bucket = 1;
+ Document document = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
}
+// Next tag to use: 4
message PutResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- bool was_found = 3;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ bool was_found = 3;
}
+// Next tag to use: 2
message Update {
bytes payload = 1;
}
+// Next tag to use: 6
message UpdateRequest {
- Bucket bucket = 1;
- Update update = 2;
- uint64 new_timestamp = 3;
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
uint64 expected_old_timestamp = 4; // If zero; no expectation.
TestAndSetCondition condition = 5;
}
+// Next tag to use: 4
message UpdateResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- uint64 updated_timestamp = 3;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 updated_timestamp = 3;
}
+// Next tag to use: 5
message RemoveRequest {
- Bucket bucket = 1;
- bytes document_id = 2;
- uint64 new_timestamp = 3;
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ uint64 new_timestamp = 3;
TestAndSetCondition condition = 4;
}
+// Next tag to use: 4
message RemoveResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- uint64 removed_timestamp = 3;
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 removed_timestamp = 3;
}
+// Next tag to use: 5
message GetRequest {
- Bucket bucket = 1;
- bytes document_id = 2;
- bytes field_set = 3;
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ bytes field_set = 3;
uint64 before_timestamp = 4;
}
+// Next tag to use: 5
message GetResponse {
- Document document = 1;
- uint64 last_modified_timestamp = 2;
- BucketInfo bucket_info = 3;
- BucketId remapped_bucket_id = 4;
+ Document document = 1;
+ uint64 last_modified_timestamp = 2;
+ BucketInfo bucket_info = 3;
+ BucketId remapped_bucket_id = 4;
+}
+
+// TODO consider deprecation/removal if this is not used in practice.
+message RevertRequest {
+ Bucket bucket = 1;
+ repeated uint64 revert_tokens = 2;
+}
+
+// Next tag to use: 3
+message RevertResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+// Next tag to use: 3
+message DeleteBucketRequest {
+ Bucket bucket = 1;
+ BucketInfo expected_bucket_info = 2;
+}
+
+// Next tag to use: 3
+message DeleteBucketResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+// Next tag to use: 3
+message CreateBucketRequest {
+ Bucket bucket = 1;
+ bool create_as_active = 2;
+}
+
+// Next tag to use: 3
+message CreateBucketResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+// Next tag to use: 3
+message MergeNode {
+ uint32 index = 1;
+ bool source_only = 2;
}
+
+// Next tag to use: 6
+message MergeBucketRequest {
+ Bucket bucket = 1;
+ uint32 cluster_state_version = 2;
+ uint64 max_timestamp = 3;
+ repeated MergeNode nodes = 4;
+ repeated uint32 node_chain = 5;
+}
+
+// Next tag to use: 2
+message MergeBucketResponse {
+ BucketId remapped_bucket_id = 1;
+}
+
+message MetaDiffEntry {
+ uint64 timestamp = 1;
+ GlobalId gid = 2;
+ uint32 header_size = 3; // TODO one of these can be removed...!
+ uint32 body_size = 4;
+ uint32 flags = 5;
+ uint32 has_mask = 6;
+}
+
+message GetBucketDiffRequest {
+ Bucket bucket = 1;
+ uint64 max_timestamp = 2;
+ repeated MergeNode nodes = 3;
+ repeated MetaDiffEntry diff = 4;
+}
+
+message GetBucketDiffResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated MetaDiffEntry diff = 2;
+}
+
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index c64fabb1c81..9da60ca39f8 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -266,6 +266,18 @@ void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::Buc
}
template <typename ProtobufType, typename Func>
+void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) {
+ ResponseEncoder<ProtobufType> enc(out_buf, reply);
+ auto& res = enc.response();
+ if (reply.hasBeenRemapped()) {
+ res.mutable_remapped_bucket_id()->set_raw_id(reply.getBucketId().getRawId());
+ }
+ f(res);
+ enc.encode();
+}
+
+// TODO implement in terms of encode_bucket_response
+template <typename ProtobufType, typename Func>
void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) {
ResponseEncoder<ProtobufType> enc(out_buf, reply);
auto& res = enc.response();
@@ -293,6 +305,19 @@ ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func
template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const {
+ ResponseDecoder<ProtobufType> dec(in_buf);
+ const auto& res = dec.response();
+ auto reply = f(res);
+ if (res.has_remapped_bucket_id()) {
+ reply->remapBucketId(document::BucketId(res.remapped_bucket_id().raw_id()));
+ }
+ return reply;
+}
+
+// TODO implement this in terms of decode_bucket_response
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const {
ResponseDecoder<ProtobufType> dec(in_buf);
const auto& res = dec.response();
@@ -321,34 +346,6 @@ void set_document_if_present(protobuf::Document& target_doc, const document::Doc
// -----------------------------------------------------------------
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
- encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
- set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const {
- encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) {
- auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket);
- if (req.has_expected_bucket_info()) {
- cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info()));
- }
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const {
encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) {
req.set_new_timestamp(msg.getTimestamp());
@@ -514,6 +511,247 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd,
});
}
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const {
+ encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) {
+ auto* tokens = req.mutable_revert_tokens();
+ assert(msg.getRevertTokens().size() < INT_MAX);
+ tokens->Reserve(static_cast<int>(msg.getRevertTokens().size()));
+ for (auto token : msg.getRevertTokens()) {
+ tokens->Add(token);
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertReply& msg) const {
+ encode_bucket_info_response<protobuf::RevertResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRevertCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RevertRequest>(buf, [&](auto& req, auto& bucket) {
+ std::vector<api::Timestamp> tokens;
+ tokens.reserve(req.revert_tokens_size());
+ for (auto token : req.revert_tokens()) {
+ tokens.emplace_back(api::Timestamp(token));
+ }
+ return std::make_unique<api::RevertCommand>(bucket, std::move(tokens));
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RevertResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
+ encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
+ set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const {
+ encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket);
+ if (req.has_expected_bucket_info()) {
+ cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const {
+ encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) {
+ req.set_create_as_active(msg.getActive());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const {
+ encode_bucket_info_response<protobuf::CreateBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::CreateBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::CreateBucketCommand>(bucket);
+ cmd->setActive(req.create_as_active());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::CreateBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const {
+ encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) {
+ // TODO dedupe
+ for (const auto& src_node : msg.getNodes()) {
+ auto* dest_node = req.add_nodes();
+ dest_node->set_index(src_node.index);
+ dest_node->set_source_only(src_node.sourceOnly);
+ }
+ req.set_max_timestamp(msg.getMaxTimestamp());
+ req.set_cluster_state_version(msg.getClusterStateVersion());
+ for (uint16_t chain_node : msg.getChain()) {
+ req.add_node_chain(chain_node);
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const {
+ encode_bucket_response<protobuf::MergeBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes;
+ nodes.reserve(req.nodes_size());
+ for (const auto& node : req.nodes()) {
+ nodes.emplace_back(node.index(), node.source_only());
+ }
+ std::vector<uint16_t> chain;
+ chain.reserve(req.node_chain_size());
+ for (uint16_t node : req.node_chain()) {
+ chain.emplace_back(node);
+ }
+
+ auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp());
+ cmd->setClusterStateVersion(req.cluster_state_version());
+ cmd->setChain(std::move(chain));
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::MergeBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+namespace {
+
+void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
+ static_assert(document::GlobalId::LENGTH == 12);
+ uint64_t lo64;
+ uint32_t hi32;
+ memcpy(&lo64, src.get() + sizeof(uint32_t), sizeof(uint64_t));
+ memcpy(&hi32, src.get(), sizeof(uint32_t));
+ dest.set_hi_32(hi32);
+ dest.set_lo_64(lo64);
+}
+
+document::GlobalId get_global_id(const protobuf::GlobalId& src) {
+ static_assert(document::GlobalId::LENGTH == 12);
+ const uint64_t lo64 = src.lo_64();
+ const uint32_t hi32 = src.hi_32();
+
+ char buf[document::GlobalId::LENGTH];
+ memcpy(buf, &hi32, sizeof(uint32_t));
+ memcpy(buf + sizeof(uint32_t), &lo64, sizeof(uint64_t));
+ return document::GlobalId(buf);
+}
+
+void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) {
+ dest.set_timestamp(src._timestamp);
+ set_global_id(*dest.mutable_gid(), src._gid);
+ dest.set_header_size(src._headerSize);
+ dest.set_body_size(src._bodySize);
+ dest.set_flags(src._flags);
+ dest.set_has_mask(src._flags);
+}
+
+api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) {
+ api::GetBucketDiffCommand::Entry e;
+ e._timestamp = src.timestamp();
+ e._gid = get_global_id(src.gid()); // TODO need presence check?
+ e._headerSize = src.header_size();
+ e._bodySize = src.body_size();
+ e._flags = src.flags();
+ e._hasMask = src.has_mask(); // TODO rename, ambiguous :I
+ return e;
+}
+
+} // anynomous namespace
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const {
+ encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) {
+ // TODO dedupe
+ for (const auto& src_node : msg.getNodes()) {
+ auto* dest_node = req.add_nodes();
+ dest_node->set_index(src_node.index);
+ dest_node->set_source_only(src_node.sourceOnly);
+ }
+ req.set_max_timestamp(msg.getMaxTimestamp());
+ for (const auto& diff_entry : msg.getDiff()) {
+ set_diff_entry(*req.add_diff(), diff_entry);
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const {
+ encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) {
+ for (const auto& diff_entry : msg.getDiff()) {
+ set_diff_entry(*res.add_diff(), diff_entry);
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
+ // TODO dedupe
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes;
+ nodes.reserve(req.nodes_size());
+ for (const auto& node : req.nodes()) {
+ nodes.emplace_back(node.index(), node.source_only());
+ }
+ auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp());
+ auto& diff = cmd->getDiff(); // TODO refactor
+ diff.reserve(req.diff_size());
+ for (const auto& diff_entry : req.diff()) {
+ diff.emplace_back(get_diff_entry(diff_entry));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd));
+ // TODO dedupe
+ auto& diff = reply->getDiff(); // TODO refactor
+ // FIXME why does the ctor copy the diff from the command? remove entirely?
+ diff.clear();
+ diff.reserve(res.diff_size());
+ for (const auto& diff_entry : res.diff()) {
+ diff.emplace_back(get_diff_entry(diff_entry));
+ }
+ return reply;
+ });
+}
+
/*
// -----------------------------------------------------------------
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
index d6da89023bf..55d921d8431 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -19,12 +19,6 @@ public:
ProtocolSerialization7(const std::shared_ptr<const document::DocumentTypeRepo> &repo,
const documentapi::LoadTypeSet &loadTypes);
- // DeleteBucket
- void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
- void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
- SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
- SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
-
// Put
void onEncode(GBBuf&, const api::PutCommand&) const override;
void onEncode(GBBuf&, const api::PutReply&) const override;
@@ -49,10 +43,42 @@ public:
SCmd::UP onDecodeGetCommand(BBuf&) const override;
SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
+ // Revert - TODO this is deprecated, no?
+ void onEncode(GBBuf&, const api::RevertCommand&) const override;
+ void onEncode(GBBuf&, const api::RevertReply&) const override;
+ SCmd::UP onDecodeRevertCommand(BBuf&) const override;
+ SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override;
+
+ // DeleteBucket
+ void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
+ SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
+
+ // CreateBucket
+ void onEncode(GBBuf&, const api::CreateBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::CreateBucketReply&) const override;
+ SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override;
+
+ // MergeBucket
+ void onEncode(GBBuf&, const api::MergeBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::MergeBucketReply&) const override;
+ SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override;
+
+ // GetBucketDiff
+ void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override;
+ void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override;
+ SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override;
+ SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override;
+
private:
template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const;
};