summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-04 11:44:21 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-05 11:27:32 +0000
commit0fd9e1693eccc7a8aea1db61871821121cf514d2 (patch)
treec6174d3bc63688135e8dba44692fb3f9f2c470dc /storageapi
parent7c13c34df95c0a5d66f6d4c91bfab508b2425117 (diff)
Implement NotifyBucketChange, SplitBucket and JoinBuckets
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto33
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp109
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h18
3 files changed, 158 insertions, 2 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
index e7dcd5f3457..19362bf40db 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
@@ -286,3 +286,36 @@ message BucketAndBucketInfo {
message RequestBucketInfoResponse {
repeated BucketAndBucketInfo bucket_infos = 1;
}
+
+message NotifyBucketChangeRequest {
+ Bucket bucket = 1;
+ BucketInfo bucket_info = 2;
+}
+
+message NotifyBucketChangeResponse {
+ // Currently empty
+}
+
+message SplitBucketRequest {
+ Bucket bucket = 1;
+ uint32 min_split_bits = 2;
+ uint32 max_split_bits = 3;
+ uint32 min_byte_size = 4;
+ uint32 min_doc_count = 5;
+}
+
+message SplitBucketResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated BucketAndBucketInfo split_info = 2;
+}
+
+message JoinBucketsRequest {
+ Bucket bucket = 1;
+ repeated BucketId source_buckets = 2;
+ uint32 min_join_bits = 3;
+}
+
+message JoinBucketsResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 9578f88830e..0f9f5908ca9 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -8,6 +8,7 @@
#include "protocolserialization7.h"
#include "serializationhelper.h"
#include "storageapi.pb.h"
+#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/util/bufferexceptions.h>
@@ -721,7 +722,7 @@ void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffComma
api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) {
api::GetBucketDiffCommand::Entry e;
e._timestamp = src.timestamp();
- e._gid = get_global_id(src.gid()); // TODO need presence check?
+ e._gid = get_global_id(src.gid());
e._headerSize = src.header_size();
e._bodySize = src.body_size();
e._flags = src.flags();
@@ -893,7 +894,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCo
}
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const {
- encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](protobuf::RequestBucketInfoResponse& res) {
+ encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](auto& res) {
auto* proto_info = res.mutable_bucket_infos();
proto_info->Reserve(msg.getBucketInfo().size());
for (const auto& entry : msg.getBucketInfo()) {
@@ -941,6 +942,109 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
});
}
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const {
+ encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) {
+ set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const {
+ encode_response<protobuf::NotifyBucketChangeResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, [&](auto& req, auto& bucket) {
+ auto bucket_info = get_bucket_info(req.bucket_info());
+ return std::make_unique<api::NotifyBucketChangeCommand>(bucket, bucket_info);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::NotifyBucketChangeResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const {
+ encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) {
+ req.set_min_split_bits(msg.getMinSplitBits());
+ req.set_max_split_bits(msg.getMaxSplitBits());
+ req.set_min_byte_size(msg.getMinByteSize());
+ req.set_min_doc_count(msg.getMinDocCount());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const {
+ encode_bucket_response<protobuf::SplitBucketResponse>(buf, msg, [&](auto& res) {
+ for (const auto& split_info : msg.getSplitInfo()) {
+ auto* proto_info = res.add_split_info();
+ proto_info->set_raw_bucket_id(split_info.first.getRawId());
+ set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second);
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::SplitBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::SplitBucketCommand>(bucket);
+ cmd->setMinSplitBits(static_cast<uint8_t>(req.min_split_bits()));
+ cmd->setMaxSplitBits(static_cast<uint8_t>(req.max_split_bits()));
+ cmd->setMinByteSize(req.min_byte_size());
+ cmd->setMinDocCount(req.min_doc_count());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::SplitBucketResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd));
+ auto& dest_info = reply->getSplitInfo();
+ dest_info.reserve(res.split_info_size());
+ for (const auto& proto_info : res.split_info()) {
+ dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()),
+ get_bucket_info(proto_info.bucket_info()));
+ }
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
+ encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](protobuf::JoinBucketsRequest& req) {
+ for (const auto& source : msg.getSourceBuckets()) {
+ req.add_source_buckets()->set_raw_id(source.getRawId());
+ }
+ req.set_min_join_bits(msg.getMinJoinBits());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const {
+ encode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::JoinBucketsRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket);
+ auto& entries = cmd->getSourceBuckets();
+ for (const auto& proto_bucket : req.source_buckets()) {
+ entries.emplace_back(proto_bucket.raw_id());
+ }
+ cmd->setMinJoinBits(req.min_join_bits());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd));
+ });
+}
+
/*
// -----------------------------------------------------------------
@@ -971,6 +1075,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBu
* TODO extend testing of:
* - bucket info in responses
* - bucket remapping in responses
+ * - presence of fields
*/
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
index 66fd3b110ed..5c527f8d032 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -85,6 +85,24 @@ public:
SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override;
SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override;
+ // NotifyBucketChange
+ void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override;
+ void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override;
+ SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override;
+ SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override;
+
+ // SplitBucket
+ void onEncode(GBBuf&, const api::SplitBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::SplitBucketReply&) const override;
+ SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override;
+
+ // JoinBuckets
+ void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override;
+ void onEncode(GBBuf&, const api::JoinBucketsReply&) const override;
+ SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override;
+ SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override;
+
private:
template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const;