aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 09:31:52 +0100
committerGitHub <noreply@github.com>2023-02-07 09:31:52 +0100
commitfa62bb9420b8755737d42de3a525d4b276196660 (patch)
tree6ff4d7bc59405e8e3d867d494a4f6e70582b923c
parent491eae8d54a7c0c1b98560f71e5fd5c7519c9512 (diff)
parent5cffc8409a9b56b1d129fa691cf7acd4993710f2 (diff)
Merge pull request #25897 from vespa-engine/balder/use-steady_time-time-for-communicationmanager
Balder/use steady time time for communicationmanager MERGEOK
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h7
-rw-r--r--storage/src/vespa/storage/storageserver/opslogger.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/statereporter.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h2
6 files changed, 36 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 0680c10ab29..ec22d7c064e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -9,7 +9,6 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/storage/common/bucket_resolver.h>
#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageserver/configurable_bucket_resolver.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h>
@@ -49,13 +48,14 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl
}
namespace {
- vespalib::string getNodeId(StorageComponent& sc) {
- vespalib::asciistream ost;
- ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
- return ost.str();
- }
- framework::SecondTime TEN_MINUTES(600);
+vespalib::string getNodeId(StorageComponent& sc) {
+ vespalib::asciistream ost;
+ ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
+ return ost.str();
+}
+
+vespalib::duration TEN_MINUTES = 600s;
}
@@ -151,8 +151,7 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
std::shared_ptr<api::StorageCommand> originalCommand;
{
std::lock_guard lock(_messageBusSentLock);
- using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>;
- MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64));
+ auto iter(_messageBusSent.find(reply->getContext().value.UINT64));
if (iter != _messageBusSent.end()) {
originalCommand.swap(iter->second);
_messageBusSent.erase(iter);
@@ -193,13 +192,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space(
namespace {
struct PlaceHolderBucketResolver : public BucketResolver {
- document::Bucket bucketFromId(const document::DocumentId &) const override {
- return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0));
+ [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override {
+ return {FixedBucketSpaces::default_space(), document::BucketId(0)};
}
- document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
+ [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
return FixedBucketSpaces::default_space();
}
- vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
+ [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
assert(bucketSpace == FixedBucketSpaces::default_space());
return FixedBucketSpaces::to_string(bucketSpace);
}
@@ -438,7 +437,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(std::move(msg));
+ process(msg);
}
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
@@ -451,7 +450,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg)
{
MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString());
if (msg->getType().isReply()) {
- const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg);
+ const auto & m = static_cast<const api::StorageReply&>(*msg);
if (m.getResult().failed()) {
LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str());
}
@@ -604,7 +603,7 @@ CommunicationManager::sendDirectRPCReply(
request.addReturnString(m.data(), m.size());
if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(static_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, false);
request.addReturnString(ns.str().c_str());
@@ -693,9 +692,9 @@ CommunicationManager::run(framework::ThreadHandle& thread)
process(msg);
}
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- for (EarlierProtocols::iterator it(_earlierGenerations.begin());
+ for (auto it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds());
+ ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -718,10 +717,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string&
void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) {
if (_mbus) {
- framework::SecondTime now(_component.getClock().getTimeInSeconds());
+ vespalib::steady_time now(_component.getClock().getMonotonicTime());
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo);
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)));
+ _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol));
}
if (_message_codec_provider) {
_message_codec_provider->update_atomically(repo);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 6f953411cac..e83a6517c45 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -72,9 +72,6 @@ class CommunicationManager final
public MessageDispatcher
{
private:
- CommunicationManager(const CommunicationManager&);
- CommunicationManager& operator=(const CommunicationManager&);
-
StorageComponent _component;
CommunicationManagerMetrics _metrics;
@@ -85,7 +82,7 @@ private:
Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
- using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;
+ using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>;
using EarlierProtocols = std::vector<EarlierProtocol>;
std::mutex _earlierGenerationsLock;
EarlierProtocols _earlierGenerations;
@@ -126,6 +123,8 @@ private:
friend struct CommunicationManagerTest;
public:
+ CommunicationManager(const CommunicationManager&) = delete;
+ CommunicationManager& operator=(const CommunicationManager&) = delete;
CommunicationManager(StorageComponentRegister& compReg,
const config::ConfigUri & configUri);
~CommunicationManager() override;
diff --git a/storage/src/vespa/storage/storageserver/opslogger.cpp b/storage/src/vespa/storage/storageserver/opslogger.cpp
index 03322cb55fd..e5785968eb1 100644
--- a/storage/src/vespa/storage/storageserver/opslogger.cpp
+++ b/storage/src/vespa/storage/storageserver/opslogger.cpp
@@ -77,7 +77,7 @@ OpsLogger::onPutReply(const std::shared_ptr<api::PutReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << _component.getClock().getTimeInSeconds().getTime()
+ ost << vespalib::to_string(_component.getClock().getSystemTime())
<< "\tPUT\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -94,7 +94,7 @@ OpsLogger::onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << _component.getClock().getTimeInSeconds().getTime()
+ ost << vespalib::to_string(_component.getClock().getSystemTime())
<< "\tUPDATE\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -111,7 +111,7 @@ OpsLogger::onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << _component.getClock().getTimeInSeconds().getTime()
+ ost << vespalib::to_string(_component.getClock().getSystemTime())
<< "\tREMOVE\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -128,7 +128,7 @@ OpsLogger::onGetReply(const std::shared_ptr<api::GetReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << _component.getClock().getTimeInSeconds().getTime()
+ ost << vespalib::to_string(_component.getClock().getSystemTime())
<< "\tGET\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
diff --git a/storage/src/vespa/storage/storageserver/statereporter.cpp b/storage/src/vespa/storage/storageserver/statereporter.cpp
index b2337ae1223..373cd186708 100644
--- a/storage/src/vespa/storage/storageserver/statereporter.cpp
+++ b/storage/src/vespa/storage/storageserver/statereporter.cpp
@@ -29,9 +29,7 @@ StateReporter::StateReporter(
_component.registerStatusPage(*this);
}
-StateReporter::~StateReporter()
-{
-}
+StateReporter::~StateReporter() = default;
vespalib::string
StateReporter::getReportContentType(
@@ -84,7 +82,7 @@ StateReporter::getMetrics(const vespalib::string &consumer)
snapshot.reset(0);
_manager.getMetricSnapshot(guard, interval).addToSnapshot(
- snapshot, _component.getClock().getTimeInSeconds().getTime());
+ snapshot, vespalib::count_s(_component.getClock().getSystemTime().time_since_epoch()));
vespalib::asciistream json;
vespalib::JsonStream stream(json);
@@ -106,7 +104,7 @@ StateReporter::getHealth() const
lib::NodeState cns(*_component.getStateUpdater().getCurrentNodeState());
bool up = cns.getState().oneOf("u");
std::string message = up ? "" : "Node state: " + cns.toString(true);
- return vespalib::HealthProducer::Health(up, message);
+ return { up, message };
}
void
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 3987827a264..2836ab80acf 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -37,7 +37,7 @@ namespace {
void writePidFile(const vespalib::string& pidfile)
{
- int rv = -1;
+ ssize_t rv = -1;
vespalib::string mypid = vespalib::make_string("%d\n", getpid());
size_t lastSlash = pidfile.rfind('/');
if (lastSlash != vespalib::string::npos) {
@@ -372,7 +372,7 @@ StorageNode::shutdown()
_chain->flush();
}
- if (_pidFile != "") {
+ if ( !_pidFile.empty() ) {
LOG(debug, "Removing pid file");
removePidFile(_pidFile);
}
@@ -510,10 +510,8 @@ StorageNode::updateMetrics(const MetricLockGuard &) {
}
void
-StorageNode::waitUntilInitialized(uint32_t timeout) {
- framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout));
+StorageNode::waitUntilInitialized(vespalib::duration timeout) {
+ vespalib::steady_time doom = vespalib::steady_clock::now() + timeout;
while (true) {
{
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
@@ -521,10 +519,9 @@ StorageNode::waitUntilInitialized(uint32_t timeout) {
if (nodeState.getState() == lib::State::UP) break;
}
std::this_thread::sleep_for(10ms);
- if (clock.getTimeInMillis() >= endTime) {
+ if (vespalib::steady_clock::now() >= doom) {
std::ostringstream ost;
- ost << "Storage server not initialized after waiting timeout of "
- << timeout << " seconds.";
+ ost << "Storage server not initialized after waiting timeout of " << timeout << " seconds.";
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
}
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 0e420f206e2..19b930c184f 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -78,7 +78,7 @@ public:
virtual const lib::NodeType& getNodeType() const = 0;
bool attemptedStopped() const;
void notifyDoneInitializing() override;
- void waitUntilInitialized(uint32_t timeoutSeconds = 15);
+ void waitUntilInitialized(vespalib::duration timeout = 15s);
void updateMetrics(const MetricLockGuard & guard) override;
/** Updates the document type repo. */