aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 07:34:23 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 07:35:25 +0000
commit1c72b80691b2085de2bbdcef88f0ff76d951abce (patch)
tree99c8284169f2d558c0d4ca7e0dbf987020ea620b
parent491eae8d54a7c0c1b98560f71e5fd5c7519c9512 (diff)
Use steady_time for timing out old configurations
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h7
2 files changed, 22 insertions, 24 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 0680c10ab29..ec22d7c064e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -9,7 +9,6 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/storage/common/bucket_resolver.h>
#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageserver/configurable_bucket_resolver.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h>
@@ -49,13 +48,14 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl
}
namespace {
- vespalib::string getNodeId(StorageComponent& sc) {
- vespalib::asciistream ost;
- ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
- return ost.str();
- }
- framework::SecondTime TEN_MINUTES(600);
+vespalib::string getNodeId(StorageComponent& sc) {
+ vespalib::asciistream ost;
+ ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
+ return ost.str();
+}
+
+vespalib::duration TEN_MINUTES = 600s;
}
@@ -151,8 +151,7 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
std::shared_ptr<api::StorageCommand> originalCommand;
{
std::lock_guard lock(_messageBusSentLock);
- using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>;
- MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64));
+ auto iter(_messageBusSent.find(reply->getContext().value.UINT64));
if (iter != _messageBusSent.end()) {
originalCommand.swap(iter->second);
_messageBusSent.erase(iter);
@@ -193,13 +192,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space(
namespace {
struct PlaceHolderBucketResolver : public BucketResolver {
- document::Bucket bucketFromId(const document::DocumentId &) const override {
- return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0));
+ [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override {
+ return {FixedBucketSpaces::default_space(), document::BucketId(0)};
}
- document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
+ [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
return FixedBucketSpaces::default_space();
}
- vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
+ [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
assert(bucketSpace == FixedBucketSpaces::default_space());
return FixedBucketSpaces::to_string(bucketSpace);
}
@@ -438,7 +437,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(std::move(msg));
+ process(msg);
}
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
@@ -451,7 +450,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg)
{
MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString());
if (msg->getType().isReply()) {
- const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg);
+ const auto & m = static_cast<const api::StorageReply&>(*msg);
if (m.getResult().failed()) {
LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str());
}
@@ -604,7 +603,7 @@ CommunicationManager::sendDirectRPCReply(
request.addReturnString(m.data(), m.size());
if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(static_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, false);
request.addReturnString(ns.str().c_str());
@@ -693,9 +692,9 @@ CommunicationManager::run(framework::ThreadHandle& thread)
process(msg);
}
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- for (EarlierProtocols::iterator it(_earlierGenerations.begin());
+ for (auto it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds());
+ ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -718,10 +717,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string&
void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) {
if (_mbus) {
- framework::SecondTime now(_component.getClock().getTimeInSeconds());
+ vespalib::steady_time now(_component.getClock().getMonotonicTime());
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo);
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)));
+ _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol));
}
if (_message_codec_provider) {
_message_codec_provider->update_atomically(repo);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 6f953411cac..e83a6517c45 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -72,9 +72,6 @@ class CommunicationManager final
public MessageDispatcher
{
private:
- CommunicationManager(const CommunicationManager&);
- CommunicationManager& operator=(const CommunicationManager&);
-
StorageComponent _component;
CommunicationManagerMetrics _metrics;
@@ -85,7 +82,7 @@ private:
Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
- using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;
+ using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>;
using EarlierProtocols = std::vector<EarlierProtocol>;
std::mutex _earlierGenerationsLock;
EarlierProtocols _earlierGenerations;
@@ -126,6 +123,8 @@ private:
friend struct CommunicationManagerTest;
public:
+ CommunicationManager(const CommunicationManager&) = delete;
+ CommunicationManager& operator=(const CommunicationManager&) = delete;
CommunicationManager(StorageComponentRegister& compReg,
const config::ConfigUri & configUri);
~CommunicationManager() override;