summaryrefslogtreecommitdiffstats
path: root/storageapi/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-03 15:38:03 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-05 11:27:32 +0000
commit7c13c34df95c0a5d66f6d4c91bfab508b2425117 (patch)
treee2437259e1cd6a533fca630892e6badd9b066b5f /storageapi/src
parent149a22e94b08c874298eb0bb0b53d53be453cc46 (diff)
Add support for ApplyBucketDiff and RequestBucketInfo
Diffstat (limited to 'storageapi/src')
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt5
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto47
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp189
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h16
4 files changed, 256 insertions, 1 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
index b82bf8381e4..36fb168d5f1 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
@@ -5,7 +5,10 @@ PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf
# protoc-generated files emit compiler warnings that we normally treat as errors.
# Instead of rolling our own compiler plugin we'll pragmatically disable the noise.
-set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override")
+set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline")
+# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
+# the header file grows over a certain size.
+set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
vespa_add_library(storageapi_mbusprot OBJECT
SOURCES
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
index 7bdda0e54d5..e7dcd5f3457 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
@@ -239,3 +239,50 @@ message GetBucketDiffResponse {
repeated MetaDiffEntry diff = 2;
}
+message ApplyDiffEntry {
+ MetaDiffEntry entry_meta = 1;
+ bytes document_id = 2;
+ bytes header_blob = 3; // TODO use singular blob
+ bytes body_blob = 4;
+}
+
+message ApplyBucketDiffRequest {
+ Bucket bucket = 1;
+ repeated MergeNode nodes = 2;
+ uint32 max_buffer_size = 3;
+ repeated ApplyDiffEntry entries = 4;
+}
+
+message ApplyBucketDiffResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated ApplyDiffEntry entries = 4;
+}
+
+message ExplicitBucketSet {
+ // `Bucket` is not needed, as the space is inferred from the owning message.
+ repeated BucketId bucket_ids = 2;
+}
+
+message AllBuckets {
+ uint32 distributor_index = 1;
+ bytes cluster_state = 2;
+ bytes distribution_hash = 3;
+}
+
+message RequestBucketInfoRequest {
+ BucketSpace bucket_space = 1;
+ oneof request_for {
+ ExplicitBucketSet explicit_bucket_set = 2;
+ AllBuckets all_buckets = 3;
+ }
+ // TODO bucket info version requested
+}
+
+message BucketAndBucketInfo {
+ fixed64 raw_bucket_id = 1;
+ BucketInfo bucket_info = 2;
+}
+
+message RequestBucketInfoResponse {
+ repeated BucketAndBucketInfo bucket_infos = 1;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 9da60ca39f8..9578f88830e 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -258,6 +258,42 @@ public:
};
template <typename ProtobufType, typename Func>
+void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) {
+ RequestEncoder<ProtobufType> enc(out_buf, msg);
+ f(enc.request());
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) {
+ ResponseEncoder<ProtobufType> enc(out_buf, reply);
+ auto& res = enc.response();
+ f(res);
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageCommand>
+ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const {
+ RequestDecoder<ProtobufType> dec(in_buf, loadTypes());
+ const auto& req = dec.request();
+ auto cmd = f(req);
+ dec.transfer_meta_information_to(*cmd);
+ return cmd;
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const {
+ ResponseDecoder<ProtobufType> dec(in_buf);
+ const auto& res = dec.response();
+ auto reply = f(res);
+ return reply;
+}
+
+// TODO encode in terms of encode_request/response
+
+template <typename ProtobufType, typename Func>
void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) {
RequestEncoder<ProtobufType> enc(out_buf, msg);
set_bucket(*enc.request().mutable_bucket(), msg.getBucket());
@@ -752,6 +788,159 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const S
});
}
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
+ encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
+ // TODO dedupe
+ for (const auto& src_node : msg.getNodes()) {
+ auto* dest_node = req.add_nodes();
+ dest_node->set_index(src_node.index);
+ dest_node->set_source_only(src_node.sourceOnly);
+ }
+ req.set_max_buffer_size(msg.getMaxBufferSize());
+ // TODO dedupe
+ for (const auto& entry : msg.getDiff()) {
+ auto* proto_entry = req.add_entries();
+ set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
+ proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
+ proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
+ proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const {
+ encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) {
+ for (const auto& entry : msg.getDiff()) {
+ auto* proto_entry = res.add_entries();
+ set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
+ proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
+ proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
+ proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
+ // TODO dedupe
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes;
+ nodes.reserve(req.nodes_size());
+ for (const auto& node : req.nodes()) {
+ nodes.emplace_back(node.index(), node.source_only());
+ }
+ auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
+ auto& diff = cmd->getDiff(); // TODO refactor
+ // TODO refactor, dedupe
+ size_t n_entries = req.entries_size();
+ diff.resize(n_entries);
+ for (size_t i = 0; i < n_entries; ++i) {
+ auto& proto_entry = req.entries(i);
+ auto& dest = diff[i];
+ dest._entry = get_diff_entry(proto_entry.entry_meta());
+ dest._docName = proto_entry.document_id();
+ dest._headerBlob.resize(proto_entry.header_blob().size());
+ memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
+ dest._bodyBlob.resize(proto_entry.body_blob().size());
+ memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd));
+ auto& diff = reply->getDiff(); // TODO refactor
+ // TODO refactor, dedupe
+ size_t n_entries = res.entries_size();
+ diff.resize(n_entries);
+ for (size_t i = 0; i < n_entries; ++i) {
+ auto& proto_entry = res.entries(i);
+ auto& dest = diff[i];
+ dest._entry = get_diff_entry(proto_entry.entry_meta());
+ dest._docName = proto_entry.document_id();
+ dest._headerBlob.resize(proto_entry.header_blob().size());
+ memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
+ dest._bodyBlob.resize(proto_entry.body_blob().size());
+ memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
+ }
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const {
+ encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) {
+ req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId());
+ auto& buckets = msg.getBuckets();
+ if (!buckets.empty()) {
+ auto* proto_buckets = req.mutable_explicit_bucket_set();
+ for (const auto& b : buckets) {
+ proto_buckets->add_bucket_ids()->set_raw_id(b.getRawId());
+ }
+ } else {
+ auto* all_buckets = req.mutable_all_buckets();
+ auto cluster_state = msg.getSystemState().toString();
+ all_buckets->set_distributor_index(msg.getDistributor());
+ all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size());
+ all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const {
+ encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](protobuf::RequestBucketInfoResponse& res) {
+ auto* proto_info = res.mutable_bucket_infos();
+ proto_info->Reserve(msg.getBucketInfo().size());
+ for (const auto& entry : msg.getBucketInfo()) {
+ auto* bucket_and_info = proto_info->Add();
+ bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
+ set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const {
+ return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) {
+ document::BucketSpace bucket_space(req.bucket_space().space_id());
+ if (req.has_explicit_bucket_set()) {
+ const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size();
+ std::vector<document::BucketId> buckets(n_buckets);
+ const auto& proto_buckets = req.explicit_bucket_set().bucket_ids();
+ for (uint32_t i = 0; i < n_buckets; ++i) {
+ buckets[i] = document::BucketId(proto_buckets.Get(i).raw_id());
+ }
+ return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets));
+ } else if (req.has_all_buckets()) {
+ auto& all_req = req.all_buckets();
+ return std::make_unique<api::RequestBucketInfoCommand>(
+ bucket_space, all_req.distributor_index(),
+ lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash());
+ } else {
+ throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set");
+ }
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::RequestBucketInfoResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd));
+ auto& dest_entries = reply->getBucketInfo();
+ uint32_t n_entries = res.bucket_infos_size();
+ dest_entries.resize(n_entries);
+ for (uint32_t i = 0; i < n_entries; ++i) {
+ auto& proto_entry = res.bucket_infos(i);
+ dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
+ dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
+ }
+ return reply;
+ });
+}
+
/*
// -----------------------------------------------------------------
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
index 55d921d8431..66fd3b110ed 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -73,8 +73,24 @@ public:
SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override;
SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override;
+ // ApplyBucketDiff
+ void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override;
+ void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override;
+ SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override;
+ SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override;
+
+ // RequestBucketInfo
+ void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override;
+ void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override;
+ SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override;
+ SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override;
+
private:
template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_response(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const;