diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-24 14:38:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-24 14:38:15 +0200 |
commit | 48c99a9427263c1a13f2b5d3857dc29c6179197b (patch) | |
tree | 9f091f2daec8a230ed9888000e43df4081dbb402 /storageapi | |
parent | df62e7f9e691fe4c1b2af83a9439afff74aa5e1d (diff) | |
parent | 7611e6c6ae1562679545ed6cf590d68dee5fc474 (diff) |
Merge pull request #14524 from vespa-engine/vekterli/add-statbucket-storage-protocol-serialization-support
Add StatBucket storage protocol (de-)serialization support
Diffstat (limited to 'storageapi')
11 files changed, 120 insertions, 6 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index 37159ab0011..636f9b1f701 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/internal.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/mbusprot/storageprotocol.h> #include <vespa/storageapi/mbusprot/storagecommand.h> #include <vespa/storageapi/mbusprot/storagereply.h> @@ -567,6 +568,24 @@ TEST_P(StorageProtocolTest, remove_location) { } } +TEST_P(StorageProtocolTest, stat_bucket) { + if (GetParam().getMajor() < 7) { + return; // Only available for protobuf-backed protocol version. + } + auto cmd = std::make_shared<StatBucketCommand>(_bucket, "id.group == 'mygroup'"); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("id.group == 'mygroup'", cmd2->getDocumentSelection()); + EXPECT_EQ(_bucket, cmd2->getBucket()); + + auto reply = std::make_shared<StatBucketReply>(*cmd2, "neat bucket info goes here"); + reply->remapBucketId(_dummy_remap_bucket); + auto reply2 = copyReply(reply); + EXPECT_EQ(reply2->getResults(), "neat bucket info goes here"); + EXPECT_TRUE(reply2->hasBeenRemapped()); + EXPECT_EQ(_dummy_remap_bucket, reply2->getBucketId()); + EXPECT_EQ(_bucket_id, reply2->getOriginalBucketId()); +} + TEST_P(StorageProtocolTest, create_visitor) { std::vector<document::BucketId> buckets; buckets.push_back(document::BucketId(16, 1)); diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index b749844775d..ce5294a7470 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -4,6 +4,7 @@ find_package(Protobuf REQUIRED) PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf/common.proto protobuf/feed.proto + protobuf/inspect.proto protobuf/visiting.proto protobuf/maintenance.proto) diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto new file mode 100644 index 00000000000..1bf833e4bf6 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +import "common.proto"; + +message StatBucketRequest { + Bucket bucket = 1; + bytes document_selection = 2; +} + +message StatBucketResponse { + BucketId remapped_bucket_id = 1; + bytes results = 2; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h index ed7fbc0bdf9..289e5dc355c 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h @@ -9,6 +9,7 @@ #endif #include "feed.pb.h" +#include "inspect.pb.h" #include "visiting.pb.h" #include "maintenance.pb.h" diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp index 917b60c50c3..a53bd415c8e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp @@ -7,6 +7,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/util/exceptions.h> @@ -120,6 +121,12 @@ ProtocolSerialization::encode(const api::StorageMessage& msg) const case api::MessageType::VISITOR_DESTROY_REPLY_ID: onEncode(buf, static_cast<const api::DestroyVisitorReply&>(msg)); break; + case api::MessageType::STATBUCKET_ID: + onEncode(buf, static_cast<const api::StatBucketCommand&>(msg)); + break; + case api::MessageType::STATBUCKET_REPLY_ID: + onEncode(buf, static_cast<const api::StatBucketReply&>(msg)); + break; case api::MessageType::REMOVELOCATION_ID: onEncode(buf, static_cast<const api::RemoveLocationCommand&>(msg)); break; @@ -191,6 +198,8 @@ ProtocolSerialization::decodeCommand(mbus::BlobRef data) const cmd = onDecodeCreateVisitorCommand(buf); break; case api::MessageType::VISITOR_DESTROY_ID: cmd = onDecodeDestroyVisitorCommand(buf); break; + case api::MessageType::STATBUCKET_ID: + cmd = onDecodeStatBucketCommand(buf); break; case api::MessageType::REMOVELOCATION_ID: cmd = onDecodeRemoveLocationCommand(buf); break; case api::MessageType::SETBUCKETSTATE_ID: @@ -253,6 +262,8 @@ ProtocolSerialization::decodeReply(mbus::BlobRef data, const api::StorageCommand reply = onDecodeCreateVisitorReply(cmd, buf); break; case api::MessageType::VISITOR_DESTROY_REPLY_ID: reply = onDecodeDestroyVisitorReply(cmd, buf); break; + case api::MessageType::STATBUCKET_REPLY_ID: + reply = onDecodeStatBucketReply(cmd, buf); break; case api::MessageType::REMOVELOCATION_REPLY_ID: reply = onDecodeRemoveLocationReply(cmd, buf); break; case api::MessageType::SETBUCKETSTATE_REPLY_ID: diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h index a57627b9ba9..569ff99c11f 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h @@ -43,6 +43,8 @@ class NotifyBucketChangeCommand; class NotifyBucketChangeReply; class SplitBucketCommand; class SplitBucketReply; +class StatBucketCommand; +class StatBucketReply; class JoinBucketsCommand; class JoinBucketsReply; class SetBucketStateCommand; @@ -111,6 +113,8 @@ protected: virtual void onEncode(GBBuf&, const api::DestroyVisitorReply&) const = 0; virtual void onEncode(GBBuf&, const api::RemoveLocationCommand&) const = 0; virtual void onEncode(GBBuf&, const api::RemoveLocationReply&) const = 0; + virtual void onEncode(GBBuf&, const api::StatBucketCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::StatBucketReply&) const = 0; virtual SCmd::UP onDecodePutCommand(BBuf&) const = 0; virtual SRep::UP onDecodePutReply(const SCmd&, BBuf&) const = 0; @@ -148,6 +152,8 @@ protected: virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0; virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0; + virtual SCmd::UP onDecodeStatBucketCommand(BBuf&) const = 0; + virtual SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const = 0; }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 8deb689d3a8..13fba8b8508 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -488,6 +488,24 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf return msg; } +void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketCommand&) const { + throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); +} + +api::StorageCommand::UP +ProtocolSerialization4_2::onDecodeStatBucketCommand(BBuf&) const { + throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); +} + +void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketReply&) const { + throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); +} + +api::StorageReply::UP +ProtocolSerialization4_2::onDecodeStatBucketReply(const SCmd&, BBuf&) const { + throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); +} + // Utility functions for serialization void diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h index e4ab36dc989..3ae1770f3c2 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h @@ -26,6 +26,8 @@ protected: void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; + void onEncode(GBBuf&, const api::StatBucketCommand&) const override; + void onEncode(GBBuf&, const api::StatBucketReply&) const override; // Not supported on 4.2, but implemented here for simplicity. void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; @@ -56,6 +58,8 @@ protected: SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + SCmd::UP onDecodeStatBucketCommand(BBuf&) const override; + SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override; virtual void onDecodeBucketInfoCommand(BBuf&, api::BucketInfoCommand&) const; virtual void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const = 0; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 8ea946eede4..166925382bd 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -10,6 +10,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/visitor.h> +#include <vespa/storageapi/message/stat.h> namespace storage::mbusprot { @@ -1313,4 +1314,32 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const }); } +// ----------------------------------------------------------------- +// StatBucket +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketCommand& msg) const { + encode_bucket_request<protobuf::StatBucketRequest>(buf, msg, [&](auto& req) { + req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketReply& msg) const { + encode_bucket_response<protobuf::StatBucketResponse>(buf, msg, [&](auto& res) { + res.set_results(msg.getResults().data(), msg.getResults().size()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeStatBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::StatBucketRequest>(buf, [&](auto& req, auto& bucket) { + return std::make_unique<api::StatBucketCommand>(bucket, req.document_selection()); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeStatBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::StatBucketResponse>(buf, [&](auto& res) { + return std::make_unique<api::StatBucketReply>(static_cast<const api::StatBucketCommand&>(cmd), res.results()); + }); +} + } // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h index bb7f0308efa..e1d08691bc1 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -129,6 +129,12 @@ public: SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + // StatBucket + void onEncode(GBBuf&, const api::StatBucketCommand&) const override; + void onEncode(GBBuf&, const api::StatBucketReply&) const override; + SCmd::UP onDecodeStatBucketCommand(BBuf&) const override; + SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override; + private: template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; diff --git a/storageapi/src/vespa/storageapi/message/stat.h b/storageapi/src/vespa/storageapi/message/stat.h index 9020d16622a..a7d9a30ca6e 100644 --- a/storageapi/src/vespa/storageapi/message/stat.h +++ b/storageapi/src/vespa/storageapi/message/stat.h @@ -23,7 +23,7 @@ private: public: StatBucketCommand(const document::Bucket &bucket, vespalib::stringref documentSelection); - ~StatBucketCommand(); + ~StatBucketCommand() override; const vespalib::string& getDocumentSelection() const { return _docSelection; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; @@ -33,8 +33,8 @@ public: class StatBucketReply : public BucketReply { vespalib::string _results; public: - StatBucketReply(const StatBucketCommand&, vespalib::stringref results = ""); - const vespalib::string& getResults() { return _results; } + explicit StatBucketReply(const StatBucketCommand&, vespalib::stringref results = ""); + const vespalib::string& getResults() const noexcept { return _results; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(StatBucketReply, onStatBucketReply) }; @@ -51,7 +51,7 @@ public: */ class GetBucketListCommand : public BucketCommand { public: - GetBucketListCommand(const document::Bucket &bucket); + explicit GetBucketListCommand(const document::Bucket &bucket); void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(GetBucketListCommand, onGetBucketList); }; @@ -78,8 +78,8 @@ private: std::vector<BucketInfo> _buckets; public: - GetBucketListReply(const GetBucketListCommand&); - ~GetBucketListReply(); + explicit GetBucketListReply(const GetBucketListCommand&); + ~GetBucketListReply() override; std::vector<BucketInfo>& getBuckets() { return _buckets; } const std::vector<BucketInfo>& getBuckets() const { return _buckets; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; |