diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-07 05:06:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-07 05:06:25 +0100 |
commit | 9e403aa14c9d00b958de14336a346e60f67d803a (patch) | |
tree | 2b1a0d9ab7240e6b2557d7194aeef8e840bf75e9 | |
parent | 8cc478d666c44c4fd7add6266dfea2c3e6b58b71 (diff) | |
parent | 9b95cd26d90665efa54cd7c4d7e872713fa72e44 (diff) |
Merge pull request #25893 from vespa-engine/revert-25874-balder/less-getSeconds
Revert "Ă˜ess use of getSeconds/getMicroSeconds/getMilliSeconds."
64 files changed, 769 insertions, 527 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 290d05e9a59..a92cf121fab 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -37,10 +37,10 @@ struct MetricsTest : public Test { std::shared_ptr<BucketManagerMetrics> _bucketManagerMetrics; std::shared_ptr<VisitorMetrics> _visitorMetrics; - void createSnapshotForPeriod(std::chrono::seconds secs) const; + void createSnapshotForPeriod(std::chrono::seconds secs); void assertMetricLastValue(const std::string& name, int interval, - uint64_t expected) const; + uint64_t expected); MetricsTest(); ~MetricsTest() override; @@ -55,8 +55,8 @@ namespace { { framework::Clock& _clock; explicit MetricClock(framework::Clock& c) : _clock(c) {} - [[nodiscard]] time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); } - time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getMonotonicTime().time_since_epoch()); } + time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); } + time_t getTimeInMilliSecs() const override { return _clock.getTimeInMillis().getTime(); } }; } @@ -137,8 +137,8 @@ void MetricsTest::createFakeLoad() disk.queueSize.addValue(4 * n); disk.averageQueueWaitingTime.addValue(10 * n); disk.pendingMerges.addValue(4 * n); - for (const auto & metric : disk.threads) { - FileStorThreadMetrics& thread(*metric); + for (uint32_t j=0; j<disk.threads.size(); ++j) { + FileStorThreadMetrics& thread(*disk.threads[j]); thread.operations.inc(120 * n); thread.failedOperations.inc(2 * n); @@ -180,8 +180,8 @@ void MetricsTest::createFakeLoad() thread.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(0.8); } } - for (const auto & metric : _visitorMetrics->threads) { - VisitorThreadMetrics& thread(*metric); + for (uint32_t i=0; i<_visitorMetrics->threads.size(); ++i) { + VisitorThreadMetrics& thread(*_visitorMetrics->threads[i]); thread.queueSize.addValue(2); thread.averageQueueWaitingTime.addValue(10); thread.averageVisitorLifeTime.addValue(1000); @@ -192,7 +192,9 @@ void MetricsTest::createFakeLoad() } _clock->addSecondsToTime(60); _metricManager->timeChangedNotification(); - while (uint64_t(_metricManager->getLastProcessedTime()) < _clock->getTimeInSeconds().getTime()) { + while (uint64_t(_metricManager->getLastProcessedTime()) + < _clock->getTimeInSeconds().getTime()) + { std::this_thread::sleep_for(5ms); _metricManager->timeChangedNotification(); } @@ -282,7 +284,9 @@ TEST_F(MetricsTest, html_metrics_report) { } void -MetricsTest::assertMetricLastValue(const std::string& name, int interval, uint64_t expected) const +MetricsTest::assertMetricLastValue(const std::string& name, + int interval, + uint64_t expected) { std::ostringstream path; path << "metrics?interval=" << interval @@ -301,7 +305,7 @@ MetricsTest::assertMetricLastValue(const std::string& name, int interval, uint64 using namespace std::chrono_literals; void -MetricsTest::createSnapshotForPeriod(std::chrono::seconds secs) const +MetricsTest::createSnapshotForPeriod(std::chrono::seconds secs) { _clock->addSecondsToTime(secs.count()); _metricManager->timeChangedNotification(); diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 94996346d73..91fdf5aa602 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -22,6 +22,18 @@ using storage::framework::defaultimplementation::ComponentRegisterImpl; namespace storage { +namespace { + template<typename T> + struct ConfigReader : public T::Subscriber, + public T + { + ConfigReader(const std::string& configId) { + T::subscribe(configId, *this); + } + void configure(const T& c) { dynamic_cast<T&>(*this) = c; } + }; +} + TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg, const lib::NodeType& type, NodeIndex index, vespalib::stringref configId) @@ -72,7 +84,7 @@ TestStorageApp::setDistribution(Redundancy redundancy, NodeCount nodeCount) void TestStorageApp::setTypeRepo(std::shared_ptr<const document::DocumentTypeRepo> repo) { - _compReg.setDocumentTypeRepo(std::move(repo)); + _compReg.setDocumentTypeRepo(repo); } void @@ -82,19 +94,21 @@ TestStorageApp::setClusterState(const lib::ClusterState& c) } void -TestStorageApp::waitUntilInitialized(StorageBucketDBInitializer* initializer, vespalib::duration timeout) const +TestStorageApp::waitUntilInitialized( + StorageBucketDBInitializer* initializer, framework::SecondTime timeout) { // Always use real clock for wait timeouts. Component clock may be faked // in tests framework::defaultimplementation::RealClock clock; - vespalib::steady_time endTime(clock.getMonotonicTime() + timeout); + framework::MilliSecTime endTime(clock.getTimeInMillis() + timeout.getMillis()); while (!isInitialized()) { std::this_thread::sleep_for(1ms); - vespalib::steady_time currentTime(clock.getMonotonicTime()); + framework::MilliSecTime currentTime(clock.getTimeInMillis()); if (currentTime > endTime) { std::ostringstream error; - error << "Failed to initialize service layer within timeout of " << vespalib::to_s(timeout) << " seconds."; - if (initializer != nullptr) { + error << "Failed to initialize service layer within timeout of " + << timeout << " seconds."; + if (initializer != 0) { error << " "; LOG(error, "%s", error.str().c_str()); throw std::runtime_error(error.str()); @@ -160,7 +174,7 @@ TestServiceLayerApp::setPersistenceProvider(PersistenceProviderUP provider) spi::PersistenceProvider& TestServiceLayerApp::getPersistenceProvider() { - if ( ! _persistenceProvider) { + if (_persistenceProvider.get() == 0) { throw vespalib::IllegalStateException("Persistence provider requested but not initialized.", VESPA_STRLOC); } return *_persistenceProvider; @@ -168,7 +182,7 @@ TestServiceLayerApp::getPersistenceProvider() namespace { template<typename T> - T getConfig(vespalib::stringref configId) { + const T getConfig(vespalib::stringref configId) { config::ConfigUri uri(configId); return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext()); } @@ -178,9 +192,9 @@ void TestDistributorApp::configure(vespalib::stringref id) { if (id.empty()) return; - auto dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id)); + DistributorConfig dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id)); _compReg.setDistributorConfig(dc); - auto vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id)); + VisitorConfig vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id)); _compReg.setVisitorConfig(vc); } diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 7ca910721b3..de1dc99bb6e 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -68,7 +68,7 @@ public: TestStorageApp(StorageComponentRegisterImpl::UP compReg, const lib::NodeType&, NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = ""); - ~TestStorageApp() override; + ~TestStorageApp(); // Set functions, to be able to modify content while running. void setDistribution(Redundancy, NodeCount); @@ -77,12 +77,15 @@ public: // Utility functions for getting a hold of currently used bits. Practical // to avoid adding extra components in the tests. - virtual StorageComponentRegisterImpl& getComponentRegister() override { return _compReg; } + StorageComponentRegisterImpl& getComponentRegister() { return _compReg; } document::TestDocMan& getTestDocMan() { return _docMan; } - std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _compReg.getTypeRepo(); } - const document::BucketIdFactory& getBucketIdFactory() { return _compReg.getBucketIdFactory(); } + std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() + { return _compReg.getTypeRepo(); } + const document::BucketIdFactory& getBucketIdFactory() + { return _compReg.getBucketIdFactory(); } TestNodeStateUpdater& getStateUpdater() { return _nodeStateUpdater; } - std::shared_ptr<lib::Distribution> & getDistribution() { return _compReg.getDistribution(); } + std::shared_ptr<lib::Distribution> & getDistribution() + { return _compReg.getDistribution(); } TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; } uint16_t getIndex() const { return _compReg.getIndex(); } const NodeIdentity& node_identity() const noexcept { return _node_identity; } @@ -92,7 +95,9 @@ public: DoneInitializeHandler& getDoneInitializeHandler() { return *this; } void notifyDoneInitializing() override { _initialized = true; } bool isInitialized() const { return _initialized; } - void waitUntilInitialized(StorageBucketDBInitializer* initializer = nullptr, vespalib::duration timeout = 30s) const; + void waitUntilInitialized( + StorageBucketDBInitializer* initializer = 0, + framework::SecondTime timeout = framework::SecondTime(30)); private: // Storage server interface implementation (until we can remove it) @@ -111,14 +116,14 @@ class TestServiceLayerApp : public TestStorageApp HostInfo _host_info; public: - explicit TestServiceLayerApp(vespalib::stringref configId); - explicit TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = ""); - ~TestServiceLayerApp() override; + TestServiceLayerApp(vespalib::stringref configId); + TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = ""); + ~TestServiceLayerApp(); void setupDummyPersistence(); void setPersistenceProvider(PersistenceProviderUP); - ServiceLayerComponentRegisterImpl& getComponentRegister() override { return _compReg; } + ServiceLayerComponentRegisterImpl& getComponentRegister() { return _compReg; } HostInfo &get_host_info() noexcept { return _host_info; } spi::PersistenceProvider& getPersistenceProvider(); @@ -148,7 +153,7 @@ public: explicit TestDistributorApp(NodeIndex index, vespalib::stringref configId = ""); ~TestDistributorApp() override; - DistributorComponentRegisterImpl& getComponentRegister() override { + DistributorComponentRegisterImpl& getComponentRegister() { return _compReg; } diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp index 42ee4675e26..5d11f9653ea 100644 --- a/storage/src/tests/distributor/bucketstateoperationtest.cpp +++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp @@ -43,7 +43,7 @@ TEST_F(BucketStateOperationTest, activate_single_node) { SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(1, _sender.commands().size()); @@ -79,7 +79,7 @@ TEST_F(BucketStateOperationTest, activate_and_deactivate_nodes) { SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(1, _sender.commands().size()); { @@ -135,7 +135,7 @@ TEST_F(BucketStateOperationTest, do_not_deactivate_if_activate_fails) { SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(1, _sender.commands().size()); { @@ -178,7 +178,7 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) { SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(1, _sender.commands().size()); diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index c2f4836f4cb..1a104727f43 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -139,7 +139,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { TEST_F(GarbageCollectionOperationTest, simple_legacy) { auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_FALSE(op->is_two_phase()); ASSERT_EQ(2, _sender.commands().size()); @@ -158,7 +158,7 @@ TEST_F(GarbageCollectionOperationTest, simple_legacy) { TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) { auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); EXPECT_EQ(0u, gc_removed_documents_metric()); @@ -175,7 +175,7 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); reply_to_nth_request(*op, 0, 1234, 0); @@ -195,20 +195,20 @@ TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and // Config enabled, but only 1 node says it supports two-phase RemoveLocation auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_FALSE(op->is_two_phase()); // Node 0 suddenly upgraded...! set_node_supported_features(0, with_two_phase); op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_TRUE(op->is_two_phase()); // But doesn't matter if two-phase GC is config-disabled config_enable_two_phase_gc(false); op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_FALSE(op->is_two_phase()); } @@ -216,7 +216,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l enable_two_phase_gc(); auto op = create_op(); op->setPriority(getConfig().getMaintenancePriorities().garbageCollection); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); for (int i : {0, 1}) { @@ -229,7 +229,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) { enable_two_phase_gc(); auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); auto r1 = make_remove_location_reply(*_sender.command(0)); @@ -256,7 +256,7 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_un TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) { enable_two_phase_gc(); auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); auto r1 = make_remove_location_reply(*_sender.command(0)); @@ -272,7 +272,7 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) { enable_two_phase_gc(); auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); auto r1 = make_remove_location_reply(*_sender.command(0)); @@ -314,7 +314,7 @@ struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationT enable_two_phase_gc(); _op = create_op(); - _op->start(_sender); + _op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); _r1 = make_remove_location_reply(*_sender.command(0)); @@ -367,7 +367,7 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { enable_two_phase_gc(); auto op = create_op(); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); auto r1 = make_remove_location_reply(*_sender.command(0)); diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 8d188f6c005..6b27af63fb7 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -4,6 +4,7 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/config/documenttypes_config_fwd.h> +#include <vespa/document/config/config-documenttypes.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> @@ -61,7 +62,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), msg, metrics().gets, consistency); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); } static constexpr uint32_t LastCommand = UINT32_MAX; diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp index 570fe24679e..bc87893b610 100644 --- a/storage/src/tests/distributor/joinbuckettest.cpp +++ b/storage/src/tests/distributor/joinbuckettest.cpp @@ -46,7 +46,7 @@ TEST_F(JoinOperationTest, simple) { document::BucketId(33, 0x100000001))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); @@ -103,7 +103,7 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets document::BucketId(33, 0x100000001))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}})); ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}})); diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 512c092d8ae..9e0c89819a7 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -71,7 +71,7 @@ MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes, Pr auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes)); op->setIdealStateManager(&getIdealStateManager()); op->setPriority(merge_pri); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); return op; } @@ -291,7 +291,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " @@ -352,7 +352,7 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) { MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); std::string merge( "MergeBucketCommand(BucketId(0x4000000000000001), to time " @@ -501,7 +501,7 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { const uint16_t max_merge_size = 2; MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2, 3)), max_merge_size); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); // Must include missing node 0 and not just 2 existing replicas EXPECT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 3bfa1027a82..0ba374f7190 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> " + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> " "Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n")); } @@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" "</ul>\n" "<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } { @@ -268,23 +268,44 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { EXPECT_THAT(ost.str(), HasSubstr( "<b>Node 0 (pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n" "<b>Node 1 (pending count: 4)</b>\n" "<ul>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" - "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" + "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n" "</ul>\n")); } } namespace { +template <typename T> +std::string setToString(const std::set<T>& s) +{ + std::ostringstream ost; + ost << '{'; + for (typename std::set<T>::const_iterator i(s.begin()), e(s.end()); + i != e; ++i) + { + if (i != s.begin()) { + ost << ','; + } + ost << *i; + } + ost << '}'; + return ost.str(); +} + +} + +namespace { + class TestChecker : public PendingMessageTracker::Checker { public: @@ -422,7 +443,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) { TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); - f.tracker().setNodeBusyDuration(10s); + f.tracker().setNodeBusyDuration(std::chrono::seconds(10)); f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0)); f.clock().addSecondsToTime(11); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 735666e5c89..53773a55826 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -76,7 +76,7 @@ public: getDistributorBucketSpace(), msg, metrics().puts); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); } const document::DocumentType& doc_type() const { diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index 68d86884036..971ff36c833 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "dummy_cluster_context.h" +#include <tests/common/dummystoragelink.h> #include <tests/distributor/distributor_stripe_test_util.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/top_level_distributor.h> @@ -36,9 +37,9 @@ TEST_F(RemoveBucketOperationTest, simple) { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), - toVector<uint16_t>(1,2))); + toVector<uint16_t>(1,2))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Delete bucket => 1," @@ -70,7 +71,7 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) { BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(1))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true)); ASSERT_EQ(1, _sender.commands().size()); @@ -105,7 +106,7 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) { BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(1))); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true)); ASSERT_EQ(1, _sender.commands().size()); diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index b19a448199b..889b5c833af 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -35,7 +35,7 @@ struct RemoveLocationOperationTest : Test, DistributorStripeTestUtil { msg, metrics().removelocations); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); } }; diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index ed24b7271b8..b3104cae623 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -1,5 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <iomanip> #include <tests/distributor/distributor_stripe_test_util.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/top_level_distributor.h> @@ -40,7 +41,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { msg, metrics().removes); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); } void replyToMessage(RemoveOperation& callback, @@ -54,7 +55,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { std::shared_ptr<api::StorageMessage> msg2 = _sender.command(index); auto* removec = dynamic_cast<api::RemoveCommand*>(msg2.get()); std::unique_ptr<api::StorageReply> reply(removec->makeReply()); - auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get()); + auto* removeR = static_cast<api::RemoveReply*>(reply.get()); removeR->setOldTimestamp(oldTimestamp); callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release())); } diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index edb392d9532..1e951029994 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -65,7 +65,7 @@ TEST_F(SplitOperationTest, simple) { splitByteSize); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); { ASSERT_EQ(1, _sender.commands().size()); @@ -134,7 +134,7 @@ TEST_F(SplitOperationTest, multi_node_failure) { splitByteSize); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); { ASSERT_EQ(2, _sender.commands().size()); @@ -218,7 +218,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) { splitByteSize); op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(3, _sender.commands().size()); diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp index ec0165dde05..e3323d601df 100644 --- a/storage/src/tests/distributor/statoperationtest.cpp +++ b/storage/src/tests/distributor/statoperationtest.cpp @@ -35,7 +35,7 @@ TEST_F(StatOperationTest, bucket_info) { std::make_shared<api::StatBucketCommand>( makeDocumentBucket(document::BucketId(16, 5)), "")); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Statbucket => 0,Statbucket => 1", _sender.getCommands(true)); @@ -76,7 +76,7 @@ TEST_F(StatOperationTest, bucket_list) { getIdealStateManager(), node_context().node_index(), msg); - op.start(_sender); + op.start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(1, _sender.replies().size()); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 579fd156962..6eb15bf05e7 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -154,7 +154,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); return cb; } @@ -342,7 +342,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, TEST_F(TwoPhaseUpdateOperationTest, simple) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cb = sendUpdate("0=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", _sender.getCommands(true)); @@ -359,7 +359,7 @@ TEST_F(TwoPhaseUpdateOperationTest, simple) { TEST_F(TwoPhaseUpdateOperationTest, non_existing) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cb = sendUpdate(""); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)", @@ -371,7 +371,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing) { TEST_F(TwoPhaseUpdateOperationTest, update_failed) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cb = sendUpdate("0=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", _sender.getCommands(true)); @@ -386,7 +386,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_failed) { TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -413,7 +413,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) { TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -434,7 +434,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -451,7 +451,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_err TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -473,7 +473,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -501,7 +501,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -526,7 +526,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_st TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0," "Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0"); @@ -567,7 +567,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cb = sendUpdate("0=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", _sender.getCommands(true)); @@ -578,7 +578,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_upd TEST_F(TwoPhaseUpdateOperationTest, n_of_m) { setup_stripe(2, 2, "storage:2 distributor:1", 1); auto cb = sendUpdate("0=1/2/3,1=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); @@ -606,7 +606,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) setup_stripe(3, 3, "storage:3 distributor:1"); // 0,1 in sync. 2 out of sync. auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0," "Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2", @@ -639,7 +639,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) { setup_stripe(3, 3, "storage:3 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 0, false); @@ -670,7 +670,7 @@ TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_a TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) { setup_stripe(3, 3, "storage:3 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 0, false); @@ -697,7 +697,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) { TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE); @@ -717,7 +717,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setup_stripe(2, 2, "storage:2 distributor:1"); // Create update for wrong doctype which will fail the update. auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError()); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 50); @@ -737,7 +737,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) " "Reasons to start: => 0," @@ -766,7 +766,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 100); @@ -786,7 +786,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) { setup_stripe(3, 3, "storage:3 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); checkMessageSettingsPropagatedTo(_sender.command(0)); @@ -805,7 +805,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) { setup_stripe(3, 3, "storage:3 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings"); @@ -832,7 +832,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec setup_stripe(2, 2, "storage:2 distributor:1"); // Update towards inconsistent bucket invokes safe path. auto cb = sendUpdate("0=1/2/3,1=2/3/4"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -875,7 +875,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_ setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120")); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); // Newest doc has headerval==110, not 120. replyToGet(*cb, _sender, 0, 100); replyToGet(*cb, _sender, 1, 110); @@ -894,7 +894,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_up setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110")); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); replyToGet(*cb, _sender, 0, 100); replyToGet(*cb, _sender, 1, 110); ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); @@ -904,7 +904,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco")); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); replyToGet(*cb, _sender, 0, 100); replyToGet(*cb, _sender, 1, 110); // NOTE: condition is currently not attempted parsed until Gets have been @@ -924,7 +924,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234")); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); replyToGet(*cb, _sender, 0, 100); replyToGet(*cb, _sender, 1, 110); // NOTE: condition is currently not attempted parsed until Gets have been @@ -943,7 +943,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_ setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120")); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); // Both Gets return nothing at all, nothing at all. replyToGet(*cb, _sender, 0, 100, false); replyToGet(*cb, _sender, 1, 110, false); @@ -964,7 +964,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_aut .condition("testdoctype1.headerval==120") .createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); replyToGet(*cb, _sender, 0, 0, false); replyToGet(*cb, _sender, 1, 0, false); ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); @@ -999,7 +999,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) { setup_stripe(1, 1, "storage:1 distributor:1"); // Only 1 replica; consistent with itself by definition. auto cb = sendUpdate("0=1/2/3"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", _sender.getCommands(true)); // Close the operation. This should generate a single reply that is @@ -1017,7 +1017,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) { setup_stripe(2, 2, "storage:2 distributor:1"); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); // Closing the operation should now only return an ABORTED reply for @@ -1037,7 +1037,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1065,7 +1065,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1086,7 +1086,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); // Replica set changes between time of Get requests sent and // responses received. This may happen e.g. if concurrent mutations @@ -1112,7 +1112,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); replyToGet(*cb, _sender, 0, Timestamp(0), false); @@ -1131,7 +1131,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replic // No replicas, technically consistent but cannot use fast path. auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); } @@ -1146,7 +1146,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0)); @@ -1156,7 +1156,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency TEST_F(TwoPhaseUpdateOperationTest, operation_is_rejected_in_safe_path_if_feed_is_blocked) { set_up_distributor_with_feed_blocked_state(); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas to trigger safe path - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " @@ -1275,7 +1275,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_ cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true); configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); // Add new replica to deterministic test bucket after gets have been sent BucketId bucket(0x400000000000cac4); // Always the same in the test. addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); @@ -1299,7 +1299,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_cannot_restart_in_fast_path) { cfg->set_update_fast_path_restart_enabled(true); configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); reply_to_metadata_get(*cb, _sender, 0, 1000U); @@ -1360,7 +1360,7 @@ TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_ex cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true); configure_stripe(cfg); auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) " "Reasons to start: => 0," @@ -1419,7 +1419,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_is_no_op_without_auto_cre cfg->set_update_fast_path_restart_enabled(true); configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); reply_to_metadata_get(*cb, _sender, 0, 1000U); @@ -1443,7 +1443,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_sends_puts_with_auto_crea cfg->set_update_fast_path_restart_enabled(true); configure_stripe(cfg); auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); - cb->start(_sender); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); reply_to_metadata_get(*cb, _sender, 0, 1000U); diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index d0ae31b9524..f0cb30368cb 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -1,5 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> #include <tests/distributor/distributor_stripe_test_util.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/repo/documenttyperepo.h> @@ -90,7 +91,7 @@ TEST_F(UpdateOperationTest, simple) { std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3")); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", sender.getCommands(true)); @@ -109,7 +110,7 @@ TEST_F(UpdateOperationTest, not_found) { std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3")); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", sender.getCommands(true)); @@ -124,7 +125,7 @@ TEST_F(UpdateOperationTest, multi_node) { setup_stripe(2, 2, "distributor:1 storage:2"); std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); @@ -148,7 +149,7 @@ TEST_F(UpdateOperationTest, multi_node_inconsistent_timestamp) { setup_stripe(2, 2, "distributor:1 storage:2"); std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); @@ -168,7 +169,7 @@ TEST_F(UpdateOperationTest, test_and_set_failures_increment_tas_metric) { setup_stripe(2, 2, "distributor:1 storage:1"); std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3")); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0", sender.getCommands(true)); api::ReturnCode result(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "bork bork"); replyToMessage(*cb, sender, 0, 1234, api::BucketInfo(), result); @@ -197,7 +198,7 @@ TEST_F(UpdateOperationTest, create_if_missing_update_sentinel_timestamp_is_treat setup_stripe(2, 2, "distributor:1 storage:2"); std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3", true)); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); @@ -219,7 +220,7 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest setup_stripe(2, 3, "distributor:1 storage:3"); std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3", true)); DistributorMessageSenderStub sender; - cb->start(sender); + cb->start(sender, framework::MilliSecTime(0)); ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true)); replyToMessage(*cb, sender, 0, 100); // Newly created diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index ecfa7232def..6c597b620dd 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -41,7 +41,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil { document::BucketId nullId; VisitorOperation::Config defaultConfig; - static api::CreateVisitorCommand::SP + api::CreateVisitorCommand::SP createVisitorCommand(std::string instanceId, document::BucketId superBucket, document::BucketId lastBucket, @@ -122,7 +122,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil { */ std::string runEmptyVisitor(api::CreateVisitorCommand::SP msg) { auto op = createOpWithDefaultConfig(std::move(msg)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); return _sender.getLastReply(); } @@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState) auto op = createOpWithDefaultConfig(std::move(msg)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -228,7 +228,7 @@ TEST_F(VisitorOperationTest, shutdown) { auto op = createOpWithDefaultConfig(std::move(msg)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -296,7 +296,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) { auto op = createOpWithDefaultConfig( createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -346,7 +346,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) { "dumpvisitor", "true")); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)) << _sender.getLastReply(); sendReply(*op); @@ -371,7 +371,7 @@ VisitorOperationTest::runVisitor(document::BucketId id, "dumpvisitor", "true")); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); sendReply(*op); @@ -437,7 +437,7 @@ TEST_F(VisitorOperationTest, bucket_removed_while_visitor_pending) { auto op = createOpWithDefaultConfig( createVisitorCommand("removefrombucketdb", id, nullId)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -459,7 +459,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) { auto op = createOpWithDefaultConfig( createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); // Since visitRemoves is true, the empty bucket will be visited ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -474,7 +474,7 @@ TEST_F(VisitorOperationTest, resend_to_other_storage_node_on_failure) { auto op = createOpWithDefaultConfig( createVisitorCommand("emptyinconsistent", id, nullId)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -502,7 +502,7 @@ TEST_F(VisitorOperationTest, timeout_only_after_reply_from_all_storage_nodes) { nullId, 8)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0,Visitor Create => 1", _sender.getCommands(true)); @@ -539,7 +539,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) { 8, 500ms)); // ms timeout - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0,Visitor Create => 1", _sender.getCommands(true)); @@ -614,7 +614,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) { "dumpvisitor", "true")); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_EQ("Visitor Create => 0", _sender.getCommands(true)); } @@ -640,7 +640,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) { "dumpvisitor", "true")); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " "ReturnCode(WRONG_DISTRIBUTION, distributor:1 storage:1)", _sender.getLastReply()); @@ -661,7 +661,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) { createVisitorCommand("multiplebuckets", id, nullId, 31), VisitorOperation::Config(1, 4)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0,Visitor Create => 0," "Visitor Create => 0,Visitor Create => 0", @@ -708,7 +708,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) { createVisitorCommand("multiplebuckets", id, document::BucketId(0x54000000000f0001), 31), VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode)); - op2->start(_sender); + op2->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -736,7 +736,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_resend_only_failing) { createVisitorCommand("multiplebuckets", id, nullId, 31), VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0,Visitor Create => 0," "Visitor Create => 0,Visitor Create => 0", @@ -775,7 +775,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node_one_super_buc createVisitorCommand("multiplebucketsonesuper", id, nullId), VisitorOperation::Config(5, 4)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -833,7 +833,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true), VisitorOperation::Config(5, 4)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 1", _sender.getCommands(true)); @@ -858,7 +858,7 @@ TEST_F(VisitorOperationTest, visit_ideal_node) { auto op = createOpWithDefaultConfig( createVisitorCommand("multinode", id, nullId, 8)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -889,7 +889,7 @@ TEST_F(VisitorOperationTest, no_resending_on_critical_failure) { auto op = createOpWithDefaultConfig( createVisitorCommand("multinodefailurecritical", id, nullId, 8)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -913,7 +913,7 @@ TEST_F(VisitorOperationTest, failure_on_all_nodes) { auto op = createOpWithDefaultConfig( createVisitorCommand("multinodefailurecritical", id, nullId, 8)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); @@ -992,7 +992,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent) 500ms, inconsistent)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); assert(_sender.getCommands(true) == "Visitor Create => 0,Visitor Create => 1"); return op; @@ -1043,7 +1043,7 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) { auto op = createOpWithDefaultConfig( createVisitorCommand("foo", id, nullId, 8, 10000ms)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0)); @@ -1061,7 +1061,7 @@ VisitorOperationTest::do_visitor_roundtrip_with_statistics( auto op = createOpWithDefaultConfig( createVisitorCommand("metricstats", id, nullId)); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0)); auto reply = cmd.makeReply(); @@ -1109,7 +1109,7 @@ TEST_F(VisitorOperationTest, assigning_put_lock_access_token_sets_special_visito auto op = createOpWithDefaultConfig(createVisitorCommand("metricstats", id, nullId)); op->assign_put_lock_access_token("its-a me, mario"); - op->start(_sender); + op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); auto cmd = std::dynamic_pointer_cast<api::CreateVisitorCommand>(_sender.command(0)); ASSERT_TRUE(cmd); diff --git a/storage/src/tests/storageframework/clock/timetest.cpp b/storage/src/tests/storageframework/clock/timetest.cpp index 9dbcdd409d8..e1270156aa0 100644 --- a/storage/src/tests/storageframework/clock/timetest.cpp +++ b/storage/src/tests/storageframework/clock/timetest.cpp @@ -8,12 +8,16 @@ namespace storage::framework::defaultimplementation { TEST(TimeTest, testBasics) { + SecondTime timeSec(1); - MilliSecTime timeMillis(1000); + MilliSecTime timeMillis = timeSec.getMillis(); EXPECT_EQ(uint64_t(1000), timeMillis.getTime()); + EXPECT_EQ(timeSec, timeMillis.getSeconds()); - MicroSecTime timeMicros = timeMillis.getMicros(); + MicroSecTime timeMicros = timeSec.getMicros(); + EXPECT_EQ(timeSec.getMicros(), timeMillis.getMicros()); EXPECT_EQ(timeMillis, timeMicros.getMillis()); + EXPECT_EQ(timeSec, timeMicros.getSeconds()); MicroSecTime timeMicros2 = timeMicros; EXPECT_EQ(timeMicros2, timeMicros); @@ -28,6 +32,7 @@ TEST(TimeTest, testBasics) MilliSecTime timeMillis2 = timeMicros2.getMillis(); EXPECT_GT(timeMillis2, timeMillis); EXPECT_EQ(uint64_t(1050), timeMillis2.getTime()); + EXPECT_EQ(timeSec, timeMillis2.getSeconds()); } TEST(TimeTest, testCreatedFromClock) @@ -35,6 +40,7 @@ TEST(TimeTest, testCreatedFromClock) defaultimplementation::FakeClock clock; clock.setAbsoluteTimeInSeconds(600); + EXPECT_EQ(SecondTime(600), SecondTime(clock)); EXPECT_EQ(MilliSecTime(600 * 1000), MilliSecTime(clock)); EXPECT_EQ(MicroSecTime(600 * 1000 * 1000), MicroSecTime(clock)); } @@ -45,6 +51,7 @@ TEST(TimeTest, canAssignMicrosecondResolutionTimeToFakeClock) clock.setAbsoluteTimeInMicroSeconds(1234567); // 1.234567 seconds // All non-microsec time points must necessarily be truncated. + EXPECT_EQ(SecondTime(1), SecondTime(clock)); EXPECT_EQ(MilliSecTime(1234), MilliSecTime(clock)); EXPECT_EQ(MicroSecTime(1234567), MicroSecTime(clock)); } diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index 08355c105d5..47d70cf436e 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -25,8 +25,8 @@ namespace storage { class DummyApplicationGenerationFether : public ApplicationGenerationFetcher { public: - [[nodiscard]] int64_t getGeneration() const override { return 1; } - [[nodiscard]] std::string getComponentName() const override { return "component"; } + int64_t getGeneration() const override { return 1; } + std::string getComponentName() const override { return "component"; } }; struct StateReporterTest : Test { @@ -54,8 +54,8 @@ struct MetricClock : public metrics::MetricManager::Timer { framework::Clock& _clock; explicit MetricClock(framework::Clock& c) : _clock(c) {} - [[nodiscard]] time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); } - [[nodiscard]] time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getSystemTime().time_since_epoch()); } + time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); } + time_t getTimeInMilliSecs() const override { return _clock.getTimeInMillis().getTime(); } }; } @@ -245,8 +245,8 @@ TEST_F(StateReporterTest, report_metrics) { "/state/v1/metrics?consumer=status" }; - for (auto & path_str : paths) { - framework::HttpUrlPath path(path_str); + for (int i = 0; i < pathCount; i++) { + framework::HttpUrlPath path(paths[i]); std::ostringstream ost; _stateReporter->reportStatus(ost, path); std::string jsonData = ost.str(); diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h index 4479b194396..c0c3b8429b2 100644 --- a/storage/src/tests/storageserver/testvisitormessagesession.h +++ b/storage/src/tests/storageserver/testvisitormessagesession.h @@ -32,7 +32,10 @@ public: std::deque<std::unique_ptr<documentapi::DocumentMessage> > sentMessages; - TestVisitorMessageSession(VisitorThread& t, Visitor& v, const mbus::Error& autoReplyError, bool autoReply); + TestVisitorMessageSession(VisitorThread& t, + Visitor& v, + const mbus::Error& autoReplyError, + bool autoReply); void reply(mbus::Reply::UP rep); uint32_t pending() override { return pendingCount; } diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index a82514acb03..be4e7270c69 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n) // Wait until we have started the visitor const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; + framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -225,8 +225,9 @@ VisitorManagerTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getMonotonicTime() > endTime) { - throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC); + if (clock.getTimeInMillis() > endTime) { + throw vespalib::IllegalStateException( + "Timed out waiting for visitor session", VESPA_STRLOC); } std::this_thread::sleep_for(10ms); } @@ -254,10 +255,12 @@ VisitorManagerTest::getMessagesAndReply( switch (session.sentMessages[i]->getType()) { case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: - docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP()); + docs.push_back(static_cast<documentapi::PutDocumentMessage&>( + *session.sentMessages[i]).getDocumentSP()); break; case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId()); + docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>( + *session.sentMessages[i]).getDocumentId()); break; default: break; @@ -352,7 +355,10 @@ TEST_F(VisitorManagerTest, normal_usage) { getMessagesAndReply(1, getSession(0), docs, docIds); // All data has been replied to, expecting to get a create visitor reply - ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs))); + ASSERT_NO_FATAL_FAILURE( + verifyCreateVisitorReply(api::ReturnCode::OK, + int(docs.size()), + getTotalSerializedSize(docs))); EXPECT_EQ(1u, getMatchingDocuments(docs)); EXPECT_FALSE(_manager->hasPendingMessageState()); diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 565131b3b99..f3a538b7832 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -256,9 +256,11 @@ TestVisitorMessageSession& VisitorTest::getSession(uint32_t n) { // Wait until we have started the visitor - const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); + const std::vector<TestVisitorMessageSession*>& sessions( + _messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - vespalib::steady_time endTime = clock.getMonotonicTime() + 30s; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); while (true) { { std::lock_guard lock(_messageSessionFactory->_accessLock); @@ -266,7 +268,7 @@ VisitorTest::getSession(uint32_t n) return *sessions[n]; } } - if (clock.getMonotonicTime() > endTime) { + if (clock.getTimeInMillis() > endTime) { throw vespalib::IllegalStateException( "Timed out waiting for visitor session", VESPA_STRLOC); } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index cf98585bc82..9fafc87688f 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -279,8 +279,8 @@ void BucketManager::updateMinUsedBits() // Responsible for sending on messages that was previously queued void BucketManager::run(framework::ThreadHandle& thread) { - constexpr vespalib::duration CHECK_MINUSEDBITS_INTERVAL = 30s; - vespalib::steady_time timeToCheckMinUsedBits = vespalib::steady_time::min(); + const int64_t CHECK_MINUSEDBITS_INTERVAL = 1000*30; + framework::MilliSecTime timeToCheckMinUsedBits(0); while (!thread.interrupted()) { bool didWork = false; BucketInfoRequestMap infoReqs; @@ -305,9 +305,10 @@ void BucketManager::run(framework::ThreadHandle& thread) thread.registerTick(framework::PROCESS_CYCLE); } } - if (timeToCheckMinUsedBits < _component.getClock().getMonotonicTime()) { + if (timeToCheckMinUsedBits < _component.getClock().getTimeInMillis()) { updateMinUsedBits(); - timeToCheckMinUsedBits = _component.getClock().getMonotonicTime() + CHECK_MINUSEDBITS_INTERVAL; + timeToCheckMinUsedBits = _component.getClock().getTimeInMillis(); + timeToCheckMinUsedBits += framework::MilliSecTime(CHECK_MINUSEDBITS_INTERVAL); } } } diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.cpp b/storage/src/vespa/storage/common/statusmetricconsumer.cpp index e9360c35f3c..9ffb044b0a5 100644 --- a/storage/src/vespa/storage/common/statusmetricconsumer.cpp +++ b/storage/src/vespa/storage/common/statusmetricconsumer.cpp @@ -15,20 +15,32 @@ LOG_SETUP(".status.metricreporter"); namespace storage { -StatusMetricConsumer::StatusMetricConsumer(StorageComponentRegister& compReg, metrics::MetricManager& manager, const std::string& name) +StatusMetricConsumer::StatusMetricConsumer( + StorageComponentRegister& compReg, metrics::MetricManager& manager, + const std::string& name) : framework::StatusReporter("metrics", "Performance metrics"), _manager(manager), _component(compReg, "statusmetricsconsumer"), _name(name), - _lock() + _lock(), + _startTime(_component.getClock().getTimeInSeconds()), + _processedTime(0) { LOG(debug, "Started metrics consumer"); setlocale(LC_NUMERIC, ""); + _component.registerMetricUpdateHook(*this, 3600s); _component.registerStatusPage(*this); } StatusMetricConsumer::~StatusMetricConsumer() = default; +void +StatusMetricConsumer::updateMetrics(const MetricLockGuard & guard) +{ + metrics::MemoryConsumption::UP mc(_manager.getMemoryConsumption(guard)); + // TODO is this hook needed anymore? +} + vespalib::string StatusMetricConsumer::getReportContentType(const framework::HttpUrlPath& path) const { @@ -65,7 +77,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out, } else { LOG(debug, "Not calling update hooks as dontcallupdatehooks option has been given"); } - int64_t currentTimeS(vespalib::count_s(_component.getClock().getMonotonicTime().time_since_epoch())); + framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); bool xml = (path.getAttribute("format") == "xml"); bool json = (path.getAttribute("format") == "json"); @@ -77,7 +89,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out, if (path.hasAttribute("task") && path.getAttribute("task") == "reset") { std::lock_guard guard(_lock); - _manager.reset(currentTimeS); + _manager.reset(currentTime.getTime()); } if (path.hasAttribute("interval")) { @@ -88,7 +100,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out, const metrics::MetricSnapshot* snapshot; if (interval == -2) { snapshot = &_manager.getActiveMetrics(metricLock); - _manager.getActiveMetrics(metricLock).setToTime(currentTimeS); + _manager.getActiveMetrics(metricLock).setToTime(currentTime.getTime()); } else if (interval == -1) { // "Prime" the metric structure by first fetching the set of active // metrics (complete with structure) and resetting these. This @@ -100,17 +112,19 @@ StatusMetricConsumer::reportStatus(std::ostream& out, _manager.getActiveMetrics(metricLock).getMetrics(), copyUnset); generated->reset(0); - _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTimeS); - _manager.getActiveMetrics(metricLock).addToSnapshot(*generated, currentTimeS); + _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTime.getTime()); + _manager.getActiveMetrics(metricLock).addToSnapshot(*generated, currentTime.getTime()); generated->setFromTime(_manager.getTotalMetricSnapshot(metricLock).getFromTime()); snapshot = generated.get(); } else if (interval == 0) { if (copyUnset) { generated = std::make_unique<metrics::MetricSnapshot>( - _manager.getTotalMetricSnapshot(metricLock).getName(), 0, - _manager.getActiveMetrics(metricLock).getMetrics(), true); + _manager.getTotalMetricSnapshot(metricLock).getName(), + 0, + _manager.getActiveMetrics(metricLock).getMetrics(), + true); generated->reset(0); - _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTimeS); + _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTime.getTime()); snapshot = generated.get(); } else { snapshot = &_manager.getTotalMetricSnapshot(metricLock); @@ -122,7 +136,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out, _manager.getActiveMetrics(metricLock).getMetrics(), true); generated->reset(0); _manager.getMetricSnapshot(metricLock, interval, temporarySnap) - .addToSnapshot(*generated, currentTimeS); + .addToSnapshot(*generated, currentTime.getTime()); snapshot = generated.get(); } else { snapshot = &_manager.getMetricSnapshot(metricLock, interval, temporarySnap); diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.h b/storage/src/vespa/storage/common/statusmetricconsumer.h index b25c2d5db48..337c3ea7ff0 100644 --- a/storage/src/vespa/storage/common/statusmetricconsumer.h +++ b/storage/src/vespa/storage/common/statusmetricconsumer.h @@ -11,18 +11,27 @@ #include "storagecomponent.h" #include <vespa/storageframework/generic/status/statusreporter.h> -#include <vespa/vespalib/util/jsonstream.h> +#include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/metrics/metricmanager.h> +#include <map> namespace vespalib { class StringTokenizer; } namespace metrics { class MetricManager; } namespace storage { +namespace framework { class MemoryToken; } + class StatusMetricConsumer : public framework::StatusReporter, + private framework::MetricUpdateHook, private vespalib::JsonStreamTypes { public: - StatusMetricConsumer(StorageComponentRegister&, metrics::MetricManager&, const std::string& name = "status"); + StatusMetricConsumer( + StorageComponentRegister&, + metrics::MetricManager&, + const std::string& name = "status"); ~StatusMetricConsumer() override; // Metric reporting requires the "vespa.content.metrics_api" capability @@ -31,11 +40,14 @@ public: } vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream& out, const framework::HttpUrlPath&) const override; + void updateMetrics(const MetricLockGuard & guard) override; private: metrics::MetricManager& _manager; StorageComponent _component; std::string _name; mutable std::mutex _lock; + framework::SecondTime _startTime; + framework::SecondTime _processedTime; }; } // storage diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 2f150cf7250..cb1c935082e 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -2,6 +2,7 @@ #include "blockingoperationstarter.h" #include "distributor_bucket_space.h" +#include "distributor_status.h" #include "distributor_stripe.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" @@ -852,7 +853,7 @@ DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& ne // Note: this assumes that std::chrono::system_clock and the framework // system clock have the same epoch, which should be a reasonable // assumption. - TimePoint now = _component.getClock().getSystemTime(); + const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime())); _externalOperationHandler.rejectFeedBeforeTimeReached(_ownershipSafeTimeCalc->safeTimePoint(now)); } _bucketDBUpdater.handle_activated_cluster_state_bundle(); // Triggers resending of queued requests diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp index 7b7c9f431f7..81512393c5b 100644 --- a/storage/src/vespa/storage/distributor/operationowner.cpp +++ b/storage/src/vespa/storage/distributor/operationowner.cpp @@ -31,7 +31,7 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply) { std::shared_ptr<Operation> cb = _sentMessageMap.pop(reply->getMsgId()); - if (cb) { + if (cb.get() != 0) { Sender sender(*this, _sender, cb); cb->receive(sender, reply); return true; @@ -41,11 +41,13 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply) } bool -OperationOwner::start(const std::shared_ptr<Operation>& operation, Priority) +OperationOwner::start(const std::shared_ptr<Operation>& operation, + Priority priority) { + (void) priority; LOG(spam, "Starting operation %s", operation->toString().c_str()); Sender sender(*this, _sender, operation); - operation->start(sender, _clock.getSystemTime()); + operation->start(sender, _clock.getTimeInMillis()); return true; } @@ -61,7 +63,7 @@ OperationOwner::onClose() while (true) { std::shared_ptr<Operation> cb = _sentMessageMap.pop(); - if (cb) { + if (cb.get()) { Sender sender(*this, _sender, std::shared_ptr<Operation>()); cb->onClose(sender); } else { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 55fe2e039e1..2acd6068e1a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -203,7 +203,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorStripeMessageSender& sen (_node_ctx, _op_ctx, _bucketSpace, _updateCmd, std::move(entries), _updateMetric); UpdateOperation & op = *updateOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); - op.start(intermediate, _node_ctx.clock().getSystemTime()); + op.start(intermediate, _node_ctx.clock().getTimeInMillis()); transitionTo(SendState::UPDATES_SENT); if (intermediate._reply.get()) { @@ -223,7 +223,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorStripeMessageSender& sen GetOperation& op = *get_operation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender); _replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time - op.start(intermediate, _node_ctx.clock().getSystemTime()); + op.start(intermediate, _node_ctx.clock().getTimeInMillis()); transitionTo(_use_initial_cheap_metadata_fetch_phase ? SendState::METADATA_GETS_SENT @@ -322,7 +322,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric); PutOperation & op = *putOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender); - op.start(intermediate, _node_ctx.clock().getSystemTime()); + op.start(intermediate, _node_ctx.clock().getTimeInMillis()); transitionTo(SendState::PUTS_SENT); LOG(debug, "Update(%s): sending Puts at timestamp %" PRIu64, update_doc_id().c_str(), putTimestamp); @@ -601,7 +601,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries); std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now; - for (const auto & e : entries) { + for (uint32_t j = 0; j < entries.size(); ++j) { + const auto& e = entries[j]; for (uint32_t i = 0; i < e->getNodeCount(); i++) { const auto& copy = e->getNodeRef(i); replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode()); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 667afbf67a0..6aa243d5e99 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -13,8 +13,6 @@ #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.operation.idealstate.merge"); -using vespalib::to_utc; -using vespalib::to_string; namespace storage::distributor { MergeOperation::~MergeOperation() = default; @@ -25,7 +23,7 @@ MergeOperation::getStatus() const return Operation::getStatus() + vespalib::make_string(" . Sent MergeBucketCommand at %s", - to_string(to_utc(_sentMessageTime)).c_str()); + _sentMessageTime.toString().c_str()); } void @@ -35,11 +33,11 @@ MergeOperation::addIdealNodes( std::vector<MergeMetaData>& result) { // Add all ideal nodes first. These are never marked source-only. - for (unsigned short idealNode : idealNodes) { + for (uint32_t i = 0; i < idealNodes.size(); i++) { const MergeMetaData* entry = nullptr; - for (const auto & node : nodes) { - if (idealNode == node._nodeIndex) { - entry = &node; + for (uint32_t j = 0; j < nodes.size(); j++) { + if (idealNodes[i] == nodes[j]._nodeIndex) { + entry = &nodes[j]; break; } } @@ -52,20 +50,21 @@ MergeOperation::addIdealNodes( } void -MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy, - const std::vector<MergeMetaData>& nodes, - std::vector<MergeMetaData>& result) +MergeOperation::addCopiesNotAlreadyAdded( + uint16_t redundancy, + const std::vector<MergeMetaData>& nodes, + std::vector<MergeMetaData>& result) { - for (auto node : nodes) { + for (uint32_t i = 0; i < nodes.size(); i++) { bool found = false; - for (const auto & mergeData : result) { - if (mergeData._nodeIndex == node._nodeIndex) { + for (uint32_t j = 0; j < result.size(); j++) { + if (result[j]._nodeIndex == nodes[i]._nodeIndex) { found = true; } } if (!found) { - result.push_back(node); + result.push_back(nodes[i]); result.back()._sourceOnly = (result.size() > redundancy); } } @@ -79,7 +78,8 @@ MergeOperation::generateSortedNodeList( MergeLimiter& limiter, std::vector<MergeMetaData>& nodes) { - std::vector<uint16_t> idealNodes(distribution.getIdealStorageNodes(state, bucketId, "ui")); + std::vector<uint16_t> idealNodes( + distribution.getIdealStorageNodes(state, bucketId, "ui")); std::vector<MergeMetaData> result; const uint16_t redundancy = distribution.getRedundancy(); @@ -123,25 +123,31 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) std::vector<std::unique_ptr<BucketCopy> > newCopies; std::vector<MergeMetaData> nodes; - for (unsigned short node : getNodes()) { - const BucketCopy* copy = entry->getNode(node); + for (uint32_t i = 0; i < getNodes().size(); ++i) { + const BucketCopy* copy = entry->getNode(getNodes()[i]); if (copy == nullptr) { // New copies? - newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node))); + newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, getNodes()[i]))); copy = newCopies.back().get(); } - nodes.emplace_back(node, *copy); + nodes.emplace_back(getNodes()[i], *copy); } _infoBefore = entry.getBucketInfo(); - generateSortedNodeList(_bucketSpace->getDistribution(), clusterState, getBucketId(), _limiter, nodes); + generateSortedNodeList(_bucketSpace->getDistribution(), + clusterState, + getBucketId(), + _limiter, + nodes); for (const auto& node : nodes) { _mnodes.emplace_back(node._nodeIndex, node._sourceOnly); } if (_mnodes.size() > 1) { - auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes, - _manager->operation_context().generate_unique_timestamp(), - clusterState.getVersion()); + auto msg = std::make_shared<api::MergeBucketCommand>( + getBucket(), + _mnodes, + _manager->operation_context().generate_unique_timestamp(), + clusterState.getVersion()); const bool may_send_unordered = (_manager->operation_context().distributor_config().use_unordered_merge_chaining() && all_involved_nodes_support_unordered_merge_chaining()); if (!may_send_unordered) { @@ -163,7 +169,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) sender.sendToNode(lib::NodeType::STORAGE, _mnodes[0].index, msg); - _sentMessageTime = _manager->node_context().clock().getMonotonicTime(); + _sentMessageTime = _manager->node_context().clock().getTimeInSeconds(); } else { LOGBP(debug, "Unable to merge bucket %s, since only one copy is available. System state %s", @@ -178,20 +184,28 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge( const BucketDatabase::Entry& currentState) const { assert(currentState.valid()); - for (auto mnode : _mnodes) { - const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index)); + for (size_t i = 0; i < _mnodes.size(); ++i) { + const BucketCopy* copyBefore(_infoBefore.getNode(_mnodes[i].index)); if (!copyBefore) { continue; } - const BucketCopy* copyAfter(currentState->getNode(mnode.index)); + const BucketCopy* copyAfter(currentState->getNode(_mnodes[i].index)); if (!copyAfter) { LOG(debug, "Copy of %s on node %u removed during merge. Was %s", - getBucketId().toString().c_str(), mnode.index, copyBefore->toString().c_str()); + getBucketId().toString().c_str(), + _mnodes[i].index, + copyBefore->toString().c_str()); continue; } - if (mnode.sourceOnly && !copyBefore->consistentWith(*copyAfter)){ - LOG(debug, "Source-only copy of %s on node %u changed from %s to %s during the course of the merge. Failing it.", - getBucketId().toString().c_str(), mnode.index, copyBefore->toString().c_str(), copyAfter->toString().c_str()); + if (_mnodes[i].sourceOnly + && !copyBefore->consistentWith(*copyAfter)) + { + LOG(debug, "Source-only copy of %s on node %u changed from " + "%s to %s during the course of the merge. Failing it.", + getBucketId().toString().c_str(), + _mnodes[i].index, + copyBefore->toString().c_str(), + copyAfter->toString().c_str()); return true; } } @@ -206,22 +220,25 @@ MergeOperation::deleteSourceOnlyNodes( { assert(currentState.valid()); std::vector<uint16_t> sourceOnlyNodes; - for (auto & mnode : _mnodes) { - const uint16_t nodeIndex = mnode.index; + for (uint32_t i = 0; i < _mnodes.size(); ++i) { + const uint16_t nodeIndex = _mnodes[i].index; const BucketCopy* copy = currentState->getNode(nodeIndex); if (!copy) { continue; // No point in deleting what's not even there now. } - if (mnode.sourceOnly) { + if (_mnodes[i].sourceOnly) { sourceOnlyNodes.push_back(nodeIndex); } } LOG(debug, "Attempting to delete %zu source only copies for %s", - sourceOnlyNodes.size(), getBucketId().toString().c_str()); + sourceOnlyNodes.size(), + getBucketId().toString().c_str()); if (!sourceOnlyNodes.empty()) { - _removeOperation = std::make_unique<RemoveBucketOperation>(_manager->node_context(), BucketAndNodes(getBucket(), sourceOnlyNodes)); + _removeOperation = std::make_unique<RemoveBucketOperation>( + _manager->node_context(), + BucketAndNodes(getBucket(), sourceOnlyNodes)); // Must not send removes to source only copies if something has caused // pending load to the copy after the merge was sent! if (_removeOperation->isBlocked(_manager->operation_context(), sender.operation_sequencer())) { @@ -251,7 +268,8 @@ MergeOperation::deleteSourceOnlyNodes( } void -MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) +MergeOperation::onReceive(DistributorStripeMessageSender& sender, + const std::shared_ptr<api::StorageReply> & msg) { if (_removeOperation) { if (_removeOperation->onReceiveInternal(msg)) { @@ -269,14 +287,18 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha } auto& reply = dynamic_cast<api::MergeBucketReply&>(*msg); - LOG(debug, "Merge operation for bucket %s finished", getBucketId().toString().c_str()); + LOG(debug, + "Merge operation for bucket %s finished", + getBucketId().toString().c_str()); api::ReturnCode result = reply.getResult(); _ok = result.success(); if (_ok) { - BucketDatabase::Entry entry(_bucketSpace->getBucketDatabase().get(getBucketId())); + BucketDatabase::Entry entry( + _bucketSpace->getBucketDatabase().get(getBucketId())); if (!entry.valid()) { - LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str()); + LOG(debug, "Bucket %s no longer exists after merge", + getBucketId().toString().c_str()); done(); // Nothing more we can do. return; } @@ -293,8 +315,11 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha return; } else if (result.isBusy()) { } else if (result.isCriticalForMaintenance()) { - LOGBP(warning, "Merging failed for %s: %s with error '%s'", - getBucketId().toString().c_str(), msg->toString().c_str(), msg->getResult().toString().c_str()); + LOGBP(warning, + "Merging failed for %s: %s with error '%s'", + getBucketId().toString().c_str(), + msg->toString().c_str(), + msg->getResult().toString().c_str()); } else { LOG(debug, "Merge failed for %s with non-critical failure: %s", getBucketId().toString().c_str(), result.toString().c_str()); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 44449633559..5416df3a43d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -20,7 +20,7 @@ class MergeOperation : public IdealStateOperation protected: bool sourceOnlyCopyChangedDuringMerge(const BucketDatabase::Entry&) const; - vespalib::steady_time _sentMessageTime; + framework::SecondTime _sentMessageTime; std::vector<api::MergeBucketCommand::Node> _mnodes; std::unique_ptr<RemoveBucketOperation> _removeOperation; BucketInfo _infoBefore; @@ -30,7 +30,7 @@ public: MergeOperation(const BucketAndNodes& nodes, uint16_t maxNodes = 16) : IdealStateOperation(nodes), - _sentMessageTime(), + _sentMessageTime(0), _limiter(maxNodes) {} diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp index 4d82de170ae..a48fb53a7ce 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/operation.cpp @@ -12,7 +12,7 @@ LOG_SETUP(".distributor.callback"); namespace storage::distributor { Operation::Operation() - : _startTime() + : _startTime(0) { } @@ -21,23 +21,19 @@ Operation::~Operation() = default; std::string Operation::getStatus() const { - return vespalib::make_string("%s (started %s)", getName(), vespalib::to_string(_startTime).c_str()); + return vespalib::make_string("%s (started %s)", + getName(), _startTime.toString().c_str()); } void -Operation::start(DistributorStripeMessageSender& sender, vespalib::system_time startTime) +Operation::start(DistributorStripeMessageSender& sender, + framework::MilliSecTime startTime) { _startTime = startTime; onStart(sender); } void -Operation::start(DistributorStripeMessageSender& sender) -{ - start(sender, vespalib::system_time()); -} - -void Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCommand& target) { target.getTrace().setLevel(source.getTrace().getLevel()); diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index 8bb81b8d365..e24aa976221 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -45,25 +45,24 @@ public: onReceive(sender, msg); } - [[nodiscard]] virtual const char* getName() const noexcept = 0; + virtual const char* getName() const noexcept = 0; - [[nodiscard]] virtual std::string getStatus() const; + virtual std::string getStatus() const; - [[nodiscard]] virtual std::string toString() const { + virtual std::string toString() const { return std::string(getName()); } /** Starts the callback, sending any messages etc. Sets _startTime to current time */ - virtual void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime); - void start(DistributorStripeMessageSender& sender); + virtual void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime); /** * Returns true if we are blocked to start this operation given * the pending messages. */ - [[nodiscard]] virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const { + virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const { return false; } @@ -78,6 +77,11 @@ public: virtual void on_throttled(); /** + Returns the timestamp on which the first message was sent from this callback. + */ + framework::MilliSecTime getStartTime() const { return _startTime; } + + /** Transfers message settings such as priority, timeout, etc. from one message to another. */ static void copyMessageSettings(const api::StorageCommand& source, @@ -93,7 +97,7 @@ private: const std::shared_ptr<api::StorageReply> & msg) = 0; protected: - vespalib::system_time _startTime; + framework::MilliSecTime _startTime; }; } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index c86254cb69a..c03b211d1aa 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -10,6 +10,7 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlstream.hpp> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <climits> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".pendingclusterstate"); @@ -249,7 +250,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request api::ReturnCode result(reply->getResult()); if (!result.success()) { - _delayedRequests.emplace_back(_clock.getMonotonicTime() + 100ms, bucketSpaceAndNode); + framework::MilliSecTime resendTime(_clock); + resendTime += framework::MilliSecTime(100); + _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode); _sentMessages.erase(iter); update_reply_failure_statistics(result, bucketSpaceAndNode); return true; @@ -270,9 +273,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request void PendingClusterState::resendDelayedMessages() { if (_delayedRequests.empty()) return; // Don't fetch time if not needed - vespalib::steady_time currentTime = _clock.getMonotonicTime(); + framework::MilliSecTime currentTime(_clock); while (!_delayedRequests.empty() - && (currentTime >= _delayedRequests.front().first)) + && currentTime >= _delayedRequests.front().first) { requestNode(_delayedRequests.front().second); _delayedRequests.pop_front(); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 8af08e1ba4d..24b31e45cbb 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -193,7 +193,8 @@ private: struct BucketSpaceAndNode { document::BucketSpace bucketSpace; uint16_t node; - BucketSpaceAndNode(document::BucketSpace bucketSpace_, uint16_t node_) + BucketSpaceAndNode(document::BucketSpace bucketSpace_, + uint16_t node_) : bucketSpace(bucketSpace_), node(node_) { @@ -217,7 +218,7 @@ private: void update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply); using SentMessages = std::map<uint64_t, BucketSpaceAndNode>; - using DelayedRequests = std::deque<std::pair<vespalib::steady_time , BucketSpaceAndNode>>; + using DelayedRequests = std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode>>; using PendingTransitions = std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash>; using NodeFeatures = vespalib::hash_map<uint16_t, NodeSupportedFeatures>; diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 8618d570685..533493a79a2 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -3,6 +3,7 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <map> +#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".pendingmessages"); @@ -14,7 +15,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)), _component(cr, "pendingmessagetracker"), _nodeInfo(_component.getClock()), - _nodeBusyDuration(60s), + _nodeBusyDuration(60), _deferred_read_tasks(), _lock() { @@ -37,7 +38,7 @@ vespalib::string PendingMessageTracker::MessageEntry::toHtml() const { vespalib::asciistream ss; ss << "<li><i>Node " << nodeIdx << "</i>: " - << "<b>" << vespalib::to_string(timeStamp) << "</b> " + << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> " << api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n"; return ss.str(); } @@ -45,7 +46,7 @@ PendingMessageTracker::MessageEntry::toHtml() const { PendingMessageTracker::TimePoint PendingMessageTracker::currentTime() const { - return _component.getClock().getSystemTime(); + return TimePoint(_component.getClock().getTimeInMillis().getTime()); } namespace { diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index fb672d5ee31..93238b5a83f 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -68,7 +68,13 @@ public: virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0; }; - using TimePoint = vespalib::system_time; + /** + * Time point represented as the millisecond interval from the framework + * clock's epoch to a given point in time. Note that it'd be more + * semantically correct to use std::chrono::time_point, but it is bound + * to specific chrono clock types, their epochs and duration resolution. + */ + using TimePoint = std::chrono::milliseconds; PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index); ~PendingMessageTracker() override; @@ -113,8 +119,8 @@ public: */ std::vector<uint64_t> clearMessagesForNode(uint16_t node); - void setNodeBusyDuration(vespalib::duration duration) noexcept { - _nodeBusyDuration = duration; + void setNodeBusyDuration(std::chrono::seconds secs) noexcept { + _nodeBusyDuration = secs; } void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task); @@ -130,7 +136,7 @@ private: MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority, uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept; - [[nodiscard]] vespalib::string toHtml() const; + vespalib::string toHtml() const; }; struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {}; @@ -181,7 +187,7 @@ private: Messages _messages; framework::Component _component; NodeInfo _nodeInfo; - vespalib::duration _nodeBusyDuration; + std::chrono::seconds _nodeBusyDuration; DeferredBucketTaskMap _deferred_read_tasks; // Since distributor is currently single-threaded, this will only diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index f69f9e3d427..5a584f7c332 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -440,7 +440,9 @@ StripeBucketDBUpdater::handleSingleBucketInfoFailure( req.targetNode, repl->getResult().toString().c_str()); if (req.bucket.getBucketId() != document::BucketId(0)) { - _delayedRequests.emplace_back(_node_ctx.clock().getMonotonicTime() + 100ms, req); + framework::MilliSecTime sendTime(_node_ctx.clock()); + sendTime += framework::MilliSecTime(100); + _delayedRequests.emplace_back(sendTime, req); } } @@ -450,7 +452,7 @@ StripeBucketDBUpdater::resendDelayedMessages() if (_delayedRequests.empty()) { return; // Don't fetch time if not needed } - vespalib::steady_time currentTime(_node_ctx.clock().getMonotonicTime()); + framework::MilliSecTime currentTime(_node_ctx.clock()); while (!_delayedRequests.empty() && currentTime >= _delayedRequests.front().first) { @@ -642,7 +644,7 @@ void StripeBucketDBUpdater::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { for (const auto& entry : _delayedRequests) { - entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", vespalib::count_ms(vespalib::to_utc(entry.first).time_since_epoch()))); + entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); } } diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 6339283f963..2f6e665be14 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -231,7 +231,7 @@ private: using DbGuards = std::unordered_map<document::BucketSpace, std::shared_ptr<BucketDatabase::ReadGuard>, document::BucketSpace::hash>; - using DelayedRequestsQueue = std::deque<std::pair<vespalib::steady_time, BucketRequest>>; + using DelayedRequestsQueue = std::deque<std::pair<framework::MilliSecTime, BucketRequest>>; const DistributorNodeContext& _node_ctx; DistributorStripeOperationContext& _op_ctx; diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h index 8b6ade7e7d1..a0613c60fa4 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h @@ -13,9 +13,9 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow class ThrottlingOperation : public Operation { public: - ThrottlingOperation(Operation::SP operation, + ThrottlingOperation(const Operation::SP& operation, ThrottlingOperationStarter& operationStarter) - : _operation(std::move(operation)), + : _operation(operation), _operationStarter(operationStarter) {} @@ -30,21 +30,24 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow void onClose(DistributorStripeMessageSender& sender) override { _operation->onClose(sender); } - [[nodiscard]] const char* getName() const noexcept override { + const char* getName() const noexcept override { return _operation->getName(); } - [[nodiscard]] std::string getStatus() const override { + std::string getStatus() const override { return _operation->getStatus(); } - [[nodiscard]] std::string toString() const override { + std::string toString() const override { return _operation->toString(); } - void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime) override { + void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime) override { _operation->start(sender, startTime); } void receive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override { _operation->receive(sender, msg); } + framework::MilliSecTime getStartTime() const { + return _operation->getStartTime(); + } void onStart(DistributorStripeMessageSender&) override { // Should never be called directly on the throttled operation // instance, but rather on its wrapped implementation. @@ -58,7 +61,7 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow OperationStarter& _starterImpl; public: - explicit ThrottlingOperationStarter(OperationStarter& starterImpl) + ThrottlingOperationStarter(OperationStarter& starterImpl) : _starterImpl(starterImpl), _minPending(0), _maxPending(UINT32_MAX), @@ -68,9 +71,9 @@ public: bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; - [[nodiscard]] bool may_allow_operation_with_priority(Priority priority) const noexcept override; + bool may_allow_operation_with_priority(Priority priority) const noexcept override; - [[nodiscard]] bool canStart(uint32_t currentOperationCount, Priority priority) const; + bool canStart(uint32_t currentOperationCount, Priority priority) const; void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) { _minPending = minPending; diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index b0702ac7bf0..54dedbebbfe 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -531,7 +531,8 @@ TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBu } if (has_bucket_ownership_transfer && _maintenance_safe_time_delay.count() > 0) { OwnershipTransferSafeTimePointCalculator safe_time_calc(_maintenance_safe_time_delay); - const auto now = _component.getClock().getSystemTime(); + using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint; + const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime())); _maintenance_safe_time_point = safe_time_calc.safeTimePoint(now); // All stripes are in a waiting pattern and will observe this on their next tick. // Memory visibility enforced by all stripes being held under a mutex by our caller. diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ec22d7c064e..0680c10ab29 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -9,6 +9,7 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/storage/common/bucket_resolver.h> #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageserver/configurable_bucket_resolver.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> @@ -48,14 +49,13 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl } namespace { + vespalib::string getNodeId(StorageComponent& sc) { + vespalib::asciistream ost; + ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); + return ost.str(); + } -vespalib::string getNodeId(StorageComponent& sc) { - vespalib::asciistream ost; - ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex(); - return ost.str(); -} - -vespalib::duration TEN_MINUTES = 600s; + framework::SecondTime TEN_MINUTES(600); } @@ -151,7 +151,8 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) std::shared_ptr<api::StorageCommand> originalCommand; { std::lock_guard lock(_messageBusSentLock); - auto iter(_messageBusSent.find(reply->getContext().value.UINT64)); + using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>; + MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64)); if (iter != _messageBusSent.end()) { originalCommand.swap(iter->second); _messageBusSent.erase(iter); @@ -192,13 +193,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override { - return {FixedBucketSpaces::default_space(), document::BucketId(0)}; + document::Bucket bucketFromId(const document::DocumentId &) const override { + return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); } - [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -437,7 +438,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(msg); + process(std::move(msg)); } void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { @@ -450,7 +451,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg) { MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString()); if (msg->getType().isReply()) { - const auto & m = static_cast<const api::StorageReply&>(*msg); + const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg); if (m.getResult().failed()) { LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str()); } @@ -603,7 +604,7 @@ CommunicationManager::sendDirectRPCReply( request.addReturnString(m.data(), m.size()); if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) { - auto& gns(static_cast<api::GetNodeStateReply&>(*reply)); + api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, false); request.addReturnString(ns.str().c_str()); @@ -692,9 +693,9 @@ CommunicationManager::run(framework::ThreadHandle& thread) process(msg); } std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - for (auto it(_earlierGenerations.begin()); + for (EarlierProtocols::iterator it(_earlierGenerations.begin()); !_earlierGenerations.empty() && - ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime()); + ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds()); it = _earlierGenerations.begin()) { _earlierGenerations.erase(it); @@ -717,10 +718,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string& void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) { if (_mbus) { - vespalib::steady_time now(_component.getClock().getMonotonicTime()); + framework::SecondTime now(_component.getClock().getTimeInSeconds()); auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo); std::lock_guard<std::mutex> guard(_earlierGenerationsLock); - _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)); + _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol))); } if (_message_codec_provider) { _message_codec_provider->update_atomically(repo); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index e83a6517c45..6f953411cac 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -72,6 +72,9 @@ class CommunicationManager final public MessageDispatcher { private: + CommunicationManager(const CommunicationManager&); + CommunicationManager& operator=(const CommunicationManager&); + StorageComponent _component; CommunicationManagerMetrics _metrics; @@ -82,7 +85,7 @@ private: Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; - using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>; + using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>; using EarlierProtocols = std::vector<EarlierProtocol>; std::mutex _earlierGenerationsLock; EarlierProtocols _earlierGenerations; @@ -123,8 +126,6 @@ private: friend struct CommunicationManagerTest; public: - CommunicationManager(const CommunicationManager&) = delete; - CommunicationManager& operator=(const CommunicationManager&) = delete; CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri); ~CommunicationManager() override; diff --git a/storage/src/vespa/storage/storageserver/opslogger.cpp b/storage/src/vespa/storage/storageserver/opslogger.cpp index e5785968eb1..03322cb55fd 100644 --- a/storage/src/vespa/storage/storageserver/opslogger.cpp +++ b/storage/src/vespa/storage/storageserver/opslogger.cpp @@ -77,7 +77,7 @@ OpsLogger::onPutReply(const std::shared_ptr<api::PutReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << vespalib::to_string(_component.getClock().getSystemTime()) + ost << _component.getClock().getTimeInSeconds().getTime() << "\tPUT\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -94,7 +94,7 @@ OpsLogger::onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << vespalib::to_string(_component.getClock().getSystemTime()) + ost << _component.getClock().getTimeInSeconds().getTime() << "\tUPDATE\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -111,7 +111,7 @@ OpsLogger::onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << vespalib::to_string(_component.getClock().getSystemTime()) + ost << _component.getClock().getTimeInSeconds().getTime() << "\tREMOVE\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { @@ -128,7 +128,7 @@ OpsLogger::onGetReply(const std::shared_ptr<api::GetReply>& msg) { if (_targetFile == nullptr) return false; std::ostringstream ost; - ost << vespalib::to_string(_component.getClock().getSystemTime()) + ost << _component.getClock().getTimeInSeconds().getTime() << "\tGET\t" << msg->getDocumentId() << "\t" << msg->getResult() << "\n"; { diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 647cba52bfc..81961370ed3 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -9,14 +9,16 @@ #include <vespa/metrics/metricset.h> #include <vespa/metrics/metrictimer.h> #include <vespa/metrics/valuemetric.h> +#include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/string_escape.h> #include <vespa/vespalib/util/stringfmt.h> + #include <fstream> -#include <ranges> +#include <unistd.h> #include <vespa/log/log.h> LOG_SETUP(".state.manager"); @@ -69,7 +71,7 @@ StateManager::StateManager(StorageComponentRegister& compReg, _requested_almost_immediate_node_state_replies(false) { _nodeState->setMinUsedBits(58); - _nodeState->setStartTimestamp(_component.getClock().getSystemTime()); + _nodeState->setStartTimestamp(_component.getClock().getTimeInSeconds().getTime()); _component.registerStatusPage(*this); _component.registerMetric(*_metrics); } @@ -133,9 +135,9 @@ StateManager::reportHtmlStatus(std::ostream& out, << "<h1>System state history</h1>\n" << "<table border=\"1\"><tr>" << "<th>Received at time</th><th>State</th></tr>\n"; - for (const auto & it : std::ranges::reverse_view(_systemStateHistory)) { - out << "<tr><td>" << vespalib::to_string(vespalib::to_utc(it.first)) << "</td><td>" - << xml_content_escaped(it.second->getBaselineClusterState()->toString()) << "</td></tr>\n"; + for (auto it = _systemStateHistory.rbegin(); it != _systemStateHistory.rend(); ++it) { + out << "<tr><td>" << it->first << "</td><td>" + << xml_content_escaped(it->second->getBaselineClusterState()->toString()) << "</td></tr>\n"; } out << "</table>\n"; } @@ -144,7 +146,7 @@ StateManager::reportHtmlStatus(std::ostream& out, lib::Node StateManager::thisNode() const { - return { _component.getNodeType(), _component.getIndex() }; + return lib::Node(_component.getNodeType(), _component.getIndex()); } lib::NodeState::CSP @@ -296,7 +298,7 @@ StateManager::enableNextClusterState() _reported_host_info_cluster_state_version = _systemState->getVersion(); } // else: reported version updated upon explicit activation edge _nextSystemState.reset(); - _systemStateHistory.emplace_back(_component.getClock().getMonotonicTime(), _systemState); + _systemStateHistory.emplace_back(_component.getClock().getTimeInMillis(), _systemState); } namespace { @@ -390,7 +392,8 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) { bool sentReply = false; if (cmd->getSourceIndex() != 0xffff) { - sentReply = sendGetNodeStateReplies(vespalib::steady_time::max(), cmd->getSourceIndex()); + sentReply = sendGetNodeStateReplies(framework::MilliSecTime(0), + cmd->getSourceIndex()); } std::shared_ptr<api::GetNodeStateReply> reply; { @@ -401,13 +404,16 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) && (*cmd->getExpectedState() == *_nodeState || sentReply) && is_up_to_date) { - vespalib::duration timeout = cmd->getTimeout(); + int64_t msTimeout = vespalib::count_ms(cmd->getTimeout()); LOG(debug, "Received get node state request with timeout of " - "%f seconds. Scheduling to be answered in " - "%f seconds unless a node state change " + "%" PRId64 " milliseconds. Scheduling to be answered in " + "%" PRId64 " milliseconds unless a node state change " "happens before that time.", - vespalib::to_s(timeout), vespalib::to_s(timeout)*0.8); - TimeStateCmdPair pair(_component.getClock().getMonotonicTime() + timeout, cmd); + msTimeout, msTimeout * 800 / 1000); + TimeStateCmdPair pair( + _component.getClock().getTimeInMillis() + + framework::MilliSecTime(msTimeout * 800 / 1000), + cmd); _queuedStateRequests.emplace_back(std::move(pair)); } else { LOG(debug, "Answered get node state request right away since it " @@ -491,14 +497,13 @@ StateManager::tick() { bool almost_immediate_replies = _requested_almost_immediate_node_state_replies.load(std::memory_order_relaxed); if (almost_immediate_replies) { _requested_almost_immediate_node_state_replies.store(false, std::memory_order_relaxed); - sendGetNodeStateReplies(); - } else { - sendGetNodeStateReplies(_component.getClock().getMonotonicTime()); } + framework::MilliSecTime time(almost_immediate_replies ? framework::MilliSecTime(0) : _component.getClock().getTimeInMillis()); + sendGetNodeStateReplies(time); } bool -StateManager::sendGetNodeStateReplies(vespalib::steady_time olderThanTime, uint16_t node) +StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, uint16_t node) { std::vector<std::shared_ptr<api::GetNodeStateReply>> replies; { @@ -506,8 +511,9 @@ StateManager::sendGetNodeStateReplies(vespalib::steady_time olderThanTime, uint1 for (auto it = _queuedStateRequests.begin(); it != _queuedStateRequests.end();) { if (node != 0xffff && node != it->second->getSourceIndex()) { ++it; - } else if (it->first < olderThanTime) { - LOG(debug, "Sending reply to msg with id %" PRIu64, it->second->getMsgId()); + } else if (!olderThanTime.isSet() || it->first < olderThanTime) { + LOG(debug, "Sending reply to msg with id %" PRIu64, + it->second->getMsgId()); replies.emplace_back(std::make_shared<api::GetNodeStateReply>(*it->second, *_nodeState)); auto eraseIt = it++; diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 3605a0b1605..74b59875ff8 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -42,8 +42,8 @@ class StateManager : public NodeStateUpdater, private vespalib::JsonStreamTypes { using ClusterStateBundle = lib::ClusterStateBundle; - using TimeStateCmdPair = std::pair<vespalib::steady_time, api::GetNodeStateCommand::SP>; - using TimeSysStatePair = std::pair<vespalib::steady_time, std::shared_ptr<const ClusterStateBundle>>; + using TimeStateCmdPair = std::pair<framework::MilliSecTime, api::GetNodeStateCommand::SP>; + using TimeSysStatePair = std::pair<framework::MilliSecTime, std::shared_ptr<const ClusterStateBundle>>; struct StateManagerMetrics; @@ -109,7 +109,7 @@ private: void notifyStateListeners(); bool sendGetNodeStateReplies( - vespalib::steady_time olderThanTime = vespalib::steady_time::max(), + framework::MilliSecTime olderThanTime = framework::MilliSecTime(0), uint16_t index = 0xffff); void mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex> &, uint16_t controller_index); diff --git a/storage/src/vespa/storage/storageserver/statereporter.cpp b/storage/src/vespa/storage/storageserver/statereporter.cpp index 373cd186708..b2337ae1223 100644 --- a/storage/src/vespa/storage/storageserver/statereporter.cpp +++ b/storage/src/vespa/storage/storageserver/statereporter.cpp @@ -29,7 +29,9 @@ StateReporter::StateReporter( _component.registerStatusPage(*this); } -StateReporter::~StateReporter() = default; +StateReporter::~StateReporter() +{ +} vespalib::string StateReporter::getReportContentType( @@ -82,7 +84,7 @@ StateReporter::getMetrics(const vespalib::string &consumer) snapshot.reset(0); _manager.getMetricSnapshot(guard, interval).addToSnapshot( - snapshot, vespalib::count_s(_component.getClock().getSystemTime().time_since_epoch())); + snapshot, _component.getClock().getTimeInSeconds().getTime()); vespalib::asciistream json; vespalib::JsonStream stream(json); @@ -104,7 +106,7 @@ StateReporter::getHealth() const lib::NodeState cns(*_component.getStateUpdater().getCurrentNodeState()); bool up = cns.getState().oneOf("u"); std::string message = up ? "" : "Node state: " + cns.toString(true); - return { up, message }; + return vespalib::HealthProducer::Health(up, message); } void diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 5ece2a12f71..3987827a264 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -372,7 +372,7 @@ StorageNode::shutdown() _chain->flush(); } - if ( !_pidFile.empty() ) { + if (_pidFile != "") { LOG(debug, "Removing pid file"); removePidFile(_pidFile); } @@ -510,8 +510,10 @@ StorageNode::updateMetrics(const MetricLockGuard &) { } void -StorageNode::waitUntilInitialized(vespalib::duration timeout) { - vespalib::steady_time doom = vespalib::steady_clock::now(); +StorageNode::waitUntilInitialized(uint32_t timeout) { + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout)); while (true) { { NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); @@ -519,7 +521,7 @@ StorageNode::waitUntilInitialized(vespalib::duration timeout) { if (nodeState.getState() == lib::State::UP) break; } std::this_thread::sleep_for(10ms); - if (vespalib::steady_clock::now() >= doom) { + if (clock.getTimeInMillis() >= endTime) { std::ostringstream ost; ost << "Storage server not initialized after waiting timeout of " << timeout << " seconds."; diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 19b930c184f..0e420f206e2 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -78,7 +78,7 @@ public: virtual const lib::NodeType& getNodeType() const = 0; bool attemptedStopped() const; void notifyDoneInitializing() override; - void waitUntilInitialized(vespalib::duration timeout = 15s); + void waitUntilInitialized(uint32_t timeoutSeconds = 15); void updateMetrics(const MetricLockGuard & guard) override; /** Updates the document type repo. */ diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 6d36abc896e..91f304ad9a0 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -121,9 +121,12 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId) void Visitor::VisitorTarget::discardQueuedMessages() { - for (const auto & entry : _queuedMessages) { - LOG(spam, "Erasing queued message with id %" PRIu64, entry.second); - releaseMetaForMessageId(entry.second); + for (MessageQueue::iterator + it(_queuedMessages.begin()), e(_queuedMessages.end()); + it != e; ++it) + { + LOG(spam, "Erasing queued message with id %" PRIu64, it->second); + releaseMetaForMessageId(it->second); } _queuedMessages.clear(); } @@ -307,14 +310,17 @@ Visitor::getStateName(VisitorState s) return "COMPLETED"; default: assert(!"Unknown visitor state"); - return nullptr; + return NULL; } } Visitor::VisitorState Visitor::transitionTo(VisitorState newState) { - LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState)); + LOG(debug, "Visitor '%s' state transition %s -> %s", + _id.c_str(), + getStateName(_state), + getStateName(newState)); VisitorState oldState = _state; _state = newState; return oldState; @@ -333,10 +339,12 @@ Visitor::mayTransitionToCompleted() const void Visitor::forceClose() { - for (auto * state : _bucketStates) { + for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); + it != _bucketStates.end(); ++it) + { // Reset iterator id so no destroy iterator will be sent - state->setIteratorId(spi::IteratorId(0)); - delete state; + (*it)->setIteratorId(spi::IteratorId(0)); + delete *it; } _bucketStates.clear(); transitionTo(STATE_COMPLETED); @@ -350,7 +358,7 @@ Visitor::sendReplyOnce() std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply()); _hitCounter->updateVisitorStatistics(_visitorStatistics); - dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); + static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics); if (shouldAddMbusTrace()) { _trace.moveTraceTo(reply->getTrace()); } @@ -365,15 +373,17 @@ void Visitor::finalize() { if (_state != STATE_COMPLETED) { - LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str()); + LOG(error, "Attempting to finalize non-completed visitor %s", + _id.c_str()); assert(false); } assert(_bucketStates.empty()); if (_result.success()) { - if (_messageSession->pending() > 0) { + if (_messageSession->pending() > 0) + { _result = api::ReturnCode(api::ReturnCode::ABORTED); - try { + try{ abortedVisiting(); } catch (std::exception& e) { LOG(warning, "Visitor %s had a problem in abortVisiting(). As " @@ -394,31 +404,43 @@ Visitor::finalize() void Visitor::discardAllNoPendingBucketStates() { - for (auto it = _bucketStates.begin(); it !=_bucketStates.end();) { + for (BucketStateList::iterator + it(_bucketStates.begin()), e(_bucketStates.end()); + it != e;) + { BucketIterationState& bstate(**it); if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) { - LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations", - _id.c_str(), bstate.toString().c_str()); + LOG(debug, + "Visitor '%s' not discarding bucket state %s " + "since it has pending operations", + _id.c_str(), + bstate.toString().c_str()); ++it; continue; } - LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str()); + LOG(debug, "Visitor '%s' discarding bucket state %s", + _id.c_str(), bstate.toString().c_str()); delete *it; it = _bucketStates.erase(it); } } void -Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError) +Visitor::fail(const api::ReturnCode& reason, + bool overrideExistingError) { assert(_state != STATE_COMPLETED); if (_result.getResult() < reason.getResult() || overrideExistingError) { - LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str()); + LOG(debug, "Setting result of visitor '%s' to %s", + _id.c_str(), reason.toString().c_str()); _result = reason; } if (_visitorTarget.hasQueuedMessages()) { - LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed", - _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str()); + LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s " + "since visitor has failed", + _id.c_str(), + _visitorTarget._queuedMessages.size(), + _controlDestination->toString().c_str()); _visitorTarget.discardQueuedMessages(); } discardAllNoPendingBucketStates(); @@ -426,7 +448,8 @@ Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError) } bool -Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount) +Visitor::shouldReportProblemToClient(const api::ReturnCode& code, + size_t retryCount) const { // Report _once_ per message if we reach a certain retry threshold. if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) { @@ -498,7 +521,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, _visitorOptions._fromTime = fromTimestamp; _visitorOptions._toTime = toTimestamp; _currentBucket = 0; - _hitCounter = std::make_unique<HitCounter>(); + _hitCounter.reset(new HitCounter()); _messageSession = std::move(messageSession); _documentPriority = documentPriority; @@ -589,7 +612,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met uint64_t messageId = reply->getContext().value.UINT64; uint32_t removed = _visitorTarget._pendingMessages.erase(messageId); - LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId); + LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), + reply->toString().c_str(), messageId); assert(removed == 1); (void) removed; @@ -610,16 +634,20 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); - api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), - reply->getError(0).getMessage()); + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", + reply->getError(0).toString().c_str()); + api::ReturnCode returnCode( + static_cast<api::ReturnCode::Result>( + reply->getError(0).getCode()), + reply->getError(0).getMessage()); fail(returnCode, true); close(); return; } - api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), - reply->getError(0).getMessage()); + api::ReturnCode returnCode( + static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), + reply->getError(0).getMessage()); const bool should_fail = remap_docapi_message_error_code(returnCode); if (should_fail) { // Abort - something is wrong with target. @@ -629,7 +657,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met } if (failed()) { - LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed", + LOG(debug, "Failed to send message from visitor '%s', due to " + "%s. Not resending since visitor has failed", _id.c_str(), returnCode.toString().c_str()); return; } @@ -680,7 +709,8 @@ Visitor::onCreateIteratorReply( if (reply->getResult().failed()) { LOG(debug, "Failed to create iterator for bucket %s: %s", - bucketId.toString().c_str(), reply->getResult().toString().c_str()); + bucketId.toString().c_str(), + reply->getResult().toString().c_str()); fail(reply->getResult()); delete *it; _bucketStates.erase((++it).base()); @@ -688,14 +718,17 @@ Visitor::onCreateIteratorReply( } bucketState.setIteratorId(reply->getIteratorId()); if (failed()) { - LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s", - bucketId.toString().c_str(), _result.toString().c_str()); + LOG(debug, "Create iterator for bucket %s is OK, " + "but visitor has failed: %s", + bucketId.toString().c_str(), + _result.toString().c_str()); delete *it; _bucketStates.erase((++it).base()); return; } - LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str()); + LOG(debug, "Visitor '%s' starting to visit bucket %s.", + _id.c_str(), bucketId.toString().c_str()); auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); @@ -704,10 +737,13 @@ Visitor::onCreateIteratorReply( } void -Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics) +Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, + VisitorThreadMetrics& metrics) { LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s", - _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str()); + _id.c_str(), + reply->getBucketId().toString().c_str(), + reply->getResult().toString().c_str()); auto it = _bucketStates.rbegin(); // New requests will be pushed on end of list.. So searching @@ -727,8 +763,10 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThrea !reply->getResult().isShutdownRelated() && !reply->getResult().isBucketDisappearance()) { - LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s", - reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str()); + LOG(warning, "Failed to talk to persistence layer for bucket " + "%s. Aborting visitor '%s': %s", + reply->getBucketId().toString().c_str(), + _id.c_str(), reply->getResult().toString().c_str()); } fail(reply->getResult()); BucketIterationState& bucketState(**it); @@ -745,14 +783,17 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThrea bucketState.setCompleted(reply->isCompleted()); --bucketState._pendingIterators; if (!reply->getEntries().empty()) { - LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str()); + LOG(debug, "Processing documents in handle given from bucket %s.", + reply->getBucketId().toString().c_str()); // While handling documents we should not keep locks, such // that visitor may process several things at once. if (isRunning()) { MBUS_TRACE(reply->getTrace(), 5, vespalib::make_string("Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size())); - LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size()); + LOG(debug, "Visitor %s handling block of %zu documents.", + _id.c_str(), + reply->getEntries().size()); try { framework::MilliSecTimer processingTimer(_component.getClock()); handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter); @@ -872,11 +913,15 @@ Visitor::continueVisitor() } } - LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str()); + LOG(debug, "No pending messages, tagging visitor '%s' complete", + _id.c_str()); transitionTo(STATE_COMPLETED); } else { - LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)", - _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size()); + LOG(debug, "Visitor %s waiting for all commands to be replied to " + "(pending=%zu, queued=%zu)", + _id.c_str(), + _visitorTarget._pendingMessages.size(), + _visitorTarget._queuedMessages.size()); } return false; } else { @@ -936,14 +981,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const << (_visitorOptions._visitRemoves ? "true" : "false") << "</td></tr>\n"; out << "<tr><td>Control destination</td><td>"; - if (_controlDestination) { + if (_controlDestination.get()) { out << xml_content_escaped(_controlDestination->toString()); } else { out << "nil"; } out << "</td></tr>\n"; out << "<tr><td>Data destination</td><td>"; - if (_dataDestination) { + if (_dataDestination.get()) { out << xml_content_escaped(_dataDestination->toString()); } else { out << "nil"; @@ -1033,13 +1078,17 @@ Visitor::getStatus(std::ostream& out, bool verbose) const bool Visitor::getIterators() { - LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d", - _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket); + LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, " + "_currentBucket = %d", + _id.c_str(), _buckets.size(), + _bucketStates.size(), _currentBucket); // Don't send any further GetIters if we're closing if (!isRunning()) { if (hasPendingIterators()) { - LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size()); + LOG(debug, "Visitor has failed but waiting for %zu " + "buckets to finish processing", + _bucketStates.size()); return true; } else { return false; @@ -1048,10 +1097,13 @@ Visitor::getIterators() // Go through buckets found. Take the first that doesn't have requested // state and request a new piece. - for (auto it = _bucketStates.begin();it != _bucketStates.end();) { + for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin(); + it != _bucketStates.end();) + { assert(*it); BucketIterationState& bucketState(**it); - if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket) + if ((bucketState._pendingIterators + >= _visitorOptions._maxParallelOneBucket) || bucketState.hasPendingControlCommand()) { ++it; @@ -1066,17 +1118,20 @@ Visitor::getIterators() } try{ completedBucket(bucketState.getBucketId(), *_hitCounter); - _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1); + _visitorStatistics.setBucketsVisited( + _visitorStatistics.getBucketsVisited() + 1); } catch (std::exception& e) { std::ostringstream ost; - ost << "Visitor fail to run completedBucket() notification: " << e.what(); + ost << "Visitor fail to run completedBucket() notification: " + << e.what(); reportProblem(ost.str()); } delete *it; it = _bucketStates.erase(it); continue; } - auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); + auto cmd = std::make_shared<GetIterCommand>( + bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize); cmd->getTrace().setLevel(_traceLevel); cmd->setPriority(_priority); _messageHandler->send(cmd, *this); @@ -1088,7 +1143,7 @@ Visitor::getIterators() } // If there aren't anymore buckets to iterate, we're done - if (_bucketStates.empty() && _currentBucket >= _buckets.size()) { + if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) { LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str()); return false; } @@ -1102,13 +1157,17 @@ Visitor::getIterators() _currentBucket < _buckets.size()) { document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]); - auto newBucketState = std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket); + std::unique_ptr<BucketIterationState> newBucketState( + new BucketIterationState(*this, *_messageHandler, bucket)); LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.", _id.c_str(), bucket.getBucketId().toString().c_str()); - spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString)); - selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime())); - selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime())); + spi::Selection selection + = spi::Selection(spi::DocumentSelection(_documentSelectionString)); + selection.setFromTimestamp( + spi::Timestamp(_visitorOptions._fromTime.getTime())); + selection.setToTimestamp( + spi::Timestamp(_visitorOptions._toTime.getTime())); auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet, _visitorOptions._visitRemoves @@ -1125,7 +1184,8 @@ Visitor::getIterators() } if (sentCount == 0) { if (LOG_WOULD_LOG(debug)) { - LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.", + LOG(debug, "Enough iterators being processed. Doing nothing for " + "visitor '%s' bucketStates = %zu.", _id.c_str(), _bucketStates.size()); for (const auto& state : _bucketStates) { LOG(debug, "Existing: %s", state->toString().c_str()); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 9b6d8e348b9..0737c5612c0 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -136,24 +136,28 @@ private: {} /** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */ - ~BucketIterationState() override; + ~BucketIterationState(); void setCompleted(bool completed = true) { _completed = completed; } - [[nodiscard]] bool isCompleted() const { return _completed; } + bool isCompleted() const { return _completed; } - [[nodiscard]] document::Bucket getBucket() const { return _bucket; } - [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); } + document::Bucket getBucket() const { return _bucket; } + document::BucketId getBucketId() const { return _bucket.getBucketId(); } void setIteratorId(spi::IteratorId iteratorId) { _iteratorId = iteratorId; } - [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; } + spi::IteratorId getIteratorId() const { return _iteratorId; } - [[nodiscard]] bool hasPendingControlCommand() const { + void setPendingControlCommand() { + _iteratorId = spi::IteratorId(0); + } + + bool hasPendingControlCommand() const { return _iteratorId == spi::IteratorId(0); } - [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; } + bool hasPendingIterators() const { return _pendingIterators > 0; } void print(std::ostream& out, bool, const std::string& ) const override { out << "BucketIterationState(" @@ -243,10 +247,12 @@ private: MessageMeta releaseMetaForMessageId(uint64_t msgId); void reinsertMeta(MessageMeta); - [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); } + bool hasQueuedMessages() const { return !_queuedMessages.empty(); } void discardQueuedMessages(); - [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; } + uint32_t getMemoryUsage() const noexcept { + return _memoryUsage; + } VisitorTarget(); ~VisitorTarget(); @@ -320,9 +326,9 @@ protected: std::string _documentSelectionString; vdslib::VisitorStatistics _visitorStatistics; - [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; } + bool isCompletedCalled() const { return _calledCompletedVisitor; } - [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; } + uint32_t traceLevel() const noexcept { return _traceLevel; } /** * Attempts to add the given trace message to the internal, memory bounded @@ -333,7 +339,7 @@ protected: */ bool addBoundedTrace(uint32_t level, const vespalib::string& message); - [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept; + const vdslib::Parameters& visitor_parameters() const noexcept; // Possibly modifies the ReturnCode parameter in-place if its return code should // be changed based on visitor subclass-specific behavior. @@ -411,7 +417,7 @@ public: * The consistency level provided here is propagated through the SPI * Context object for createIterator calls. */ - [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const { + virtual spi::ReadConsistency getRequiredReadConsistency() const { return spi::ReadConsistency::STRONG; } @@ -422,7 +428,8 @@ public: /** * Used to silence transient errors that can happen during normal operation. */ - [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount) ; + bool shouldReportProblemToClient(const api::ReturnCode&, + size_t retryCount) const; /** Called to send report to client of potential non-critical problems. */ void reportProblem(const std::string& problem); @@ -485,16 +492,18 @@ public: void getStatus(std::ostream& out, bool verbose) const; - void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; } - void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; } + void setMaxParallel(uint32_t maxParallel) + { _visitorOptions._maxParallel = maxParallel; } + void setMaxParallelPerBucket(uint32_t max) + { _visitorOptions._maxParallelOneBucket = max; } /** * Sends a message to the data handler for this visitor. */ void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage); - [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; } - [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; } + bool isRunning() const { return _state == STATE_RUNNING; } + bool isCompleted() const { return _state == STATE_COMPLETED; } private: /** @@ -533,9 +542,11 @@ private: void sendReplyOnce(); - [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); } - [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); } - [[nodiscard]] bool mayTransitionToCompleted() const; + bool hasFailedVisiting() const { return _result.failed(); } + + bool hasPendingIterators() const { return !_bucketStates.empty(); } + + bool mayTransitionToCompleted() const; void discardAllNoPendingBucketStates(); @@ -554,7 +565,9 @@ private: * * Precondition: attach() must have been called on `this`. */ - [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; } + bool shouldAddMbusTrace() const noexcept { + return _traceLevel != 0; + } /** * Set internal state to the given state value. diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 07938002746..a03b9a9a8a3 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -187,8 +187,9 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi for (int32_t i=0; i<config->visitorthreads; ++i) { _visitorThread.emplace_back( // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager - std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory, - _visitorFactories, *_metrics->threads[i], *this)), + std::shared_ptr<VisitorThread>( + new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map<api::VisitorId, std::string>()); } } @@ -449,7 +450,8 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply) } void -VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor) +VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, + Visitor& visitor) { assert(cmd->getType() == api::MessageType::INTERNAL); // Only add to internal state if not destroy iterator command, as @@ -458,7 +460,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& v if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); - inf.timestamp = _component.getClock().getSystemTime(); + inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); inf.timeout = cmd->getTimeout(); if (cmd->getAddress()) { @@ -621,7 +623,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, out << "<tr>" << "<td>" << entry.first << "</td>" << "<td>" << entry.second.id << "</td>" - << "<td>" << vespalib::to_string(entry.second.timestamp) << "</td>" + << "<td>" << entry.second.timestamp << "</td>" << "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>" << "<td>" << xml_content_escaped(entry.second.destination) << "</td>" << "</tr>\n"; diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 3e331e1c9a2..33703b392bc 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -57,7 +57,7 @@ private: struct MessageInfo { api::VisitorId id; - vespalib::system_time timestamp; + time_t timestamp; vespalib::duration timeout; std::string destination; }; @@ -168,7 +168,9 @@ private: * by the formula: fixed + variable * ((255 - priority) / 255) */ uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const { - return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0)); + return _maxFixedConcurrentVisitors + static_cast<uint32_t>( + _maxVariableConcurrentVisitors + * ((255.0 - cmd.getPriority()) / 255.0)); } void updateMetrics(const MetricLockGuard &) override; diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index e3ebef3a3ef..55ef83ba658 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -126,10 +126,10 @@ VisitorThread::shutdown() if (event._message.get()) { if (!event._message->getType().isReply() && (event._message->getType() != api::MessageType::INTERNAL - || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) + || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( - dynamic_cast<api::StorageCommand&>(*event._message).makeReply()); + static_cast<api::StorageCommand&>(*event._message).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); _messageSender.send(reply); } @@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread) // disappear when no visiting is done) if (entry._message.get() && (entry._message->getType() != api::MessageType::INTERNAL - || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) + || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID)) { entry._timer.stop(_metrics.averageQueueWaitingTime); } @@ -290,7 +290,7 @@ VisitorThread::close() } else { _metrics.completedVisitors.inc(1); } - vespalib::steady_time currentTime(_component.getClock().getMonotonicTime()); + framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); trimRecentlyCompletedList(currentTime); _recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime); _visitors.erase(_currentlyRunningVisitor); @@ -298,9 +298,9 @@ VisitorThread::close() } void -VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime) +VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime) { - vespalib::steady_time recentLimit(currentTime - 30s); + framework::SecondTime recentLimit(currentTime - framework::SecondTime(30)); // Dump all elements that aren't recent anymore while (!_recentlyCompleted.empty() && _recentlyCompleted.front().second < recentLimit) @@ -313,7 +313,8 @@ void VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code) { // Get current time. Set the time that is the oldest still recent. - trimRecentlyCompletedList(_component.getClock().getMonotonicTime()); + framework::SecondTime currentTime(_component.getClock().getTimeInSeconds()); + trimRecentlyCompletedList(currentTime); // Go through all recent visitors. Ignore request if recent for (const auto& e : _recentlyCompleted) { @@ -343,7 +344,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, auto it = _visitorFactories.find(str); if (it == _visitorFactories.end()) { error << "Visitor library " << str << " not found."; - return {}; + return std::shared_ptr<Visitor>(); } auto libIter = _libs.find(str); @@ -362,7 +363,7 @@ VisitorThread::createVisitor(vespalib::stringref libName, } catch (std::exception& e) { error << "Failed to create visitor instance of type " << libName << ": " << e.what(); - return {}; + return std::shared_ptr<Visitor>(); } } @@ -689,7 +690,7 @@ VisitorThread::getStatus(vespalib::asciistream& out, } for (const auto& cv : _recentlyCompleted) { out << "<li> Visitor " << cv.first << " done at " - << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n"; + << cv.second.getTime() << "\n"; } out << "</ul>\n"; out << "<h3>Current queue size: " << _queue.size() << "</h3>\n"; @@ -735,10 +736,12 @@ VisitorThread::getStatus(vespalib::asciistream& out, if (_visitors.empty()) { out << "None\n"; } - for (const auto & v : _visitors) { - out << "<a href=\"?visitor=" << v.first + for (VisitorMap::const_iterator it = _visitors.begin(); + it != _visitors.end(); ++it) + { + out << "<a href=\"?visitor=" << it->first << (verbose ? "&verbose" : "") << "\">Visitor " - << v.first << "</a><br>\n"; + << it->first << "</a><br>\n"; } } } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 56e40328fda..226e7c0631b 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable, using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>; VisitorMap _visitors; - std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted; + std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted; struct Event { enum class Type { @@ -118,7 +118,7 @@ private: */ Event popNextQueuedEventIfAvailable(); void tick(); - void trimRecentlyCompletedList(vespalib::steady_time currentTime); + void trimRecentlyCompletedList(framework::SecondTime currentTime); void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code); std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName, diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h index d234f432f2b..0ca28f8114d 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h @@ -67,13 +67,9 @@ public: framework::SecondTime getTimeInSeconds() const override { return getTimeInMicros().getSeconds(); } - vespalib::system_time getSystemTime() const override { + framework::MonotonicTimePoint getMonotonicTime() const override { // For simplicity, assume fake monotonic time follows fake wall clock. - return vespalib::system_time(std::chrono::microseconds(getTimeInMicros().getTime())); - } - vespalib::steady_time getMonotonicTime() const override { - // For simplicity, assume fake monotonic time follows fake wall clock. - return vespalib::steady_time(std::chrono::microseconds(getTimeInMicros().getTime())); + return MonotonicTimePoint(std::chrono::microseconds(getTimeInMicros().getTime())); } }; diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp index df6115aa416..0303481feb5 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp @@ -5,36 +5,27 @@ namespace storage::framework::defaultimplementation { -MicroSecTime -RealClock::getTimeInMicros() const { +MicroSecTime RealClock::getTimeInMicros() const { struct timeval mytime; gettimeofday(&mytime, 0); return MicroSecTime(mytime.tv_sec * 1000000llu + mytime.tv_usec); } -MilliSecTime -RealClock::getTimeInMillis() const { +MilliSecTime RealClock::getTimeInMillis() const { struct timeval mytime; gettimeofday(&mytime, 0); return MilliSecTime( mytime.tv_sec * 1000llu + mytime.tv_usec / 1000); } -SecondTime -RealClock::getTimeInSeconds() const { +SecondTime RealClock::getTimeInSeconds() const { struct timeval mytime; gettimeofday(&mytime, 0); return SecondTime(mytime.tv_sec); } -vespalib::steady_time -RealClock::getMonotonicTime() const { - return vespalib::steady_clock::now(); -} - -vespalib::system_time -RealClock::getSystemTime() const { - return vespalib::system_clock::now(); +MonotonicTimePoint RealClock::getMonotonicTime() const { + return std::chrono::steady_clock::now(); } } diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h index de176a3e402..a4b80a990c9 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h @@ -17,8 +17,7 @@ struct RealClock : public Clock { MicroSecTime getTimeInMicros() const override; MilliSecTime getTimeInMillis() const override; SecondTime getTimeInSeconds() const override; - vespalib::steady_time getMonotonicTime() const override; - vespalib::system_time getSystemTime() const override; + MonotonicTimePoint getMonotonicTime() const override; }; } diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h index d228dace1ed..bd4afa6c9ad 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h +++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h @@ -21,14 +21,14 @@ namespace storage::framework::defaultimplementation { class TestComponentRegister { ComponentRegisterImpl::UP _compReg; - FakeClock _clock; - ThreadPoolImpl _threadPool; + FakeClock _clock; + ThreadPoolImpl _threadPool; public: - explicit TestComponentRegister(ComponentRegisterImpl::UP compReg); - virtual ~TestComponentRegister(); + TestComponentRegister(ComponentRegisterImpl::UP compReg); + ~TestComponentRegister(); - virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; } + ComponentRegisterImpl& getComponentRegister() { return *_compReg; } FakeClock& getClock() { return _clock; } ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; } FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } diff --git a/storage/src/vespa/storageframework/generic/clock/clock.h b/storage/src/vespa/storageframework/generic/clock/clock.h index e1f8419f069..c9b8f652bfe 100644 --- a/storage/src/vespa/storageframework/generic/clock/clock.h +++ b/storage/src/vespa/storageframework/generic/clock/clock.h @@ -28,8 +28,7 @@ struct Clock { virtual SecondTime getTimeInSeconds() const = 0; // Time point resolution is intentionally not defined here. - virtual vespalib::steady_time getMonotonicTime() const = 0; - virtual vespalib::system_time getSystemTime() const = 0; + virtual MonotonicTimePoint getMonotonicTime() const = 0; }; } diff --git a/storage/src/vespa/storageframework/generic/clock/time.h b/storage/src/vespa/storageframework/generic/clock/time.h index 882ff58fb74..372110a1374 100644 --- a/storage/src/vespa/storageframework/generic/clock/time.h +++ b/storage/src/vespa/storageframework/generic/clock/time.h @@ -10,8 +10,8 @@ namespace vespalib { namespace storage::framework { -using MonotonicTimePoint = vespalib::steady_time; -using MonotonicDuration = vespalib::duration; +using MonotonicTimePoint = std::chrono::steady_clock::time_point; +using MonotonicDuration = std::chrono::steady_clock::duration; struct Clock; @@ -111,6 +111,9 @@ struct SecondTime : public Time<SecondTime, 1000000> { explicit SecondTime(uint64_t t = 0) : Time<SecondTime, 1000000>(t) {} explicit SecondTime(const Clock& clock) : Time<SecondTime, 1000000>(getRawMicroTime(clock) / 1000000) {} + + [[nodiscard]] MilliSecTime getMillis() const; + [[nodiscard]] MicroSecTime getMicros() const; }; /** @@ -128,6 +131,7 @@ struct MilliSecTime : public Time<MilliSecTime, 1000> { explicit MilliSecTime(const Clock& clock) : Time<MilliSecTime, 1000>(getRawMicroTime(clock) / 1000) {} + [[nodiscard]] SecondTime getSeconds() const { return SecondTime(getTime() / 1000); } [[nodiscard]] MicroSecTime getMicros() const; }; @@ -150,6 +154,14 @@ struct MicroSecTime : public Time<MicroSecTime, 1> { [[nodiscard]] SecondTime getSeconds() const { return SecondTime(getTime() / 1000000); } }; +inline MilliSecTime SecondTime::getMillis() const { + return MilliSecTime(getTime() * 1000); +} + +inline MicroSecTime SecondTime::getMicros() const { + return MicroSecTime(getTime() * 1000 * 1000); +} + inline MicroSecTime MilliSecTime::getMicros() const { return MicroSecTime(getTime() * 1000); } @@ -168,6 +180,13 @@ operator + (MilliSecTime a, MilliSecTime b) { return result; } +inline SecondTime +operator + (SecondTime a, SecondTime b) { + SecondTime result(a); + result += b; + return result; +} + inline MicroSecTime operator - (MicroSecTime a, MicroSecTime b) { MicroSecTime result(a); @@ -175,4 +194,11 @@ operator - (MicroSecTime a, MicroSecTime b) { return result; } +inline SecondTime +operator - (SecondTime a, SecondTime b) { + SecondTime result(a); + result -= b; + return result; +} + } diff --git a/vdslib/src/tests/state/nodestatetest.cpp b/vdslib/src/tests/state/nodestatetest.cpp index c854adf3915..000542e77fe 100644 --- a/vdslib/src/tests/state/nodestatetest.cpp +++ b/vdslib/src/tests/state/nodestatetest.cpp @@ -20,7 +20,7 @@ TEST(NodeStateTest, test_parsing) { NodeState ns = NodeState("t:4"); EXPECT_EQ(std::string("s:u t:4"), ns.toString()); - EXPECT_EQ(4s, ns.getStartTimestamp().time_since_epoch()); + EXPECT_EQ(uint64_t(4), ns.getStartTimestamp()); } { NodeState ns = NodeState("s:u c:2.4 b:12"); diff --git a/vdslib/src/vespa/vdslib/state/nodestate.cpp b/vdslib/src/vespa/vdslib/state/nodestate.cpp index 0dd7f5abb4c..a7c5476456a 100644 --- a/vdslib/src/vespa/vdslib/state/nodestate.cpp +++ b/vdslib/src/vespa/vdslib/state/nodestate.cpp @@ -8,8 +8,9 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/stllike/asciistream.h> #include <sstream> - +#include <cmath> #include <vespa/log/log.h> + LOG_SETUP(".vdslib.nodestate"); namespace storage::lib { @@ -18,16 +19,16 @@ NodeState::NodeState(const NodeState &) = default; NodeState & NodeState::operator = (const NodeState &) = default; NodeState::NodeState(NodeState &&) noexcept = default; NodeState & NodeState::operator = (NodeState &&) noexcept = default; -NodeState::~NodeState() = default; +NodeState::~NodeState() { } NodeState::NodeState() - : _type(nullptr), - _state(nullptr), + : _type(0), + _state(0), _description(""), _capacity(1.0), _initProgress(0.0), _minUsedBits(16), - _startTimestamp() + _startTimestamp(0) { setState(State::UP); } @@ -35,12 +36,12 @@ NodeState::NodeState() NodeState::NodeState(const NodeType& type, const State& state, vespalib::stringref description, double capacity) : _type(&type), - _state(nullptr), + _state(0), _description(description), _capacity(1.0), _initProgress(0.0), _minUsedBits(16), - _startTimestamp() + _startTimestamp(0) { setState(state); if (type == NodeType::STORAGE) { @@ -55,24 +56,25 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type) _capacity(1.0), _initProgress(0.0), _minUsedBits(16), - _startTimestamp() + _startTimestamp(0) { vespalib::StringTokenizer st(serialized, " \t\f\r\n"); st.removeEmptyTokens(); - for (auto it : st) + for (vespalib::StringTokenizer::Iterator it = st.begin(); + it != st.end(); ++it) { - std::string::size_type index = it.find(':'); + std::string::size_type index = it->find(':'); if (index == std::string::npos) { throw vespalib::IllegalArgumentException( - "Token " + it + " does not contain ':': " + serialized, + "Token " + *it + " does not contain ':': " + serialized, VESPA_STRLOC); } - std::string key = it.substr(0, index); - std::string value = it.substr(index + 1); - if (!key.empty()) switch (key[0]) { + std::string key = it->substr(0, index); + std::string value = it->substr(index + 1); + if (key.size() > 0) switch (key[0]) { case 'b': - if (_type != nullptr && *type != NodeType::STORAGE) break; + if (_type != 0 && *type != NodeType::STORAGE) break; if (key.size() > 1) break; try{ setMinUsedBits(boost::lexical_cast<uint32_t>(value)); @@ -89,7 +91,7 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type) continue; case 'c': if (key.size() > 1) break; - if (_type != nullptr && *type != NodeType::STORAGE) break; + if (_type != 0 && *type != NodeType::STORAGE) break; try{ setCapacity(boost::lexical_cast<double>(value)); } catch (...) { @@ -113,7 +115,7 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type) case 't': if (key.size() > 1) break; try{ - setStartTimestamp(vespalib::system_time(std::chrono::seconds(boost::lexical_cast<uint64_t>(value)))); + setStartTimestamp(boost::lexical_cast<uint64_t>(value)); } catch (...) { throw vespalib::IllegalArgumentException( "Illegal start timestamp '" + value + "'. Start " @@ -163,7 +165,7 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix, SeparatorPrinter sep; // Always give node state if not part of a system state // to prevent empty serialization - if (*_state != State::UP || prefix.empty()) { + if (*_state != State::UP || prefix.size() == 0) { out << sep << prefix << "s:"; out << _state->serialize(); } @@ -176,8 +178,8 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix, if (*_state == State::INITIALIZING) { out << sep << prefix << "i:" << _initProgress; } - if (_startTimestamp != vespalib::system_time()) { - out << sep << prefix << "t:" << vespalib::count_s(_startTimestamp.time_since_epoch()); + if (_startTimestamp != 0) { + out << sep << prefix << "t:" << _startTimestamp; } if (includeDescription && ! _description.empty()) { out << sep << prefix << "m:" @@ -188,7 +190,7 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix, void NodeState::setState(const State& state) { - if (_type != nullptr) { + if (_type != 0) { // We don't know whether you want to store reported, wanted or // current node state, so we must accept any. if (!state.validReportedNodeState(*_type) @@ -223,7 +225,7 @@ NodeState::setCapacity(vespalib::Double capacity) "must be a positive floating point number"; throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); } - if (_type != nullptr && *_type != NodeType::STORAGE) { + if (_type != 0 && *_type != NodeType::STORAGE) { throw vespalib::IllegalArgumentException( "Capacity only make sense for storage nodes.", VESPA_STRLOC); } @@ -243,7 +245,7 @@ NodeState::setInitProgress(vespalib::Double initProgress) } void -NodeState::setStartTimestamp(vespalib::system_time startTimestamp) +NodeState::setStartTimestamp(uint64_t startTimestamp) { _startTimestamp = startTimestamp; } @@ -268,10 +270,10 @@ NodeState::print(std::ostream& out, bool verbose, if (*_state == State::INITIALIZING) { out << ", init progress " << _initProgress; } - if (_startTimestamp != vespalib::system_time()) { - out << ", start timestamp " << vespalib::to_string(_startTimestamp); + if (_startTimestamp != 0) { + out << ", start timestamp " << _startTimestamp; } - if (!_description.empty()) { + if (_description.size() > 0) { out << ": " << _description; } } @@ -315,7 +317,7 @@ NodeState::similarTo(const NodeState& other) const void NodeState::verifySupportForNodeType(const NodeType& type) const { - if (_type != nullptr && *_type == type) return; + if (_type != 0 && *_type == type) return; if (!_state->validReportedNodeState(type) && !_state->validWantedNodeState(type)) { @@ -355,8 +357,8 @@ NodeState::getTextualDifference(const NodeState& other) const { } } if (_startTimestamp != other._startTimestamp) { - source << ", start timestamp " << vespalib::to_string(_startTimestamp); - target << ", start timestamp " << vespalib::to_string(other._startTimestamp); + source << ", start timestamp " << _startTimestamp; + target << ", start timestamp " << other._startTimestamp; } if (source.str().length() < 2 || target.str().length() < 2) { diff --git a/vdslib/src/vespa/vdslib/state/nodestate.h b/vdslib/src/vespa/vdslib/state/nodestate.h index 4fb035b6dcd..541395e15cb 100644 --- a/vdslib/src/vespa/vdslib/state/nodestate.h +++ b/vdslib/src/vespa/vdslib/state/nodestate.h @@ -13,7 +13,6 @@ #include "state.h" #include <vespa/document/util/printable.h> #include <vespa/vespalib/objects/floatingpointtype.h> -#include <vespa/vespalib/util/time.h> #include <memory> namespace storage::lib { @@ -26,7 +25,7 @@ class NodeState : public document::Printable vespalib::Double _capacity; vespalib::Double _initProgress; uint32_t _minUsedBits; - vespalib::system_time _startTimestamp; + uint64_t _startTimestamp; public: using CSP = std::shared_ptr<const NodeState>; @@ -44,8 +43,8 @@ public: vespalib::stringref description = "", double capacity = 1.0); /** Set type if you want to verify that content fit with the given type. */ - explicit NodeState(vespalib::stringref serialized, const NodeType* nodeType = nullptr); - ~NodeState() override; + NodeState(vespalib::stringref serialized, const NodeType* nodeType = 0); + ~NodeState(); /** * Setting prefix to something implies using this function to write a @@ -55,27 +54,26 @@ public: void serialize(vespalib::asciistream & out, vespalib::stringref prefix = "", bool includeDescription = true) const; - [[nodiscard]] const State& getState() const { return *_state; } - [[nodiscard]] vespalib::Double getCapacity() const { return _capacity; } - [[nodiscard]] uint32_t getMinUsedBits() const { return _minUsedBits; } - [[nodiscard]] vespalib::Double getInitProgress() const { return _initProgress; } - [[nodiscard]] const vespalib::string& getDescription() const { return _description; } - [[nodiscard]] vespalib::system_time getStartTimestamp() const { return _startTimestamp; } + const State& getState() const { return *_state; } + vespalib::Double getCapacity() const { return _capacity; } + uint32_t getMinUsedBits() const { return _minUsedBits; } + vespalib::Double getInitProgress() const { return _initProgress; } + const vespalib::string& getDescription() const { return _description; } + uint64_t getStartTimestamp() const { return _startTimestamp; } void setState(const State& state); void setCapacity(vespalib::Double capacity); void setMinUsedBits(uint32_t usedBits); void setInitProgress(vespalib::Double initProgress); - void setStartTimestamp(vespalib::system_time startTimestamp); + void setStartTimestamp(uint64_t startTimestamp); void setDescription(vespalib::stringref desc) { _description = desc; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool operator==(const NodeState& other) const; - bool operator!=(const NodeState& other) const { - return !(operator==(other)); - } - [[nodiscard]] bool similarTo(const NodeState& other) const; + bool operator!=(const NodeState& other) const + { return !(operator==(other)); } + bool similarTo(const NodeState& other) const; /** * Verify that the contents of this object fits with the given nodetype. |