diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 09:31:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-07 09:31:52 +0100 |
commit | fa62bb9420b8755737d42de3a525d4b276196660 (patch) | |
tree | 6ff4d7bc59405e8e3d867d494a4f6e70582b923c | |
parent | 491eae8d54a7c0c1b98560f71e5fd5c7519c9512 (diff) | |
parent | 5cffc8409a9b56b1d129fa691cf7acd4993710f2 (diff) |
Merge pull request #25897 from vespa-engine/balder/use-steady_time-time-for-communicationmanager
Balder/use steady time time for communicationmanager MERGEOK
6 files changed, 36 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 0680c10ab29..ec22d7c064e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -9,7 +9,6 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/storage/common/bucket_resolver.h> #include <vespa/storage/common/nodestateupdater.h> -#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageserver/configurable_bucket_resolver.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> @@ -49,13 +48,14 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl } namespace { - vespalib::string getNodeId(StorageComponent& sc) { - vespalib::asciistream ost; - ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); - return ost.str(); - } - framework::SecondTime TEN_MINUTES(600); +vespalib::string getNodeId(StorageComponent& sc) { + vespalib::asciistream ost; + ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); + return ost.str(); +} + +vespalib::duration TEN_MINUTES = 600s; } @@ -151,8 +151,7 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) std::shared_ptr<api::StorageCommand> originalCommand; { std::lock_guard lock(_messageBusSentLock); - using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>; - MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64)); + auto iter(_messageBusSent.find(reply->getContext().value.UINT64)); if (iter != _messageBusSent.end()) { originalCommand.swap(iter->second); _messageBusSent.erase(iter); @@ -193,13 +192,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - document::Bucket bucketFromId(const document::DocumentId &) const override { - return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); + [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override { + return {FixedBucketSpaces::default_space(), document::BucketId(0)}; } - document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -438,7 +437,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(std::move(msg)); + process(msg); } void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { @@ -451,7 +450,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg) { MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString()); if (msg->getType().isReply()) { - const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg); + const auto & m = static_cast<const api::StorageReply&>(*msg); if (m.getResult().failed()) { LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str()); } @@ -604,7 +603,7 @@ CommunicationManager::sendDirectRPCReply( request.addReturnString(m.data(), m.size()); if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, false); request.addReturnString(ns.str().c_str()); @@ -693,9 +692,9 @@ CommunicationManager::run(framework::ThreadHandle& thread) process(msg); } std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - for (EarlierProtocols::iterator it(_earlierGenerations.begin()); + for (auto it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds()); + ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -718,10 +717,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string& void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) { if (_mbus) { - framework::SecondTime now(_component.getClock().getTimeInSeconds()); + vespalib::steady_time now(_component.getClock().getMonotonicTime()); auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo); std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol))); + _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)); } if (_message_codec_provider) { _message_codec_provider->update_atomically(repo); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 6f953411cac..e83a6517c45 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -72,9 +72,6 @@ class CommunicationManager final public MessageDispatcher { private: - CommunicationManager(const CommunicationManager&); - CommunicationManager& operator=(const CommunicationManager&); - StorageComponent _component; CommunicationManagerMetrics _metrics; @@ -85,7 +82,7 @@ private: Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; - using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>; + using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>; using EarlierProtocols = std::vector<EarlierProtocol>; std::mutex _earlierGenerationsLock; EarlierProtocols _earlierGenerations; @@ -126,6 +123,8 @@ private: friend struct CommunicationManagerTest; public: + CommunicationManager(const CommunicationManager&) = delete; + CommunicationManager& operator=(const CommunicationManager&) = delete; CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri); ~CommunicationManager() override; diff --git a/storage/src/vespa/storage/storageserver/opslogger.cpp b/storage/src/vespa/storage/storageserver/opslogger.cpp index 03322cb55fd..e5785968eb1 100644 --- a/storage/src/vespa/storage/storageserver/opslogger.cpp +++ b/storage/src/vespa/storage/storageserver/opslogger.cpp @@ -77,7 +77,7 @@ OpsLogger::onPutReply(const std::shared_ptr<api::PutReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << _component.getClock().getTimeInSeconds().getTime() + ost << vespalib::to_string(_component.getClock().getSystemTime()) << "\tPUT\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -94,7 +94,7 @@ OpsLogger::onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << _component.getClock().getTimeInSeconds().getTime() + ost << vespalib::to_string(_component.getClock().getSystemTime()) << "\tUPDATE\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -111,7 +111,7 @@ OpsLogger::onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << _component.getClock().getTimeInSeconds().getTime() + ost << vespalib::to_string(_component.getClock().getSystemTime()) << "\tREMOVE\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -128,7 +128,7 @@ OpsLogger::onGetReply(const std::shared_ptr<api::GetReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << _component.getClock().getTimeInSeconds().getTime() + ost << vespalib::to_string(_component.getClock().getSystemTime()) << "\tGET\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { diff --git a/storage/src/vespa/storage/storageserver/statereporter.cpp b/storage/src/vespa/storage/storageserver/statereporter.cpp index b2337ae1223..373cd186708 100644 --- a/storage/src/vespa/storage/storageserver/statereporter.cpp +++ b/storage/src/vespa/storage/storageserver/statereporter.cpp @@ -29,9 +29,7 @@ StateReporter::StateReporter( _component.registerStatusPage(*this); } -StateReporter::~StateReporter() -{ -} +StateReporter::~StateReporter() = default; vespalib::string StateReporter::getReportContentType( @@ -84,7 +82,7 @@ StateReporter::getMetrics(const vespalib::string &consumer) snapshot.reset(0); _manager.getMetricSnapshot(guard, interval).addToSnapshot( - snapshot, _component.getClock().getTimeInSeconds().getTime()); + snapshot, vespalib::count_s(_component.getClock().getSystemTime().time_since_epoch())); vespalib::asciistream json; vespalib::JsonStream stream(json); @@ -106,7 +104,7 @@ StateReporter::getHealth() const lib::NodeState cns(*_component.getStateUpdater().getCurrentNodeState()); bool up = cns.getState().oneOf("u"); std::string message = up ? "" : "Node state: " + cns.toString(true); - return vespalib::HealthProducer::Health(up, message); + return { up, message }; } void diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 3987827a264..2836ab80acf 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -37,7 +37,7 @@ namespace { void writePidFile(const vespalib::string& pidfile) { - int rv = -1; + ssize_t rv = -1; vespalib::string mypid = vespalib::make_string("%d\n", getpid()); size_t lastSlash = pidfile.rfind('/'); if (lastSlash != vespalib::string::npos) { @@ -372,7 +372,7 @@ StorageNode::shutdown() _chain->flush(); } - if (_pidFile != "") { + if ( !_pidFile.empty() ) { LOG(debug, "Removing pid file"); removePidFile(_pidFile); } @@ -510,10 +510,8 @@ StorageNode::updateMetrics(const MetricLockGuard &) { } void -StorageNode::waitUntilInitialized(uint32_t timeout) { - framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout)); +StorageNode::waitUntilInitialized(vespalib::duration timeout) { + vespalib::steady_time doom = vespalib::steady_clock::now() + timeout; while (true) { { NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); @@ -521,10 +519,9 @@ StorageNode::waitUntilInitialized(uint32_t timeout) { if (nodeState.getState() == lib::State::UP) break; } std::this_thread::sleep_for(10ms); - if (clock.getTimeInMillis() >= endTime) { + if (vespalib::steady_clock::now() >= doom) { std::ostringstream ost; - ost << "Storage server not initialized after waiting timeout of " - << timeout << " seconds."; + ost << "Storage server not initialized after waiting timeout of " << timeout << " seconds."; throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); } } diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 0e420f206e2..19b930c184f 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -78,7 +78,7 @@ public: virtual const lib::NodeType& getNodeType() const = 0; bool attemptedStopped() const; void notifyDoneInitializing() override; - void waitUntilInitialized(uint32_t timeoutSeconds = 15); + void waitUntilInitialized(vespalib::duration timeout = 15s); void updateMetrics(const MetricLockGuard & guard) override; /** Updates the document type repo. */ |