diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 07:34:23 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 07:35:25 +0000 |
commit | 1c72b80691b2085de2bbdcef88f0ff76d951abce (patch) | |
tree | 99c8284169f2d558c0d4ca7e0dbf987020ea620b | |
parent | 491eae8d54a7c0c1b98560f71e5fd5c7519c9512 (diff) |
Use steady_time for timing out old configurations
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.cpp | 39 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.h | 7 |
2 files changed, 22 insertions, 24 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 0680c10ab29..ec22d7c064e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -9,7 +9,6 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/storage/common/bucket_resolver.h> #include <vespa/storage/common/nodestateupdater.h> -#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageserver/configurable_bucket_resolver.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> @@ -49,13 +48,14 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl } namespace { - vespalib::string getNodeId(StorageComponent& sc) { - vespalib::asciistream ost; - ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); - return ost.str(); - } - framework::SecondTime TEN_MINUTES(600); +vespalib::string getNodeId(StorageComponent& sc) { + vespalib::asciistream ost; + ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); + return ost.str(); +} + +vespalib::duration TEN_MINUTES = 600s; } @@ -151,8 +151,7 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) std::shared_ptr<api::StorageCommand> originalCommand; { std::lock_guard lock(_messageBusSentLock); - using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>; - MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64)); + auto iter(_messageBusSent.find(reply->getContext().value.UINT64)); if (iter != _messageBusSent.end()) { originalCommand.swap(iter->second); _messageBusSent.erase(iter); @@ -193,13 +192,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - document::Bucket bucketFromId(const document::DocumentId &) const override { - return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); + [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override { + return {FixedBucketSpaces::default_space(), document::BucketId(0)}; } - document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -438,7 +437,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(std::move(msg)); + process(msg); } void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { @@ -451,7 +450,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg) { MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString()); if (msg->getType().isReply()) { - const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg); + const auto & m = static_cast<const api::StorageReply&>(*msg); if (m.getResult().failed()) { LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str()); } @@ -604,7 +603,7 @@ CommunicationManager::sendDirectRPCReply( request.addReturnString(m.data(), m.size()); if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, false); request.addReturnString(ns.str().c_str()); @@ -693,9 +692,9 @@ CommunicationManager::run(framework::ThreadHandle& thread) process(msg); } std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - for (EarlierProtocols::iterator it(_earlierGenerations.begin()); + for (auto it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds()); + ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -718,10 +717,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string& void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) { if (_mbus) { - framework::SecondTime now(_component.getClock().getTimeInSeconds()); + vespalib::steady_time now(_component.getClock().getMonotonicTime()); auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo); std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol))); + _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)); } if (_message_codec_provider) { _message_codec_provider->update_atomically(repo); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 6f953411cac..e83a6517c45 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -72,9 +72,6 @@ class CommunicationManager final public MessageDispatcher { private: - CommunicationManager(const CommunicationManager&); - CommunicationManager& operator=(const CommunicationManager&); - StorageComponent _component; CommunicationManagerMetrics _metrics; @@ -85,7 +82,7 @@ private: Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; - using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>; + using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>; using EarlierProtocols = std::vector<EarlierProtocol>; std::mutex _earlierGenerationsLock; EarlierProtocols _earlierGenerations; @@ -126,6 +123,8 @@ private: friend struct CommunicationManagerTest; public: + CommunicationManager(const CommunicationManager&) = delete; + CommunicationManager& operator=(const CommunicationManager&) = delete; CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri); ~CommunicationManager() override; |