diff options
Diffstat (limited to 'storageapi/src')
15 files changed, 189 insertions, 18 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index 37159ab0011..636f9b1f701 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/internal.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/mbusprot/storageprotocol.h> #include <vespa/storageapi/mbusprot/storagecommand.h> #include <vespa/storageapi/mbusprot/storagereply.h> @@ -567,6 +568,24 @@ TEST_P(StorageProtocolTest, remove_location) { } } +TEST_P(StorageProtocolTest, stat_bucket) { + if (GetParam().getMajor() < 7) { + return; // Only available for protobuf-backed protocol version. + } + auto cmd = std::make_shared<StatBucketCommand>(_bucket, "id.group == 'mygroup'"); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("id.group == 'mygroup'", cmd2->getDocumentSelection()); + EXPECT_EQ(_bucket, cmd2->getBucket()); + + auto reply = std::make_shared<StatBucketReply>(*cmd2, "neat bucket info goes here"); + reply->remapBucketId(_dummy_remap_bucket); + auto reply2 = copyReply(reply); + EXPECT_EQ(reply2->getResults(), "neat bucket info goes here"); + EXPECT_TRUE(reply2->hasBeenRemapped()); + EXPECT_EQ(_dummy_remap_bucket, reply2->getBucketId()); + EXPECT_EQ(_bucket_id, reply2->getOriginalBucketId()); +} + TEST_P(StorageProtocolTest, create_visitor) { std::vector<document::BucketId> buckets; buckets.push_back(document::BucketId(16, 1)); diff --git a/storageapi/src/tests/messageapi/CMakeLists.txt b/storageapi/src/tests/messageapi/CMakeLists.txt index 4833dc45acf..50f0b306191 100644 --- a/storageapi/src/tests/messageapi/CMakeLists.txt +++ b/storageapi/src/tests/messageapi/CMakeLists.txt @@ -1,5 +1,8 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(storageapi_testmessageapi INTERFACE +vespa_add_library(storageapi_testmessageapi SOURCES + storage_message_address_test.cpp DEPENDS + storageapi + GTest::GTest ) diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp new file mode 100644 index 00000000000..c340cba4b28 --- /dev/null +++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::api { + +namespace { + +size_t hash_of(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index) { + return StorageMessageAddress(cluster, type, index).internal_storage_hash(); +} + +} + +TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::DISTRIBUTOR, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123), + hash_of("foo", lib::NodeType::STORAGE, 123)); + + // These tests are all true with extremely high probability, though they do + // depend on a hash function that may inherently cause collisions. + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("bar", lib::NodeType::STORAGE, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 1)); +} + +} // storage::api diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index b749844775d..ce5294a7470 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -4,6 +4,7 @@ find_package(Protobuf REQUIRED) PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS protobuf/common.proto protobuf/feed.proto + protobuf/inspect.proto protobuf/visiting.proto protobuf/maintenance.proto) diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto new file mode 100644 index 00000000000..1bf833e4bf6 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/inspect.proto @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +import "common.proto"; + +message StatBucketRequest { + Bucket bucket = 1; + bytes document_selection = 2; +} + +message StatBucketResponse { + BucketId remapped_bucket_id = 1; + bytes results = 2; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h index ed7fbc0bdf9..289e5dc355c 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h @@ -9,6 +9,7 @@ #endif #include "feed.pb.h" +#include "inspect.pb.h" #include "visiting.pb.h" #include "maintenance.pb.h" diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp index 917b60c50c3..a53bd415c8e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp @@ -7,6 +7,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/util/exceptions.h> @@ -120,6 +121,12 @@ ProtocolSerialization::encode(const api::StorageMessage& msg) const case api::MessageType::VISITOR_DESTROY_REPLY_ID: onEncode(buf, static_cast<const api::DestroyVisitorReply&>(msg)); break; + case api::MessageType::STATBUCKET_ID: + onEncode(buf, static_cast<const api::StatBucketCommand&>(msg)); + break; + case api::MessageType::STATBUCKET_REPLY_ID: + onEncode(buf, static_cast<const api::StatBucketReply&>(msg)); + break; case api::MessageType::REMOVELOCATION_ID: onEncode(buf, static_cast<const api::RemoveLocationCommand&>(msg)); break; @@ -191,6 +198,8 @@ ProtocolSerialization::decodeCommand(mbus::BlobRef data) const cmd = onDecodeCreateVisitorCommand(buf); break; case api::MessageType::VISITOR_DESTROY_ID: cmd = onDecodeDestroyVisitorCommand(buf); break; + case api::MessageType::STATBUCKET_ID: + cmd = onDecodeStatBucketCommand(buf); break; case api::MessageType::REMOVELOCATION_ID: cmd = onDecodeRemoveLocationCommand(buf); break; case api::MessageType::SETBUCKETSTATE_ID: @@ -253,6 +262,8 @@ ProtocolSerialization::decodeReply(mbus::BlobRef data, const api::StorageCommand reply = onDecodeCreateVisitorReply(cmd, buf); break; case api::MessageType::VISITOR_DESTROY_REPLY_ID: reply = onDecodeDestroyVisitorReply(cmd, buf); break; + case api::MessageType::STATBUCKET_REPLY_ID: + reply = onDecodeStatBucketReply(cmd, buf); break; case api::MessageType::REMOVELOCATION_REPLY_ID: reply = onDecodeRemoveLocationReply(cmd, buf); break; case api::MessageType::SETBUCKETSTATE_REPLY_ID: diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h index a57627b9ba9..569ff99c11f 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h @@ -43,6 +43,8 @@ class NotifyBucketChangeCommand; class NotifyBucketChangeReply; class SplitBucketCommand; class SplitBucketReply; +class StatBucketCommand; +class StatBucketReply; class JoinBucketsCommand; class JoinBucketsReply; class SetBucketStateCommand; @@ -111,6 +113,8 @@ protected: virtual void onEncode(GBBuf&, const api::DestroyVisitorReply&) const = 0; virtual void onEncode(GBBuf&, const api::RemoveLocationCommand&) const = 0; virtual void onEncode(GBBuf&, const api::RemoveLocationReply&) const = 0; + virtual void onEncode(GBBuf&, const api::StatBucketCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::StatBucketReply&) const = 0; virtual SCmd::UP onDecodePutCommand(BBuf&) const = 0; virtual SRep::UP onDecodePutReply(const SCmd&, BBuf&) const = 0; @@ -148,6 +152,8 @@ protected: virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0; virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0; + virtual SCmd::UP onDecodeStatBucketCommand(BBuf&) const = 0; + virtual SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const = 0; }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 8deb689d3a8..13fba8b8508 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -488,6 +488,24 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf return msg; } +void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketCommand&) const { + throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); +} + +api::StorageCommand::UP +ProtocolSerialization4_2::onDecodeStatBucketCommand(BBuf&) const { + throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); +} + +void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketReply&) const { + throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); +} + +api::StorageReply::UP +ProtocolSerialization4_2::onDecodeStatBucketReply(const SCmd&, BBuf&) const { + throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); +} + // Utility functions for serialization void diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h index e4ab36dc989..3ae1770f3c2 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h @@ -26,6 +26,8 @@ protected: void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; + void onEncode(GBBuf&, const api::StatBucketCommand&) const override; + void onEncode(GBBuf&, const api::StatBucketReply&) const override; // Not supported on 4.2, but implemented here for simplicity. void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; @@ -56,6 +58,8 @@ protected: SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + SCmd::UP onDecodeStatBucketCommand(BBuf&) const override; + SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override; virtual void onDecodeBucketInfoCommand(BBuf&, api::BucketInfoCommand&) const; virtual void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const = 0; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 8ea946eede4..166925382bd 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -10,6 +10,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/visitor.h> +#include <vespa/storageapi/message/stat.h> namespace storage::mbusprot { @@ -1313,4 +1314,32 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const }); } +// ----------------------------------------------------------------- +// StatBucket +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketCommand& msg) const { + encode_bucket_request<protobuf::StatBucketRequest>(buf, msg, [&](auto& req) { + req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketReply& msg) const { + encode_bucket_response<protobuf::StatBucketResponse>(buf, msg, [&](auto& res) { + res.set_results(msg.getResults().data(), msg.getResults().size()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeStatBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::StatBucketRequest>(buf, [&](auto& req, auto& bucket) { + return std::make_unique<api::StatBucketCommand>(bucket, req.document_selection()); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeStatBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::StatBucketResponse>(buf, [&](auto& res) { + return std::make_unique<api::StatBucketReply>(static_cast<const api::StatBucketCommand&>(cmd), res.results()); + }); +} + } // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h index bb7f0308efa..e1d08691bc1 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -129,6 +129,12 @@ public: SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + // StatBucket + void onEncode(GBBuf&, const api::StatBucketCommand&) const override; + void onEncode(GBBuf&, const api::StatBucketReply&) const override; + SCmd::UP onDecodeStatBucketCommand(BBuf&) const override; + SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override; + private: template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; diff --git a/storageapi/src/vespa/storageapi/message/stat.h b/storageapi/src/vespa/storageapi/message/stat.h index 9020d16622a..a7d9a30ca6e 100644 --- a/storageapi/src/vespa/storageapi/message/stat.h +++ b/storageapi/src/vespa/storageapi/message/stat.h @@ -23,7 +23,7 @@ private: public: StatBucketCommand(const document::Bucket &bucket, vespalib::stringref documentSelection); - ~StatBucketCommand(); + ~StatBucketCommand() override; const vespalib::string& getDocumentSelection() const { return _docSelection; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; @@ -33,8 +33,8 @@ public: class StatBucketReply : public BucketReply { vespalib::string _results; public: - StatBucketReply(const StatBucketCommand&, vespalib::stringref results = ""); - const vespalib::string& getResults() { return _results; } + explicit StatBucketReply(const StatBucketCommand&, vespalib::stringref results = ""); + const vespalib::string& getResults() const noexcept { return _results; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(StatBucketReply, onStatBucketReply) }; @@ -51,7 +51,7 @@ public: */ class GetBucketListCommand : public BucketCommand { public: - GetBucketListCommand(const document::Bucket &bucket); + explicit GetBucketListCommand(const document::Bucket &bucket); void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(GetBucketListCommand, onGetBucketList); }; @@ -78,8 +78,8 @@ private: std::vector<BucketInfo> _buckets; public: - GetBucketListReply(const GetBucketListCommand&); - ~GetBucketListReply(); + explicit GetBucketListReply(const GetBucketListCommand&); + ~GetBucketListReply() override; std::vector<BucketInfo>& getBuckets() { return _buckets; } const std::vector<BucketInfo>& getBuckets() const { return _buckets; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index d1bd24f5087..1f1a2c602de 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/stllike/hash_fun.h> #include <sstream> #include <cassert> #include <atomic> @@ -141,8 +142,8 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c StorageMessageAddress::StorageMessageAddress(const mbus::Route& route) : _route(route), - _retryEnabled(false), _protocol(DOCUMENT), + _precomputed_storage_hash(0), _cluster(""), _type(nullptr), _index(0xFFFF) @@ -160,17 +161,34 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i return os.str(); } +// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available +StorageMessageAddress::StorageMessageAddress() + : _route(), + _protocol(Protocol::STORAGE), + _precomputed_storage_hash(0), + _cluster(), + _type(nullptr), + _index(0) +{} + + StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) : _route(), - _retryEnabled(false), _protocol(protocol), + _precomputed_storage_hash(0), _cluster(cluster), _type(&type), _index(index) { std::vector<mbus::IHopDirective::SP> directives; - directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(createAddress(cluster, type, index))); + auto address_as_str = createAddress(cluster, type, index); + // We reuse the string representation and pass it to vespalib's hashValue instead of + // explicitly combining a running hash over the individual fields. This is because + // hashValue internally uses xxhash, which offers great dispersion of bits even for + // minimal changes in the input (such as single bit differences in the index). + _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size()); + directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); _route.addHop(mbus::Hop(std::move(directives), false)); } @@ -207,12 +225,11 @@ bool StorageMessageAddress::operator==(const StorageMessageAddress& other) const { if (_protocol != other._protocol) return false; - if (_retryEnabled != other._retryEnabled) return false; if (_type != other._type) return false; if (_type) { - if (_cluster != other._cluster) return false; if (_index != other._index) return false; if (_type != other._type) return false; + if (_cluster != other._cluster) return false; } return true; } @@ -234,9 +251,6 @@ StorageMessageAddress::print(vespalib::asciistream & out) const } else { out << "Document protocol"; } - if (_retryEnabled) { - out << ", retry enabled"; - } if (!_type) { out << ", " << _route.toString() << ")"; } else { diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index 415bd7717f2..85d4e072171 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -269,14 +269,15 @@ public: private: mbus::Route _route; - bool _retryEnabled; Protocol _protocol; // Used for internal VDS addresses only + size_t _precomputed_storage_hash; vespalib::string _cluster; const lib::NodeType* _type; uint16_t _index; public: + StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers StorageMessageAddress(const mbus::Route& route); StorageMessageAddress(vespalib::stringref clusterName, const lib::NodeType& type, uint16_t index, @@ -284,15 +285,18 @@ public: ~StorageMessageAddress(); void setProtocol(Protocol p) { _protocol = p; } - void enableRetry(bool enable = true) { _retryEnabled = enable; } const mbus::Route& getRoute() const { return _route; } - bool retryEnabled() const { return _retryEnabled; } Protocol getProtocol() const { return _protocol; } uint16_t getIndex() const; const lib::NodeType& getNodeType() const; const vespalib::string& getCluster() const; + // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included. + [[nodiscard]] size_t internal_storage_hash() const noexcept { + return _precomputed_storage_hash; + } + bool operator==(const StorageMessageAddress& other) const; vespalib::string toString() const; friend std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr); |