aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-06 17:20:25 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-02-06 17:51:42 +0000
commitcabef01a63a222c6805521c188f24f57e9296152 (patch)
tree538485877d0f265f47a97b34419adef032fc09d3
parent1a37850993a673e79182eb62220d063878d48410 (diff)
General code healt, nodiscard, range loops etc
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp9
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test.cpp2
-rw-r--r--storage/src/tests/storageframework/thread/tickingthreadtest.cpp47
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp1
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp179
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.h2
-rw-r--r--storage/src/vespa/storage/common/statusmetricconsumer.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.h9
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h18
10 files changed, 100 insertions, 170 deletions
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 98ee1fbe8e6..16854cd63c6 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -1378,9 +1378,7 @@ std::string StateCheckersTest::testGarbageCollection(
configure_stripe(cfg);
// Insert after stripe configuration to avoid GC timestamp being implicitly reset
BucketDatabase::Entry e(document::BucketId(17, 0));
- e.getBucketInfo().addNode(BucketCopy(prevTimestamp, 0,
- api::BucketInfo(3,3,3)),
- toVector((uint16_t)0));
+ e.getBucketInfo().addNode(BucketCopy(prevTimestamp, 0, api::BucketInfo(3,3,3)), toVector((uint16_t)0));
e.getBucketInfo().setLastGarbageCollectionTime(prevTimestamp);
getBucketDatabase().update(e);
@@ -1389,8 +1387,7 @@ std::string StateCheckersTest::testGarbageCollection(
getDistributorBucketSpace(), statsTracker,
makeDocumentBucket(e.getBucketId()));
getClock().setAbsoluteTimeInSeconds(nowTimestamp);
- return testStateChecker(checker, c, false, PendingMessage(),
- includePriority, includeSchedulingPri);
+ return testStateChecker(checker, c, false, PendingMessage(), includePriority, includeSchedulingPri);
}
TEST_F(StateCheckersTest, garbage_collection) {
@@ -1749,7 +1746,7 @@ TEST_F(StateCheckersTest, stats_updates_for_maximum_time_since_gc_run) {
.last_gc_at_time({17, 0}, 100)
.runFor({17, 0});
- EXPECT_EQ(runner.stats().max_observed_time_since_last_gc().count(), 1900);
+ EXPECT_EQ(runner.stats().max_observed_time_since_last_gc(), 1900s);
}
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index 2ab37a21ec4..dad6f477d83 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -6,6 +6,8 @@
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h>
+#include <vespa/storageframework/generic/thread/runnable.h>
+#include <vespa/storageframework/generic/thread/thread.h>
#include <tests/distributor/top_level_distributor_test_util.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_document_bucket.h>
diff --git a/storage/src/tests/storageframework/thread/tickingthreadtest.cpp b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp
index 577e9d128b9..9023f1ad199 100644
--- a/storage/src/tests/storageframework/thread/tickingthreadtest.cpp
+++ b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp
@@ -32,7 +32,7 @@ struct MyApp : public TickingThread {
std::vector<Context> _context;
TickingThreadPool::UP _threadPool;
- MyApp(int threadCount, bool doCritOverlapTest = false);
+ explicit MyApp(int threadCount, bool doCritOverlapTest = false);
~MyApp() override;
void start(ThreadPool& p) { _threadPool->start(p); }
@@ -56,37 +56,30 @@ struct MyApp : public TickingThread {
return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
}
uint64_t getMinCritTick() {
- uint64_t min = std::numeric_limits<uint64_t>().max();
- for (uint32_t i=0; i<_context.size(); ++i) {
- min = std::min(min, load_relaxed(_context[i]._critTickCount));
+ uint64_t min = std::numeric_limits<uint64_t>::max();
+ for (auto & c : _context) {
+ min = std::min(min, load_relaxed(c._critTickCount));
}
return min;
}
- uint64_t getMinNonCritTick() {
- uint64_t min = std::numeric_limits<uint64_t>().max();
- for (uint32_t i=0; i<_context.size(); ++i) {
- min = std::min(min, load_relaxed(_context[i]._critTickCount));
- }
- return min;
- }
- uint64_t getTotalCritTicks() const noexcept {
+ [[nodiscard]] uint64_t getTotalCritTicks() const noexcept {
uint64_t total = 0;
- for (uint32_t i=0; i<_context.size(); ++i) {
- total += load_relaxed(_context[i]._critTickCount);
+ for (const auto & i : _context) {
+ total += load_relaxed(i._critTickCount);
}
return total;
}
- uint64_t getTotalNonCritTicks() const noexcept {
+ [[nodiscard]] uint64_t getTotalNonCritTicks() const noexcept {
uint64_t total = 0;
- for (uint32_t i=0; i<_context.size(); ++i) {
- total += load_relaxed(_context[i]._nonCritTickCount);
+ for (const auto & c : _context) {
+ total += load_relaxed(c._nonCritTickCount);
}
return total;
}
- uint64_t getTotalTicks() const noexcept {
+ [[nodiscard]] uint64_t getTotalTicks() const noexcept {
return getTotalCritTicks() + getTotalNonCritTicks();
}
- bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); }
+ [[nodiscard]] bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); }
};
MyApp::MyApp(int threadCount, bool doCritOverlapTest)
@@ -215,7 +208,7 @@ RealClock clock;
void printTaskInfo(const std::string& task, const char* action) {
vespalib::string msg = vespalib::make_string(
"%" PRIu64 ": %s %s\n",
- clock.getTimeInMicros().getTime(),
+ vespalib::count_us(clock.getSystemTime().time_since_epoch()),
task.c_str(),
action);
// std::cerr << msg;
@@ -229,15 +222,15 @@ struct BroadcastApp : public TickingThread {
// Set a huge wait time by default to ensure we have to notify
BroadcastApp();
- ~BroadcastApp();
+ ~BroadcastApp() override;
void start(ThreadPool& p) { _threadPool->start(p); }
ThreadWaitInfo doCriticalTick(ThreadIndex) override {
if (!_queue.empty()) {
- for (uint32_t i=0; i<_queue.size(); ++i) {
- printTaskInfo(_queue[i], "activating");
- _active.push_back(_queue[i]);
+ for (const auto & task : _queue) {
+ printTaskInfo(task, "activating");
+ _active.push_back(task);
}
_queue.clear();
return ThreadWaitInfo::MORE_WORK_ENQUEUED;
@@ -246,9 +239,9 @@ struct BroadcastApp : public TickingThread {
}
ThreadWaitInfo doNonCriticalTick(ThreadIndex) override {
if (!_active.empty()) {
- for (uint32_t i=0; i<_active.size(); ++i) {
- printTaskInfo(_active[i], "processing");
- _processed.push_back(_active[i]);
+ for (const auto & task : _active) {
+ printTaskInfo(task, "processing");
+ _processed.push_back(task);
}
_active.clear();
}
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 8b8801f9cf5..9a98a40e7eb 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -5,7 +5,6 @@
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index cf98585bc82..ecf4e81322f 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -11,6 +11,7 @@
#include <vespa/storage/storageutil/distributorstatecache.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
+#include <vespa/storageframework/generic/thread/thread.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
@@ -18,6 +19,7 @@
#include <vespa/storageapi/message/stat.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <ranges>
#include <vespa/config/helper/configgetter.hpp>
#include <chrono>
#include <thread>
@@ -30,8 +32,7 @@ using namespace std::chrono_literals;
namespace storage {
-BucketManager::BucketManager(const config::ConfigUri & configUri,
- ServiceLayerComponentRegister& compReg)
+BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerComponentRegister& compReg)
: StorageLink("Bucket manager"),
framework::StatusReporter("bucketdb", "Bucket database"),
_configUri(configUri),
@@ -58,8 +59,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
ns.setMinUsedBits(58);
_component.getStateUpdater().setReportedNodeState(ns);
- auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(
- configUri.getConfigId(), configUri.getContext());
+ auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(configUri.getConfigId(), configUri.getContext());
_simulated_processing_delay = std::chrono::milliseconds(std::max(0, server_config->simulatedBucketRequestLatencyMsec));
}
@@ -83,10 +83,8 @@ void BucketManager::onClose()
}
void
-BucketManager::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+BucketManager::print(std::ostream& out, bool ,const std::string& ) const
{
- (void) verbose; (void) indent;
out << "BucketManager()";
}
@@ -115,19 +113,15 @@ public:
{
}
- StorBucketDatabase::Decision operator()(uint64_t bucketId,
- const StorBucketDatabase::Entry& data)
+ StorBucketDatabase::Decision operator()(uint64_t bucketId,const StorBucketDatabase::Entry& data)
{
document::BucketId b(document::BucketId::keyToBucketId(bucketId));
try{
uint16_t i = _state.getOwner(b);
auto it = _result.find(i);
if constexpr (log) {
- LOG(spam, "Bucket %s (reverse %" PRIu64 "), should be handled"
- " by distributor %u which we are %sgenerating "
- "state for.",
- b.toString().c_str(), bucketId, i,
- it == _result.end() ? "not " : "");
+ LOG(spam, "Bucket %s (reverse %" PRIu64 "), should be handled by distributor %u which we are %sgenerating state for.",
+ b.toString().c_str(), bucketId, i, it == _result.end() ? "not " : "");
}
if (it != _result.end()) {
api::RequestBucketInfoReply::Entry entry;
@@ -136,17 +130,11 @@ public:
it->second.push_back(entry);
}
} catch (lib::TooFewBucketBitsInUseException& e) {
- LOGBP(warning, "Cannot assign bucket %s to a distributor "
- " as bucket only specifies %u bits.",
- b.toString().c_str(),
- b.getUsedBits());
+ LOGBP(warning, "Cannot assign bucket %s to a distributor as bucket only specifies %u bits.",
+ b.toString().c_str(), b.getUsedBits());
} catch (lib::NoDistributorsAvailableException& e) {
- LOGBP(warning, "No distributors available while processing "
- "request bucket info. Distribution hash: %s, "
- "cluster state: %s",
- _state.getDistribution().getNodeGraph()
- .getDistributionConfigHash().c_str(),
- _state.getClusterState().toString().c_str());
+ LOGBP(warning, "No distributors available while processing request bucket info. Distribution hash: %s, cluster state: %s",
+ _state.getDistribution().getNodeGraph().getDistributionConfigHash().c_str(), _state.getClusterState().toString().c_str());
}
return StorBucketDatabase::Decision::CONTINUE;
}
@@ -173,8 +161,7 @@ struct MetricsUpdater {
void operator()(document::BucketId::Type bucketId,
const StorBucketDatabase::Entry& data) noexcept
{
- document::BucketId bucket(
- document::BucketId::keyToBucketId(bucketId));
+ document::BucketId bucket(document::BucketId::keyToBucketId(bucketId));
if (data.valid()) {
++count.buckets;
@@ -209,8 +196,7 @@ struct MetricsUpdater {
StorBucketDatabase::Entry
BucketManager::getBucketInfo(const document::Bucket &bucket) const
{
- StorBucketDatabase::WrappedEntry entry(
- _component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::getBucketInfo"));
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::getBucketInfo"));
return *entry;
}
@@ -254,8 +240,7 @@ BucketManager::updateMetrics(bool updateDocCount)
void BucketManager::update_bucket_db_memory_usage_metrics() {
for (auto& space : _component.getBucketSpaceRepo()) {
auto bm = _metrics->bucket_spaces.find(space.first);
- bm->second->bucket_db_metrics.memory_usage.update(
- space.second->bucketDatabase().detailed_memory_usage());
+ bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage());
}
}
@@ -266,10 +251,8 @@ void BucketManager::updateMinUsedBits()
// When going through to get sizes, we also record min bits
MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker());
if (bitTracker.getMinUsedBits() != m.lowestUsedBit) {
- NodeStateUpdater::Lock::SP lock(
- _component.getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(
- *_component.getStateUpdater().getReportedNodeState());
+ NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState());
bitTracker.setMinUsedBits(m.lowestUsedBit);
ns.setMinUsedBits(m.lowestUsedBit);
_component.getStateUpdater().setReportedNodeState(ns);
@@ -406,7 +389,7 @@ bool BucketManager::onRequestBucketInfo(
const std::shared_ptr<api::RequestBucketInfoCommand>& cmd)
{
LOG(debug, "Got request bucket info command %s", cmd->toString().c_str());
- if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
+ if (cmd->getBuckets().empty() && cmd->hasSystemState()) {
std::lock_guard guard(_workerLock);
_bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd);
@@ -419,23 +402,17 @@ bool BucketManager::onRequestBucketInfo(
BucketSpace bucketSpace(cmd->getBucketSpace());
api::RequestBucketInfoReply::EntryVector info;
- if (cmd->getBuckets().size()) {
+ if (!cmd->getBuckets().empty()) {
typedef std::map<document::BucketId,
StorBucketDatabase::WrappedEntry> BucketMap;
- for (uint32_t i = 0; i < cmd->getBuckets().size(); i++) {
- BucketMap entries(_component.getBucketDatabase(bucketSpace).getAll(
- cmd->getBuckets()[i],
- "BucketManager::onRequestBucketInfo"));
- for (BucketMap::iterator it = entries.begin();
- it != entries.end(); ++it)
- {
- info.push_back(api::RequestBucketInfoReply::Entry(
- it->first, it->second->getBucketInfo()));
+ for (auto i : cmd->getBuckets()) {
+ BucketMap entries(_component.getBucketDatabase(bucketSpace).getAll(i, "BucketManager::onRequestBucketInfo"));
+ for (auto & entrie : entries) {
+ info.emplace_back(entrie.first, entrie.second->getBucketInfo());
}
}
} else {
- LOG(error, "We don't support fetching bucket info without bucket "
- "list or system state");
+ LOG(error, "We don't support fetching bucket info without bucket list or system state");
assert(false);
}
_metrics->simpleBucketInfoRequestSize.addValue(info.size());
@@ -445,17 +422,14 @@ bool BucketManager::onRequestBucketInfo(
LOG(spam, "Returning list of checksums:");
for (const auto & entry : reply->getBucketInfo()) {
- LOG(spam, "%s: %s",
- entry._bucketId.toString().c_str(),
- entry._info.toString().c_str());
+ LOG(spam, "%s: %s", entry._bucketId.toString().c_str(), entry._info.toString().c_str());
}
sendUp(reply);
// Remaining replies dispatched by queueGuard upon function exit.
return true;
}
-BucketManager::ScopedQueueDispatchGuard::ScopedQueueDispatchGuard(
- BucketManager& mgr)
+BucketManager::ScopedQueueDispatchGuard::ScopedQueueDispatchGuard(BucketManager& mgr)
: _mgr(mgr)
{
_mgr.enterQueueProtectedSection();
@@ -520,16 +494,13 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. "
"Using cluster state '%s' and distribution hash '%s'",
- reqs.size(),
- bucketSpace.toString().c_str(),
- clusterState->toString().c_str(),
- our_hash.c_str());
+ reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str());
std::lock_guard clusterStateGuard(_clusterStateLock);
- for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) {
+ for (auto & req : std::ranges::reverse_view(reqs)) {
// Currently small requests should not be forwarded to worker thread
- assert((*it)->hasSystemState());
- const auto their_hash = (*it)->getDistributionHash();
+ assert(req->hasSystemState());
+ const auto their_hash = req->getDistributionHash();
std::ostringstream error;
if ((clusterState->getVersion() != _last_cluster_state_version_initiated) ||
@@ -539,13 +510,13 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
// to another cluster state version does not happen atomically. Detect and
// gracefully deal with the case where we're not internally in sync.
error << "Inconsistent internal cluster state on node during transition; "
- << "failing request from distributor " << (*it)->getDistributor()
+ << "failing request from distributor " << req->getDistributor()
<< " so it can be retried. Node version is " << clusterState->getVersion()
<< ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated
<< ", last internally converged version is " << _last_cluster_state_version_completed;
- } else if ((*it)->getSystemState().getVersion() != _last_cluster_state_version_initiated) {
+ } else if (req->getSystemState().getVersion() != _last_cluster_state_version_initiated) {
error << "Ignoring bucket info request for cluster state version "
- << (*it)->getSystemState().getVersion() << " as newest "
+ << req->getSystemState().getVersion() << " as newest "
<< "version we know of is " << _last_cluster_state_version_initiated;
} else if (!their_hash.empty() && their_hash != our_hash) {
// Mismatching config hash indicates nodes are out of sync with their config generations
@@ -553,21 +524,21 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
<< our_hash.c_str() << ", their hash: " << their_hash.c_str() << ")";
}
if (error.str().empty()) {
- auto result = seenDistributors.insert((*it)->getDistributor());
+ auto result = seenDistributors.insert(req->getDistributor());
if (result.second) {
- requests[(*it)->getDistributor()] = *it;
+ requests[req->getDistributor()] = req;
continue;
} else {
error << "There is already a newer bucket info request for this"
- << " node from distributor " << (*it)->getDistributor();
+ << " node from distributor " << req->getDistributor();
}
}
// If we get here, message should be failed
- auto reply = std::make_shared<api::RequestBucketInfoReply>(**it);
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(*req);
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, error.str()));
LOG(debug, "Rejecting request from distributor %u: %s",
- (*it)->getDistributor(),
+ req->getDistributor(),
error.str().c_str());
sendUp(reply);
}
@@ -592,29 +563,24 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
}
_metrics->fullBucketInfoRequestSize.addValue(requests.size());
- LOG(debug, "Processing %zu bucket info requests for "
- "distributors %s, using system state %s",
- requests.size(), distrList.str().c_str(),
- clusterState->toString().c_str());
+ LOG(debug, "Processing %zu bucket info requests for distributors %s, using system state %s",
+ requests.size(), distrList.str().c_str(), clusterState->toString().c_str());
framework::MilliSecTimer runStartTime(_component.getClock());
// Don't allow logging to lower performance of inner loop.
// Call other type of instance if logging
const document::BucketIdFactory& idFac(_component.getBucketIdFactory());
if (LOG_WOULD_LOG(spam)) {
- DistributorInfoGatherer<true> builder(
- *clusterState, result, idFac, distribution);
+ DistributorInfoGatherer<true> builder(*clusterState, result, idFac, distribution);
_component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder),
"BucketManager::processRequestBucketInfoCommands-1");
} else {
- DistributorInfoGatherer<false> builder(
- *clusterState, result, idFac, distribution);
+ DistributorInfoGatherer<false> builder(*clusterState, result, idFac, distribution);
_component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder),
"BucketManager::processRequestBucketInfoCommands-2");
}
_metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble());
for (auto& nodeAndCmd : requests) {
- auto reply(std::make_shared<api::RequestBucketInfoReply>(
- *nodeAndCmd.second));
+ auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second));
reply->getBucketInfo().swap(result[nodeAndCmd.first]);
sendUp(reply);
}
@@ -653,8 +619,7 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd,
uint64_t prevLastModified = 0;
{
- StorBucketDatabase::WrappedEntry entry(
- _component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::verify"));
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::verify"));
if (entry.exist()) {
prevLastModified = entry->info.getLastModified();
@@ -670,16 +635,11 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd,
}
api::StorageReply::UP reply = cmd.makeReply();
- reply->setResult(api::ReturnCode(
- api::ReturnCode::STALE_TIMESTAMP,
+ reply->setResult(api::ReturnCode(api::ReturnCode::STALE_TIMESTAMP,
vespalib::make_string(
- "Received command %s with a lower/equal timestamp "
- " (%" PRIu64 ") than the last operation received for "
- "bucket %s, with timestamp %" PRIu64,
- cmd.toString().c_str(),
- lastModified,
- bucket.toString().c_str(),
- prevLastModified)));
+ "Received command %s with a lower/equal timestamp (%" PRIu64 ") than the last "
+ "operation received for bucket %s, with timestamp %" PRIu64,
+ cmd.toString().c_str(), lastModified, bucket.toString().c_str(), prevLastModified)));
sendUp(api::StorageMessage::SP(reply.release()));
@@ -724,10 +684,8 @@ BucketManager::onCreateBucket(const api::CreateBucketCommand::SP& cmd)
{
MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker());
if (bitTracker.update(cmd->getBucketId())) {
- NodeStateUpdater::Lock::SP lock(
- _component.getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(
- *_component.getStateUpdater().getReportedNodeState());
+ NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState());
ns.setMinUsedBits(bitTracker.getMinUsedBits());
_component.getStateUpdater().setReportedNodeState(ns);
}
@@ -740,10 +698,8 @@ BucketManager::onMergeBucket(const api::MergeBucketCommand::SP& cmd)
{
MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker());
if (bitTracker.update(cmd->getBucketId())) {
- NodeStateUpdater::Lock::SP lock(
- _component.getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(
- *_component.getStateUpdater().getReportedNodeState());
+ NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState());
ns.setMinUsedBits(bitTracker.getMinUsedBits());
_component.getStateUpdater().setReportedNodeState(ns);
}
@@ -753,13 +709,7 @@ BucketManager::onMergeBucket(const api::MergeBucketCommand::SP& cmd)
bool
BucketManager::onRemove(const api::RemoveCommand::SP& cmd)
{
- if (!verifyAndUpdateLastModified(*cmd,
- cmd->getBucket(),
- cmd->getTimestamp())) {
- return true;
- }
-
- return false;
+ return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp());
}
bool
@@ -771,13 +721,7 @@ BucketManager::onRemoveReply(const api::RemoveReply::SP& reply)
bool
BucketManager::onPut(const api::PutCommand::SP& cmd)
{
- if (!verifyAndUpdateLastModified(*cmd,
- cmd->getBucket(),
- cmd->getTimestamp())) {
- return true;
- }
-
- return false;
+ return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp());
}
bool
@@ -789,13 +733,7 @@ BucketManager::onPutReply(const api::PutReply::SP& reply)
bool
BucketManager::onUpdate(const api::UpdateCommand::SP& cmd)
{
- if (!verifyAndUpdateLastModified(*cmd,
- cmd->getBucket(),
- cmd->getTimestamp())) {
- return true;
- }
-
- return false;
+ return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp());
}
bool
@@ -805,10 +743,8 @@ BucketManager::onUpdateReply(const api::UpdateReply::SP& reply)
}
bool
-BucketManager::onNotifyBucketChangeReply(
- const api::NotifyBucketChangeReply::SP& reply)
+BucketManager::onNotifyBucketChangeReply(const api::NotifyBucketChangeReply::SP&)
{
- (void) reply;
// Handling bucket change replies is a no-op.
return true;
}
@@ -857,8 +793,7 @@ BucketManager::enqueueAsConflictIfProcessingRequest(
{
std::lock_guard guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing != 0) {
- LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo",
- reply->toString().c_str());
+ LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", reply->toString().c_str());
_queuedReplies.push_back(reply);
_conflictingBuckets.insert(reply->getBucketId());
return true;
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index fda13e09c45..ce60ec88bff 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -67,7 +67,7 @@ private:
size_t _requestsCurrentlyProcessing;
ServiceLayerComponent _component;
std::shared_ptr<BucketManagerMetrics> _metrics;
- framework::Thread::UP _thread;
+ std::unique_ptr<framework::Thread> _thread;
std::chrono::milliseconds _simulated_processing_delay;
class ScopedQueueDispatchGuard {
diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.cpp b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
index e9360c35f3c..8eb3e9f3ab6 100644
--- a/storage/src/vespa/storage/common/statusmetricconsumer.cpp
+++ b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "statusmetricconsumer.h"
+#include <vespa/storageframework/generic/clock/clock.h>
#include <boost/lexical_cast.hpp>
#include <vespa/metrics/jsonwriter.h>
#include <vespa/metrics/textwriter.h>
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp
index 44a4670bd54..d02a1e1906e 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.cpp
+++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp
@@ -16,7 +16,7 @@ bool NodeInfo::isBusy(uint16_t idx) const {
const SingleNodeInfo& info = getNode(idx);
if (info._busyUntilTime.time_since_epoch().count() != 0) {
if (_clock.getMonotonicTime() > info._busyUntilTime) {
- info._busyUntilTime = framework::MonotonicTimePoint{};
+ info._busyUntilTime = vespalib::steady_time();
} else {
return true;
}
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h
index 0e1b04561fb..7f0716d7804 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.h
+++ b/storage/src/vespa/storage/distributor/nodeinfo.h
@@ -8,8 +8,11 @@
#pragma once
#include <vector>
-#include <vespa/storageframework/generic/clock/time.h>
+#include <vespa/vespalib/util/time.h>
+namespace storage::framework{
+ struct Clock;
+}
namespace storage::distributor {
class NodeInfo {
@@ -20,7 +23,7 @@ public:
bool isBusy(uint16_t idx) const;
- void setBusy(uint16_t idx, framework::MonotonicDuration for_duration);
+ void setBusy(uint16_t idx, vespalib::duration for_duration);
void incPending(uint16_t idx);
@@ -33,7 +36,7 @@ private:
SingleNodeInfo() : _pending(0), _busyUntilTime() {}
uint32_t _pending;
- mutable framework::MonotonicTimePoint _busyUntilTime;
+ mutable vespalib::steady_time _busyUntilTime;
};
mutable std::vector<SingleNodeInfo> _nodes;
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index 57eed20f665..801c75322b3 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -14,7 +14,7 @@
#include <vespa/storageapi/messageapi/maintenancecommand.h>
#include <vespa/document/base/globalid.h>
#include <vespa/document/util/printable.h>
-#include <vespa/vespalib/util/array.h>
+#include <vespa/vespalib/stllike/allocator.h>
#include <vespa/storageapi/defs.h>
namespace document { class DocumentTypeRepo; }
@@ -237,7 +237,7 @@ private:
public:
explicit GetBucketDiffReply(const GetBucketDiffCommand& cmd);
- ~GetBucketDiffReply();
+ ~GetBucketDiffReply() override;
const std::vector<Node>& getNodes() const { return _nodes; }
Timestamp getMaxTimestamp() const { return _maxTimestamp; }
@@ -268,14 +268,14 @@ public:
const document::DocumentTypeRepo *_repo;
Entry();
- Entry(const GetBucketDiffCommand::Entry&);
+ explicit Entry(const GetBucketDiffCommand::Entry&);
Entry(const Entry &);
Entry & operator = (const Entry &);
Entry(Entry &&) = default;
Entry & operator = (Entry &&) = default;
- ~Entry();
+ ~Entry() override;
- bool filled() const;
+ [[nodiscard]] bool filled() const;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool operator==(const Entry&) const;
};
@@ -358,7 +358,7 @@ public:
const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
- bool hasSystemState() const { return (_state.get() != 0); }
+ bool hasSystemState() const { return bool(_state); }
uint16_t getDistributor() const { return _distributor; }
const lib::ClusterState& getSystemState() const { return *_state; }
@@ -387,8 +387,8 @@ public:
bool operator==(const Entry& e) const { return (_bucketId == e._bucketId && _info == e._info); }
bool operator!=(const Entry& e) const { return !(*this == e); }
- Entry() : _bucketId(), _info() {}
- Entry(const document::BucketId& id, const BucketInfo& info)
+ Entry() noexcept : _bucketId(), _info() {}
+ Entry(const document::BucketId& id, const BucketInfo& info) noexcept
: _bucketId(id), _info(info) {}
friend std::ostream& operator<<(std::ostream& os, const Entry&);
};
@@ -397,7 +397,7 @@ public:
bool two_phase_remove_location = false;
bool no_implicit_indexing_of_active_buckets = false;
};
- using EntryVector = vespalib::Array<Entry>;
+ using EntryVector = std::vector<Entry, vespalib::allocator_large<Entry>>;
private:
EntryVector _buckets;
bool _full_bucket_fetch;