aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 05:06:25 +0100
committerGitHub <noreply@github.com>2023-02-07 05:06:25 +0100
commit9e403aa14c9d00b958de14336a346e60f67d803a (patch)
tree2b1a0d9ab7240e6b2557d7194aeef8e840bf75e9
parent8cc478d666c44c4fd7add6266dfea2c3e6b58b71 (diff)
parent9b95cd26d90665efa54cd7c4d7e872713fa72e44 (diff)
Merge pull request #25893 from vespa-engine/revert-25874-balder/less-getSeconds
Revert "Ă˜ess use of getSeconds/getMicroSeconds/getMilliSeconds."
-rw-r--r--storage/src/tests/common/metricstest.cpp26
-rw-r--r--storage/src/tests/common/teststorageapp.cpp34
-rw-r--r--storage/src/tests/common/teststorageapp.h27
-rw-r--r--storage/src/tests/distributor/bucketstateoperationtest.cpp8
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp24
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp3
-rw-r--r--storage/src/tests/distributor/joinbuckettest.cpp4
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp8
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp57
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp9
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp2
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp5
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp6
-rw-r--r--storage/src/tests/distributor/statoperationtest.cpp4
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp86
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp15
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp52
-rw-r--r--storage/src/tests/storageframework/clock/timetest.cpp11
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp12
-rw-r--r--storage/src/tests/storageserver/testvisitormessagesession.h5
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp18
-rw-r--r--storage/src/tests/visiting/visitortest.cpp8
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp9
-rw-r--r--storage/src/vespa/storage/common/statusmetricconsumer.cpp36
-rw-r--r--storage/src/vespa/storage/common/statusmetricconsumer.h16
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp109
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h18
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h5
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h16
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.h21
-rw-r--r--storage/src/vespa/storage/distributor/top_level_distributor.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h7
-rw-r--r--storage/src/vespa/storage/storageserver/opslogger.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp44
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h6
-rw-r--r--storage/src/vespa/storage/storageserver/statereporter.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp10
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h2
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp176
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h57
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp12
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h6
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp29
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.h4
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h8
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp19
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h3
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h10
-rw-r--r--storage/src/vespa/storageframework/generic/clock/clock.h3
-rw-r--r--storage/src/vespa/storageframework/generic/clock/time.h30
-rw-r--r--vdslib/src/tests/state/nodestatetest.cpp2
-rw-r--r--vdslib/src/vespa/vdslib/state/nodestate.cpp60
-rw-r--r--vdslib/src/vespa/vdslib/state/nodestate.h28
64 files changed, 769 insertions, 527 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 290d05e9a59..a92cf121fab 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -37,10 +37,10 @@ struct MetricsTest : public Test {
std::shared_ptr<BucketManagerMetrics> _bucketManagerMetrics;
std::shared_ptr<VisitorMetrics> _visitorMetrics;
- void createSnapshotForPeriod(std::chrono::seconds secs) const;
+ void createSnapshotForPeriod(std::chrono::seconds secs);
void assertMetricLastValue(const std::string& name,
int interval,
- uint64_t expected) const;
+ uint64_t expected);
MetricsTest();
~MetricsTest() override;
@@ -55,8 +55,8 @@ namespace {
{
framework::Clock& _clock;
explicit MetricClock(framework::Clock& c) : _clock(c) {}
- [[nodiscard]] time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); }
- time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getMonotonicTime().time_since_epoch()); }
+ time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); }
+ time_t getTimeInMilliSecs() const override { return _clock.getTimeInMillis().getTime(); }
};
}
@@ -137,8 +137,8 @@ void MetricsTest::createFakeLoad()
disk.queueSize.addValue(4 * n);
disk.averageQueueWaitingTime.addValue(10 * n);
disk.pendingMerges.addValue(4 * n);
- for (const auto & metric : disk.threads) {
- FileStorThreadMetrics& thread(*metric);
+ for (uint32_t j=0; j<disk.threads.size(); ++j) {
+ FileStorThreadMetrics& thread(*disk.threads[j]);
thread.operations.inc(120 * n);
thread.failedOperations.inc(2 * n);
@@ -180,8 +180,8 @@ void MetricsTest::createFakeLoad()
thread.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(0.8);
}
}
- for (const auto & metric : _visitorMetrics->threads) {
- VisitorThreadMetrics& thread(*metric);
+ for (uint32_t i=0; i<_visitorMetrics->threads.size(); ++i) {
+ VisitorThreadMetrics& thread(*_visitorMetrics->threads[i]);
thread.queueSize.addValue(2);
thread.averageQueueWaitingTime.addValue(10);
thread.averageVisitorLifeTime.addValue(1000);
@@ -192,7 +192,9 @@ void MetricsTest::createFakeLoad()
}
_clock->addSecondsToTime(60);
_metricManager->timeChangedNotification();
- while (uint64_t(_metricManager->getLastProcessedTime()) < _clock->getTimeInSeconds().getTime()) {
+ while (uint64_t(_metricManager->getLastProcessedTime())
+ < _clock->getTimeInSeconds().getTime())
+ {
std::this_thread::sleep_for(5ms);
_metricManager->timeChangedNotification();
}
@@ -282,7 +284,9 @@ TEST_F(MetricsTest, html_metrics_report) {
}
void
-MetricsTest::assertMetricLastValue(const std::string& name, int interval, uint64_t expected) const
+MetricsTest::assertMetricLastValue(const std::string& name,
+ int interval,
+ uint64_t expected)
{
std::ostringstream path;
path << "metrics?interval=" << interval
@@ -301,7 +305,7 @@ MetricsTest::assertMetricLastValue(const std::string& name, int interval, uint64
using namespace std::chrono_literals;
void
-MetricsTest::createSnapshotForPeriod(std::chrono::seconds secs) const
+MetricsTest::createSnapshotForPeriod(std::chrono::seconds secs)
{
_clock->addSecondsToTime(secs.count());
_metricManager->timeChangedNotification();
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 94996346d73..91fdf5aa602 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -22,6 +22,18 @@ using storage::framework::defaultimplementation::ComponentRegisterImpl;
namespace storage {
+namespace {
+ template<typename T>
+ struct ConfigReader : public T::Subscriber,
+ public T
+ {
+ ConfigReader(const std::string& configId) {
+ T::subscribe(configId, *this);
+ }
+ void configure(const T& c) { dynamic_cast<T&>(*this) = c; }
+ };
+}
+
TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg,
const lib::NodeType& type, NodeIndex index,
vespalib::stringref configId)
@@ -72,7 +84,7 @@ TestStorageApp::setDistribution(Redundancy redundancy, NodeCount nodeCount)
void
TestStorageApp::setTypeRepo(std::shared_ptr<const document::DocumentTypeRepo> repo)
{
- _compReg.setDocumentTypeRepo(std::move(repo));
+ _compReg.setDocumentTypeRepo(repo);
}
void
@@ -82,19 +94,21 @@ TestStorageApp::setClusterState(const lib::ClusterState& c)
}
void
-TestStorageApp::waitUntilInitialized(StorageBucketDBInitializer* initializer, vespalib::duration timeout) const
+TestStorageApp::waitUntilInitialized(
+ StorageBucketDBInitializer* initializer, framework::SecondTime timeout)
{
// Always use real clock for wait timeouts. Component clock may be faked
// in tests
framework::defaultimplementation::RealClock clock;
- vespalib::steady_time endTime(clock.getMonotonicTime() + timeout);
+ framework::MilliSecTime endTime(clock.getTimeInMillis() + timeout.getMillis());
while (!isInitialized()) {
std::this_thread::sleep_for(1ms);
- vespalib::steady_time currentTime(clock.getMonotonicTime());
+ framework::MilliSecTime currentTime(clock.getTimeInMillis());
if (currentTime > endTime) {
std::ostringstream error;
- error << "Failed to initialize service layer within timeout of " << vespalib::to_s(timeout) << " seconds.";
- if (initializer != nullptr) {
+ error << "Failed to initialize service layer within timeout of "
+ << timeout << " seconds.";
+ if (initializer != 0) {
error << " ";
LOG(error, "%s", error.str().c_str());
throw std::runtime_error(error.str());
@@ -160,7 +174,7 @@ TestServiceLayerApp::setPersistenceProvider(PersistenceProviderUP provider)
spi::PersistenceProvider&
TestServiceLayerApp::getPersistenceProvider()
{
- if ( ! _persistenceProvider) {
+ if (_persistenceProvider.get() == 0) {
throw vespalib::IllegalStateException("Persistence provider requested but not initialized.", VESPA_STRLOC);
}
return *_persistenceProvider;
@@ -168,7 +182,7 @@ TestServiceLayerApp::getPersistenceProvider()
namespace {
template<typename T>
- T getConfig(vespalib::stringref configId) {
+ const T getConfig(vespalib::stringref configId) {
config::ConfigUri uri(configId);
return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext());
}
@@ -178,9 +192,9 @@ void
TestDistributorApp::configure(vespalib::stringref id)
{
if (id.empty()) return;
- auto dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id));
+ DistributorConfig dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id));
_compReg.setDistributorConfig(dc);
- auto vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id));
+ VisitorConfig vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id));
_compReg.setVisitorConfig(vc);
}
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 7ca910721b3..de1dc99bb6e 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -68,7 +68,7 @@ public:
TestStorageApp(StorageComponentRegisterImpl::UP compReg,
const lib::NodeType&, NodeIndex = NodeIndex(0xffff),
vespalib::stringref configId = "");
- ~TestStorageApp() override;
+ ~TestStorageApp();
// Set functions, to be able to modify content while running.
void setDistribution(Redundancy, NodeCount);
@@ -77,12 +77,15 @@ public:
// Utility functions for getting a hold of currently used bits. Practical
// to avoid adding extra components in the tests.
- virtual StorageComponentRegisterImpl& getComponentRegister() override { return _compReg; }
+ StorageComponentRegisterImpl& getComponentRegister() { return _compReg; }
document::TestDocMan& getTestDocMan() { return _docMan; }
- std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _compReg.getTypeRepo(); }
- const document::BucketIdFactory& getBucketIdFactory() { return _compReg.getBucketIdFactory(); }
+ std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo()
+ { return _compReg.getTypeRepo(); }
+ const document::BucketIdFactory& getBucketIdFactory()
+ { return _compReg.getBucketIdFactory(); }
TestNodeStateUpdater& getStateUpdater() { return _nodeStateUpdater; }
- std::shared_ptr<lib::Distribution> & getDistribution() { return _compReg.getDistribution(); }
+ std::shared_ptr<lib::Distribution> & getDistribution()
+ { return _compReg.getDistribution(); }
TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; }
uint16_t getIndex() const { return _compReg.getIndex(); }
const NodeIdentity& node_identity() const noexcept { return _node_identity; }
@@ -92,7 +95,9 @@ public:
DoneInitializeHandler& getDoneInitializeHandler() { return *this; }
void notifyDoneInitializing() override { _initialized = true; }
bool isInitialized() const { return _initialized; }
- void waitUntilInitialized(StorageBucketDBInitializer* initializer = nullptr, vespalib::duration timeout = 30s) const;
+ void waitUntilInitialized(
+ StorageBucketDBInitializer* initializer = 0,
+ framework::SecondTime timeout = framework::SecondTime(30));
private:
// Storage server interface implementation (until we can remove it)
@@ -111,14 +116,14 @@ class TestServiceLayerApp : public TestStorageApp
HostInfo _host_info;
public:
- explicit TestServiceLayerApp(vespalib::stringref configId);
- explicit TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = "");
- ~TestServiceLayerApp() override;
+ TestServiceLayerApp(vespalib::stringref configId);
+ TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = "");
+ ~TestServiceLayerApp();
void setupDummyPersistence();
void setPersistenceProvider(PersistenceProviderUP);
- ServiceLayerComponentRegisterImpl& getComponentRegister() override { return _compReg; }
+ ServiceLayerComponentRegisterImpl& getComponentRegister() { return _compReg; }
HostInfo &get_host_info() noexcept { return _host_info; }
spi::PersistenceProvider& getPersistenceProvider();
@@ -148,7 +153,7 @@ public:
explicit TestDistributorApp(NodeIndex index, vespalib::stringref configId = "");
~TestDistributorApp() override;
- DistributorComponentRegisterImpl& getComponentRegister() override {
+ DistributorComponentRegisterImpl& getComponentRegister() {
return _compReg;
}
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp
index 42ee4675e26..5d11f9653ea 100644
--- a/storage/src/tests/distributor/bucketstateoperationtest.cpp
+++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp
@@ -43,7 +43,7 @@ TEST_F(BucketStateOperationTest, activate_single_node) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(1, _sender.commands().size());
@@ -79,7 +79,7 @@ TEST_F(BucketStateOperationTest, activate_and_deactivate_nodes) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(1, _sender.commands().size());
{
@@ -135,7 +135,7 @@ TEST_F(BucketStateOperationTest, do_not_deactivate_if_activate_fails) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(1, _sender.commands().size());
{
@@ -178,7 +178,7 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(1, _sender.commands().size());
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index c2f4836f4cb..1a104727f43 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -139,7 +139,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
TEST_F(GarbageCollectionOperationTest, simple_legacy) {
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_FALSE(op->is_two_phase());
ASSERT_EQ(2, _sender.commands().size());
@@ -158,7 +158,7 @@ TEST_F(GarbageCollectionOperationTest, simple_legacy) {
TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) {
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
EXPECT_EQ(0u, gc_removed_documents_metric());
@@ -175,7 +175,7 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
reply_to_nth_request(*op, 0, 1234, 0);
@@ -195,20 +195,20 @@ TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and
// Config enabled, but only 1 node says it supports two-phase RemoveLocation
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_FALSE(op->is_two_phase());
// Node 0 suddenly upgraded...!
set_node_supported_features(0, with_two_phase);
op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_TRUE(op->is_two_phase());
// But doesn't matter if two-phase GC is config-disabled
config_enable_two_phase_gc(false);
op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_FALSE(op->is_two_phase());
}
@@ -216,7 +216,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
enable_two_phase_gc();
auto op = create_op();
op->setPriority(getConfig().getMaintenancePriorities().garbageCollection);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
for (int i : {0, 1}) {
@@ -229,7 +229,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -256,7 +256,7 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_un
TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -272,7 +272,7 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res
TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -314,7 +314,7 @@ struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationT
enable_two_phase_gc();
_op = create_op();
- _op->start(_sender);
+ _op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
_r1 = make_remove_location_reply(*_sender.command(0));
@@ -367,7 +367,7 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 8d188f6c005..6b27af63fb7 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -4,6 +4,7 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/config/documenttypes_config_fwd.h>
+#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
@@ -61,7 +62,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
msg, metrics().gets,
consistency);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
}
static constexpr uint32_t LastCommand = UINT32_MAX;
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp
index 570fe24679e..bc87893b610 100644
--- a/storage/src/tests/distributor/joinbuckettest.cpp
+++ b/storage/src/tests/distributor/joinbuckettest.cpp
@@ -46,7 +46,7 @@ TEST_F(JoinOperationTest, simple) {
document::BucketId(33, 0x100000001)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}});
@@ -103,7 +103,7 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets
document::BucketId(33, 0x100000001)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}));
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}));
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 512c092d8ae..9e0c89819a7 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -71,7 +71,7 @@ MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes, Pr
auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->setPriority(merge_pri);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
return op;
}
@@ -291,7 +291,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
@@ -352,7 +352,7 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
std::string merge(
"MergeBucketCommand(BucketId(0x4000000000000001), to time "
@@ -501,7 +501,7 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
const uint16_t max_merge_size = 2;
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2, 3)), max_merge_size);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
// Must include missing node 0 and not just 2 existing replicas
EXPECT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 3bfa1027a82..0ba374f7190 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -175,7 +175,7 @@ TEST_F(PendingMessageTrackerTest, simple) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> "
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> "
"Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"));
}
@@ -248,17 +248,17 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
"</ul>\n"
"<b>Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
{
@@ -268,23 +268,44 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
EXPECT_THAT(ost.str(), HasSubstr(
"<b>Node 0 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 0</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 0</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"
"<b>Node 1 (pending count: 4)</b>\n"
"<ul>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
- "<li><i>Node 1</i>: <b>1970-01-01 00:00:01.000 UTC</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000004d2), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
+ "<li><i>Node 1</i>: <b>1970-01-01 00:00:01</b> Remove(BucketId(0x40000000000011d7), priority=127)</li>\n"
"</ul>\n"));
}
}
namespace {
+template <typename T>
+std::string setToString(const std::set<T>& s)
+{
+ std::ostringstream ost;
+ ost << '{';
+ for (typename std::set<T>::const_iterator i(s.begin()), e(s.end());
+ i != e; ++i)
+ {
+ if (i != s.begin()) {
+ ost << ',';
+ }
+ ost << *i;
+ }
+ ost << '}';
+ return ost.str();
+}
+
+}
+
+namespace {
+
class TestChecker : public PendingMessageTracker::Checker
{
public:
@@ -422,7 +443,7 @@ TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) {
TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
- f.tracker().setNodeBusyDuration(10s);
+ f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0));
f.clock().addSecondsToTime(11);
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 735666e5c89..53773a55826 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -76,7 +76,7 @@ public:
getDistributorBucketSpace(),
msg,
metrics().puts);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
}
const document::DocumentType& doc_type() const {
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index 68d86884036..971ff36c833 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "dummy_cluster_context.h"
+#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -36,9 +37,9 @@ TEST_F(RemoveBucketOperationTest, simple) {
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(1,2)));
+ toVector<uint16_t>(1,2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Delete bucket => 1,"
@@ -70,7 +71,7 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) {
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
@@ -105,7 +106,7 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) {
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index b19a448199b..889b5c833af 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -35,7 +35,7 @@ struct RemoveLocationOperationTest : Test, DistributorStripeTestUtil {
msg,
metrics().removelocations);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
}
};
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index ed24b7271b8..b3104cae623 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -1,5 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <iomanip>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -40,7 +41,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
msg,
metrics().removes);
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
}
void replyToMessage(RemoveOperation& callback,
@@ -54,7 +55,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<api::StorageMessage> msg2 = _sender.command(index);
auto* removec = dynamic_cast<api::RemoveCommand*>(msg2.get());
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
- auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
+ auto* removeR = static_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index edb392d9532..1e951029994 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -65,7 +65,7 @@ TEST_F(SplitOperationTest, simple) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
{
ASSERT_EQ(1, _sender.commands().size());
@@ -134,7 +134,7 @@ TEST_F(SplitOperationTest, multi_node_failure) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
{
ASSERT_EQ(2, _sender.commands().size());
@@ -218,7 +218,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(3, _sender.commands().size());
diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp
index ec0165dde05..e3323d601df 100644
--- a/storage/src/tests/distributor/statoperationtest.cpp
+++ b/storage/src/tests/distributor/statoperationtest.cpp
@@ -35,7 +35,7 @@ TEST_F(StatOperationTest, bucket_info) {
std::make_shared<api::StatBucketCommand>(
makeDocumentBucket(document::BucketId(16, 5)), ""));
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Statbucket => 0,Statbucket => 1", _sender.getCommands(true));
@@ -76,7 +76,7 @@ TEST_F(StatOperationTest, bucket_list) {
getIdealStateManager(),
node_context().node_index(),
msg);
- op.start(_sender);
+ op.start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(1, _sender.replies().size());
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 579fd156962..6eb15bf05e7 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -154,7 +154,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
return cb;
}
@@ -342,7 +342,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
TEST_F(TwoPhaseUpdateOperationTest, simple) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -359,7 +359,7 @@ TEST_F(TwoPhaseUpdateOperationTest, simple) {
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
@@ -371,7 +371,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -386,7 +386,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -413,7 +413,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -434,7 +434,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -451,7 +451,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_err
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -473,7 +473,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -501,7 +501,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -526,7 +526,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_st
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0");
@@ -567,7 +567,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -578,7 +578,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_upd
TEST_F(TwoPhaseUpdateOperationTest, n_of_m) {
setup_stripe(2, 2, "storage:2 distributor:1", 1);
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -606,7 +606,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document)
setup_stripe(3, 3, "storage:3 distributor:1");
// 0,1 in sync. 2 out of sync.
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2",
@@ -639,7 +639,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document)
TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false);
@@ -670,7 +670,7 @@ TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_a
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false);
@@ -697,7 +697,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
@@ -717,7 +717,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError());
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 50);
@@ -737,7 +737,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
@@ -766,7 +766,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 100);
@@ -786,7 +786,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time
TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
checkMessageSettingsPropagatedTo(_sender.command(0));
@@ -805,7 +805,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings
TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
@@ -832,7 +832,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
setup_stripe(2, 2, "storage:2 distributor:1");
// Update towards inconsistent bucket invokes safe path.
auto cb = sendUpdate("0=1/2/3,1=2/3/4");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -875,7 +875,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
// Newest doc has headerval==110, not 120.
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
@@ -894,7 +894,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_up
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110"));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
@@ -904,7 +904,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco"));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
@@ -924,7 +924,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234"));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
@@ -943,7 +943,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
// Both Gets return nothing at all, nothing at all.
replyToGet(*cb, _sender, 0, 100, false);
replyToGet(*cb, _sender, 1, 110, false);
@@ -964,7 +964,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_aut
.condition("testdoctype1.headerval==120")
.createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
replyToGet(*cb, _sender, 0, 0, false);
replyToGet(*cb, _sender, 1, 0, false);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
@@ -999,7 +999,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
setup_stripe(1, 1, "storage:1 distributor:1");
// Only 1 replica; consistent with itself by definition.
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", _sender.getCommands(true));
// Close the operation. This should generate a single reply that is
@@ -1017,7 +1017,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Closing the operation should now only return an ABORTED reply for
@@ -1037,7 +1037,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1065,7 +1065,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1086,7 +1086,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
@@ -1112,7 +1112,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, Timestamp(0), false);
@@ -1131,7 +1131,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replic
// No replicas, technically consistent but cannot use fast path.
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0",
_sender.getCommands(true));
}
@@ -1146,7 +1146,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
@@ -1156,7 +1156,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
TEST_F(TwoPhaseUpdateOperationTest, operation_is_rejected_in_safe_path_if_feed_is_blocked) {
set_up_distributor_with_feed_blocked_state();
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas to trigger safe path
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
@@ -1275,7 +1275,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
// Add new replica to deterministic test bucket after gets have been sent
BucketId bucket(0x400000000000cac4); // Always the same in the test.
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
@@ -1299,7 +1299,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_cannot_restart_in_fast_path) {
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
@@ -1360,7 +1360,7 @@ TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_ex
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
configure_stripe(cfg);
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
@@ -1419,7 +1419,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_is_no_op_without_auto_cre
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4");
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
@@ -1443,7 +1443,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_sends_puts_with_auto_crea
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender);
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d0ae31b9524..f0cb30368cb 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -1,5 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/repo/documenttyperepo.h>
@@ -90,7 +91,7 @@ TEST_F(UpdateOperationTest, simple) {
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", sender.getCommands(true));
@@ -109,7 +110,7 @@ TEST_F(UpdateOperationTest, not_found) {
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", sender.getCommands(true));
@@ -124,7 +125,7 @@ TEST_F(UpdateOperationTest, multi_node) {
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -148,7 +149,7 @@ TEST_F(UpdateOperationTest, multi_node_inconsistent_timestamp) {
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -168,7 +169,7 @@ TEST_F(UpdateOperationTest, test_and_set_failures_increment_tas_metric) {
setup_stripe(2, 2, "distributor:1 storage:1");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0", sender.getCommands(true));
api::ReturnCode result(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "bork bork");
replyToMessage(*cb, sender, 0, 1234, api::BucketInfo(), result);
@@ -197,7 +198,7 @@ TEST_F(UpdateOperationTest, create_if_missing_update_sentinel_timestamp_is_treat
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3", true));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -219,7 +220,7 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
setup_stripe(2, 3, "distributor:1 storage:3");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3", true));
DistributorMessageSenderStub sender;
- cb->start(sender);
+ cb->start(sender, framework::MilliSecTime(0));
ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
replyToMessage(*cb, sender, 0, 100); // Newly created
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index ecfa7232def..6c597b620dd 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -41,7 +41,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil {
document::BucketId nullId;
VisitorOperation::Config defaultConfig;
- static api::CreateVisitorCommand::SP
+ api::CreateVisitorCommand::SP
createVisitorCommand(std::string instanceId,
document::BucketId superBucket,
document::BucketId lastBucket,
@@ -122,7 +122,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil {
*/
std::string runEmptyVisitor(api::CreateVisitorCommand::SP msg) {
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
return _sender.getLastReply();
}
@@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -228,7 +228,7 @@ TEST_F(VisitorOperationTest, shutdown) {
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -296,7 +296,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -346,7 +346,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) {
"dumpvisitor",
"true"));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)) << _sender.getLastReply();
sendReply(*op);
@@ -371,7 +371,7 @@ VisitorOperationTest::runVisitor(document::BucketId id,
"dumpvisitor",
"true"));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
sendReply(*op);
@@ -437,7 +437,7 @@ TEST_F(VisitorOperationTest, bucket_removed_while_visitor_pending) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("removefrombucketdb", id, nullId));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -459,7 +459,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
// Since visitRemoves is true, the empty bucket will be visited
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -474,7 +474,7 @@ TEST_F(VisitorOperationTest, resend_to_other_storage_node_on_failure) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("emptyinconsistent", id, nullId));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -502,7 +502,7 @@ TEST_F(VisitorOperationTest, timeout_only_after_reply_from_all_storage_nodes) {
nullId,
8));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
_sender.getCommands(true));
@@ -539,7 +539,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) {
8,
500ms)); // ms timeout
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
_sender.getCommands(true));
@@ -614,7 +614,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) {
"dumpvisitor",
"true"));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("Visitor Create => 0", _sender.getCommands(true));
}
@@ -640,7 +640,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) {
"dumpvisitor",
"true"));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
"ReturnCode(WRONG_DISTRIBUTION, distributor:1 storage:1)",
_sender.getLastReply());
@@ -661,7 +661,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) {
createVisitorCommand("multiplebuckets", id, nullId, 31),
VisitorOperation::Config(1, 4));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 0,"
"Visitor Create => 0,Visitor Create => 0",
@@ -708,7 +708,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) {
createVisitorCommand("multiplebuckets", id, document::BucketId(0x54000000000f0001), 31),
VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode));
- op2->start(_sender);
+ op2->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -736,7 +736,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_resend_only_failing) {
createVisitorCommand("multiplebuckets", id, nullId, 31),
VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 0,"
"Visitor Create => 0,Visitor Create => 0",
@@ -775,7 +775,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node_one_super_buc
createVisitorCommand("multiplebucketsonesuper", id, nullId),
VisitorOperation::Config(5, 4));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -833,7 +833,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true),
VisitorOperation::Config(5, 4));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 1", _sender.getCommands(true));
@@ -858,7 +858,7 @@ TEST_F(VisitorOperationTest, visit_ideal_node) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinode", id, nullId, 8));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -889,7 +889,7 @@ TEST_F(VisitorOperationTest, no_resending_on_critical_failure) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinodefailurecritical", id, nullId, 8));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -913,7 +913,7 @@ TEST_F(VisitorOperationTest, failure_on_all_nodes) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinodefailurecritical", id, nullId, 8));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -992,7 +992,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent)
500ms,
inconsistent));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
assert(_sender.getCommands(true) == "Visitor Create => 0,Visitor Create => 1");
return op;
@@ -1043,7 +1043,7 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("foo", id, nullId, 8, 10000ms));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
@@ -1061,7 +1061,7 @@ VisitorOperationTest::do_visitor_roundtrip_with_statistics(
auto op = createOpWithDefaultConfig(
createVisitorCommand("metricstats", id, nullId));
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
auto reply = cmd.makeReply();
@@ -1109,7 +1109,7 @@ TEST_F(VisitorOperationTest, assigning_put_lock_access_token_sets_special_visito
auto op = createOpWithDefaultConfig(createVisitorCommand("metricstats", id, nullId));
op->assign_put_lock_access_token("its-a me, mario");
- op->start(_sender);
+ op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto cmd = std::dynamic_pointer_cast<api::CreateVisitorCommand>(_sender.command(0));
ASSERT_TRUE(cmd);
diff --git a/storage/src/tests/storageframework/clock/timetest.cpp b/storage/src/tests/storageframework/clock/timetest.cpp
index 9dbcdd409d8..e1270156aa0 100644
--- a/storage/src/tests/storageframework/clock/timetest.cpp
+++ b/storage/src/tests/storageframework/clock/timetest.cpp
@@ -8,12 +8,16 @@ namespace storage::framework::defaultimplementation {
TEST(TimeTest, testBasics)
{
+ SecondTime timeSec(1);
- MilliSecTime timeMillis(1000);
+ MilliSecTime timeMillis = timeSec.getMillis();
EXPECT_EQ(uint64_t(1000), timeMillis.getTime());
+ EXPECT_EQ(timeSec, timeMillis.getSeconds());
- MicroSecTime timeMicros = timeMillis.getMicros();
+ MicroSecTime timeMicros = timeSec.getMicros();
+ EXPECT_EQ(timeSec.getMicros(), timeMillis.getMicros());
EXPECT_EQ(timeMillis, timeMicros.getMillis());
+ EXPECT_EQ(timeSec, timeMicros.getSeconds());
MicroSecTime timeMicros2 = timeMicros;
EXPECT_EQ(timeMicros2, timeMicros);
@@ -28,6 +32,7 @@ TEST(TimeTest, testBasics)
MilliSecTime timeMillis2 = timeMicros2.getMillis();
EXPECT_GT(timeMillis2, timeMillis);
EXPECT_EQ(uint64_t(1050), timeMillis2.getTime());
+ EXPECT_EQ(timeSec, timeMillis2.getSeconds());
}
TEST(TimeTest, testCreatedFromClock)
@@ -35,6 +40,7 @@ TEST(TimeTest, testCreatedFromClock)
defaultimplementation::FakeClock clock;
clock.setAbsoluteTimeInSeconds(600);
+ EXPECT_EQ(SecondTime(600), SecondTime(clock));
EXPECT_EQ(MilliSecTime(600 * 1000), MilliSecTime(clock));
EXPECT_EQ(MicroSecTime(600 * 1000 * 1000), MicroSecTime(clock));
}
@@ -45,6 +51,7 @@ TEST(TimeTest, canAssignMicrosecondResolutionTimeToFakeClock)
clock.setAbsoluteTimeInMicroSeconds(1234567); // 1.234567 seconds
// All non-microsec time points must necessarily be truncated.
+ EXPECT_EQ(SecondTime(1), SecondTime(clock));
EXPECT_EQ(MilliSecTime(1234), MilliSecTime(clock));
EXPECT_EQ(MicroSecTime(1234567), MicroSecTime(clock));
}
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index 08355c105d5..47d70cf436e 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -25,8 +25,8 @@ namespace storage {
class DummyApplicationGenerationFether : public ApplicationGenerationFetcher {
public:
- [[nodiscard]] int64_t getGeneration() const override { return 1; }
- [[nodiscard]] std::string getComponentName() const override { return "component"; }
+ int64_t getGeneration() const override { return 1; }
+ std::string getComponentName() const override { return "component"; }
};
struct StateReporterTest : Test {
@@ -54,8 +54,8 @@ struct MetricClock : public metrics::MetricManager::Timer
{
framework::Clock& _clock;
explicit MetricClock(framework::Clock& c) : _clock(c) {}
- [[nodiscard]] time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); }
- [[nodiscard]] time_t getTimeInMilliSecs() const override { return vespalib::count_ms(_clock.getSystemTime().time_since_epoch()); }
+ time_t getTime() const override { return _clock.getTimeInSeconds().getTime(); }
+ time_t getTimeInMilliSecs() const override { return _clock.getTimeInMillis().getTime(); }
};
}
@@ -245,8 +245,8 @@ TEST_F(StateReporterTest, report_metrics) {
"/state/v1/metrics?consumer=status"
};
- for (auto & path_str : paths) {
- framework::HttpUrlPath path(path_str);
+ for (int i = 0; i < pathCount; i++) {
+ framework::HttpUrlPath path(paths[i]);
std::ostringstream ost;
_stateReporter->reportStatus(ost, path);
std::string jsonData = ost.str();
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h
index 4479b194396..c0c3b8429b2 100644
--- a/storage/src/tests/storageserver/testvisitormessagesession.h
+++ b/storage/src/tests/storageserver/testvisitormessagesession.h
@@ -32,7 +32,10 @@ public:
std::deque<std::unique_ptr<documentapi::DocumentMessage> > sentMessages;
- TestVisitorMessageSession(VisitorThread& t, Visitor& v, const mbus::Error& autoReplyError, bool autoReply);
+ TestVisitorMessageSession(VisitorThread& t,
+ Visitor& v,
+ const mbus::Error& autoReplyError,
+ bool autoReply);
void reply(mbus::Reply::UP rep);
uint32_t pending() override { return pendingCount; }
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index a82514acb03..be4e7270c69 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -217,7 +217,7 @@ VisitorManagerTest::getSession(uint32_t n)
// Wait until we have started the visitor
const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
+ framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -225,8 +225,9 @@ VisitorManagerTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getMonotonicTime() > endTime) {
- throw vespalib::IllegalStateException("Timed out waiting for visitor session", VESPA_STRLOC);
+ if (clock.getTimeInMillis() > endTime) {
+ throw vespalib::IllegalStateException(
+ "Timed out waiting for visitor session", VESPA_STRLOC);
}
std::this_thread::sleep_for(10ms);
}
@@ -254,10 +255,12 @@ VisitorManagerTest::getMessagesAndReply(
switch (session.sentMessages[i]->getType()) {
case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
- docs.push_back(static_cast<documentapi::PutDocumentMessage&>(*session.sentMessages[i]).getDocumentSP());
+ docs.push_back(static_cast<documentapi::PutDocumentMessage&>(
+ *session.sentMessages[i]).getDocumentSP());
break;
case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(*session.sentMessages[i]).getDocumentId());
+ docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(
+ *session.sentMessages[i]).getDocumentId());
break;
default:
break;
@@ -352,7 +355,10 @@ TEST_F(VisitorManagerTest, normal_usage) {
getMessagesAndReply(1, getSession(0), docs, docIds);
// All data has been replied to, expecting to get a create visitor reply
- ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK, int(docs.size()), getTotalSerializedSize(docs)));
+ ASSERT_NO_FATAL_FAILURE(
+ verifyCreateVisitorReply(api::ReturnCode::OK,
+ int(docs.size()),
+ getTotalSerializedSize(docs)));
EXPECT_EQ(1u, getMatchingDocuments(docs));
EXPECT_FALSE(_manager->hasPendingMessageState());
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index 565131b3b99..f3a538b7832 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -256,9 +256,11 @@ TestVisitorMessageSession&
VisitorTest::getSession(uint32_t n)
{
// Wait until we have started the visitor
- const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
+ const std::vector<TestVisitorMessageSession*>& sessions(
+ _messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- vespalib::steady_time endTime = clock.getMonotonicTime() + 30s;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
while (true) {
{
std::lock_guard lock(_messageSessionFactory->_accessLock);
@@ -266,7 +268,7 @@ VisitorTest::getSession(uint32_t n)
return *sessions[n];
}
}
- if (clock.getMonotonicTime() > endTime) {
+ if (clock.getTimeInMillis() > endTime) {
throw vespalib::IllegalStateException(
"Timed out waiting for visitor session", VESPA_STRLOC);
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index cf98585bc82..9fafc87688f 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -279,8 +279,8 @@ void BucketManager::updateMinUsedBits()
// Responsible for sending on messages that was previously queued
void BucketManager::run(framework::ThreadHandle& thread)
{
- constexpr vespalib::duration CHECK_MINUSEDBITS_INTERVAL = 30s;
- vespalib::steady_time timeToCheckMinUsedBits = vespalib::steady_time::min();
+ const int64_t CHECK_MINUSEDBITS_INTERVAL = 1000*30;
+ framework::MilliSecTime timeToCheckMinUsedBits(0);
while (!thread.interrupted()) {
bool didWork = false;
BucketInfoRequestMap infoReqs;
@@ -305,9 +305,10 @@ void BucketManager::run(framework::ThreadHandle& thread)
thread.registerTick(framework::PROCESS_CYCLE);
}
}
- if (timeToCheckMinUsedBits < _component.getClock().getMonotonicTime()) {
+ if (timeToCheckMinUsedBits < _component.getClock().getTimeInMillis()) {
updateMinUsedBits();
- timeToCheckMinUsedBits = _component.getClock().getMonotonicTime() + CHECK_MINUSEDBITS_INTERVAL;
+ timeToCheckMinUsedBits = _component.getClock().getTimeInMillis();
+ timeToCheckMinUsedBits += framework::MilliSecTime(CHECK_MINUSEDBITS_INTERVAL);
}
}
}
diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.cpp b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
index e9360c35f3c..9ffb044b0a5 100644
--- a/storage/src/vespa/storage/common/statusmetricconsumer.cpp
+++ b/storage/src/vespa/storage/common/statusmetricconsumer.cpp
@@ -15,20 +15,32 @@ LOG_SETUP(".status.metricreporter");
namespace storage {
-StatusMetricConsumer::StatusMetricConsumer(StorageComponentRegister& compReg, metrics::MetricManager& manager, const std::string& name)
+StatusMetricConsumer::StatusMetricConsumer(
+ StorageComponentRegister& compReg, metrics::MetricManager& manager,
+ const std::string& name)
: framework::StatusReporter("metrics", "Performance metrics"),
_manager(manager),
_component(compReg, "statusmetricsconsumer"),
_name(name),
- _lock()
+ _lock(),
+ _startTime(_component.getClock().getTimeInSeconds()),
+ _processedTime(0)
{
LOG(debug, "Started metrics consumer");
setlocale(LC_NUMERIC, "");
+ _component.registerMetricUpdateHook(*this, 3600s);
_component.registerStatusPage(*this);
}
StatusMetricConsumer::~StatusMetricConsumer() = default;
+void
+StatusMetricConsumer::updateMetrics(const MetricLockGuard & guard)
+{
+ metrics::MemoryConsumption::UP mc(_manager.getMemoryConsumption(guard));
+ // TODO is this hook needed anymore?
+}
+
vespalib::string
StatusMetricConsumer::getReportContentType(const framework::HttpUrlPath& path) const
{
@@ -65,7 +77,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
} else {
LOG(debug, "Not calling update hooks as dontcallupdatehooks option has been given");
}
- int64_t currentTimeS(vespalib::count_s(_component.getClock().getMonotonicTime().time_since_epoch()));
+ framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
bool xml = (path.getAttribute("format") == "xml");
bool json = (path.getAttribute("format") == "json");
@@ -77,7 +89,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
if (path.hasAttribute("task") && path.getAttribute("task") == "reset") {
std::lock_guard guard(_lock);
- _manager.reset(currentTimeS);
+ _manager.reset(currentTime.getTime());
}
if (path.hasAttribute("interval")) {
@@ -88,7 +100,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
const metrics::MetricSnapshot* snapshot;
if (interval == -2) {
snapshot = &_manager.getActiveMetrics(metricLock);
- _manager.getActiveMetrics(metricLock).setToTime(currentTimeS);
+ _manager.getActiveMetrics(metricLock).setToTime(currentTime.getTime());
} else if (interval == -1) {
// "Prime" the metric structure by first fetching the set of active
// metrics (complete with structure) and resetting these. This
@@ -100,17 +112,19 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
_manager.getActiveMetrics(metricLock).getMetrics(),
copyUnset);
generated->reset(0);
- _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTimeS);
- _manager.getActiveMetrics(metricLock).addToSnapshot(*generated, currentTimeS);
+ _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTime.getTime());
+ _manager.getActiveMetrics(metricLock).addToSnapshot(*generated, currentTime.getTime());
generated->setFromTime(_manager.getTotalMetricSnapshot(metricLock).getFromTime());
snapshot = generated.get();
} else if (interval == 0) {
if (copyUnset) {
generated = std::make_unique<metrics::MetricSnapshot>(
- _manager.getTotalMetricSnapshot(metricLock).getName(), 0,
- _manager.getActiveMetrics(metricLock).getMetrics(), true);
+ _manager.getTotalMetricSnapshot(metricLock).getName(),
+ 0,
+ _manager.getActiveMetrics(metricLock).getMetrics(),
+ true);
generated->reset(0);
- _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTimeS);
+ _manager.getTotalMetricSnapshot(metricLock).addToSnapshot(*generated, currentTime.getTime());
snapshot = generated.get();
} else {
snapshot = &_manager.getTotalMetricSnapshot(metricLock);
@@ -122,7 +136,7 @@ StatusMetricConsumer::reportStatus(std::ostream& out,
_manager.getActiveMetrics(metricLock).getMetrics(), true);
generated->reset(0);
_manager.getMetricSnapshot(metricLock, interval, temporarySnap)
- .addToSnapshot(*generated, currentTimeS);
+ .addToSnapshot(*generated, currentTime.getTime());
snapshot = generated.get();
} else {
snapshot = &_manager.getMetricSnapshot(metricLock, interval, temporarySnap);
diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.h b/storage/src/vespa/storage/common/statusmetricconsumer.h
index b25c2d5db48..337c3ea7ff0 100644
--- a/storage/src/vespa/storage/common/statusmetricconsumer.h
+++ b/storage/src/vespa/storage/common/statusmetricconsumer.h
@@ -11,18 +11,27 @@
#include "storagecomponent.h"
#include <vespa/storageframework/generic/status/statusreporter.h>
-#include <vespa/vespalib/util/jsonstream.h>
+#include <vespa/storageframework/generic/metric/metricupdatehook.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/metrics/metricmanager.h>
+#include <map>
namespace vespalib { class StringTokenizer; }
namespace metrics { class MetricManager; }
namespace storage {
+namespace framework { class MemoryToken; }
+
class StatusMetricConsumer : public framework::StatusReporter,
+ private framework::MetricUpdateHook,
private vespalib::JsonStreamTypes
{
public:
- StatusMetricConsumer(StorageComponentRegister&, metrics::MetricManager&, const std::string& name = "status");
+ StatusMetricConsumer(
+ StorageComponentRegister&,
+ metrics::MetricManager&,
+ const std::string& name = "status");
~StatusMetricConsumer() override;
// Metric reporting requires the "vespa.content.metrics_api" capability
@@ -31,11 +40,14 @@ public:
}
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
bool reportStatus(std::ostream& out, const framework::HttpUrlPath&) const override;
+ void updateMetrics(const MetricLockGuard & guard) override;
private:
metrics::MetricManager& _manager;
StorageComponent _component;
std::string _name;
mutable std::mutex _lock;
+ framework::SecondTime _startTime;
+ framework::SecondTime _processedTime;
};
} // storage
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 2f150cf7250..cb1c935082e 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -2,6 +2,7 @@
#include "blockingoperationstarter.h"
#include "distributor_bucket_space.h"
+#include "distributor_status.h"
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
@@ -852,7 +853,7 @@ DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& ne
// Note: this assumes that std::chrono::system_clock and the framework
// system clock have the same epoch, which should be a reasonable
// assumption.
- TimePoint now = _component.getClock().getSystemTime();
+ const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime()));
_externalOperationHandler.rejectFeedBeforeTimeReached(_ownershipSafeTimeCalc->safeTimePoint(now));
}
_bucketDBUpdater.handle_activated_cluster_state_bundle(); // Triggers resending of queued requests
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index 7b7c9f431f7..81512393c5b 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -31,7 +31,7 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
std::shared_ptr<Operation> cb = _sentMessageMap.pop(reply->getMsgId());
- if (cb) {
+ if (cb.get() != 0) {
Sender sender(*this, _sender, cb);
cb->receive(sender, reply);
return true;
@@ -41,11 +41,13 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply)
}
bool
-OperationOwner::start(const std::shared_ptr<Operation>& operation, Priority)
+OperationOwner::start(const std::shared_ptr<Operation>& operation,
+ Priority priority)
{
+ (void) priority;
LOG(spam, "Starting operation %s", operation->toString().c_str());
Sender sender(*this, _sender, operation);
- operation->start(sender, _clock.getSystemTime());
+ operation->start(sender, _clock.getTimeInMillis());
return true;
}
@@ -61,7 +63,7 @@ OperationOwner::onClose()
while (true) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
- if (cb) {
+ if (cb.get()) {
Sender sender(*this, _sender, std::shared_ptr<Operation>());
cb->onClose(sender);
} else {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 55fe2e039e1..2acd6068e1a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -203,7 +203,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorStripeMessageSender& sen
(_node_ctx, _op_ctx, _bucketSpace, _updateCmd, std::move(entries), _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
- op.start(intermediate, _node_ctx.clock().getSystemTime());
+ op.start(intermediate, _node_ctx.clock().getTimeInMillis());
transitionTo(SendState::UPDATES_SENT);
if (intermediate._reply.get()) {
@@ -223,7 +223,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorStripeMessageSender& sen
GetOperation& op = *get_operation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender);
_replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
- op.start(intermediate, _node_ctx.clock().getSystemTime());
+ op.start(intermediate, _node_ctx.clock().getTimeInMillis());
transitionTo(_use_initial_cheap_metadata_fetch_phase
? SendState::METADATA_GETS_SENT
@@ -322,7 +322,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric);
PutOperation & op = *putOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender);
- op.start(intermediate, _node_ctx.clock().getSystemTime());
+ op.start(intermediate, _node_ctx.clock().getTimeInMillis());
transitionTo(SendState::PUTS_SENT);
LOG(debug, "Update(%s): sending Puts at timestamp %" PRIu64, update_doc_id().c_str(), putTimestamp);
@@ -601,7 +601,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
_bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries);
std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now;
- for (const auto & e : entries) {
+ for (uint32_t j = 0; j < entries.size(); ++j) {
+ const auto& e = entries[j];
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const auto& copy = e->getNodeRef(i);
replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 667afbf67a0..6aa243d5e99 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -13,8 +13,6 @@
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.merge");
-using vespalib::to_utc;
-using vespalib::to_string;
namespace storage::distributor {
MergeOperation::~MergeOperation() = default;
@@ -25,7 +23,7 @@ MergeOperation::getStatus() const
return
Operation::getStatus() +
vespalib::make_string(" . Sent MergeBucketCommand at %s",
- to_string(to_utc(_sentMessageTime)).c_str());
+ _sentMessageTime.toString().c_str());
}
void
@@ -35,11 +33,11 @@ MergeOperation::addIdealNodes(
std::vector<MergeMetaData>& result)
{
// Add all ideal nodes first. These are never marked source-only.
- for (unsigned short idealNode : idealNodes) {
+ for (uint32_t i = 0; i < idealNodes.size(); i++) {
const MergeMetaData* entry = nullptr;
- for (const auto & node : nodes) {
- if (idealNode == node._nodeIndex) {
- entry = &node;
+ for (uint32_t j = 0; j < nodes.size(); j++) {
+ if (idealNodes[i] == nodes[j]._nodeIndex) {
+ entry = &nodes[j];
break;
}
}
@@ -52,20 +50,21 @@ MergeOperation::addIdealNodes(
}
void
-MergeOperation::addCopiesNotAlreadyAdded(uint16_t redundancy,
- const std::vector<MergeMetaData>& nodes,
- std::vector<MergeMetaData>& result)
+MergeOperation::addCopiesNotAlreadyAdded(
+ uint16_t redundancy,
+ const std::vector<MergeMetaData>& nodes,
+ std::vector<MergeMetaData>& result)
{
- for (auto node : nodes) {
+ for (uint32_t i = 0; i < nodes.size(); i++) {
bool found = false;
- for (const auto & mergeData : result) {
- if (mergeData._nodeIndex == node._nodeIndex) {
+ for (uint32_t j = 0; j < result.size(); j++) {
+ if (result[j]._nodeIndex == nodes[i]._nodeIndex) {
found = true;
}
}
if (!found) {
- result.push_back(node);
+ result.push_back(nodes[i]);
result.back()._sourceOnly = (result.size() > redundancy);
}
}
@@ -79,7 +78,8 @@ MergeOperation::generateSortedNodeList(
MergeLimiter& limiter,
std::vector<MergeMetaData>& nodes)
{
- std::vector<uint16_t> idealNodes(distribution.getIdealStorageNodes(state, bucketId, "ui"));
+ std::vector<uint16_t> idealNodes(
+ distribution.getIdealStorageNodes(state, bucketId, "ui"));
std::vector<MergeMetaData> result;
const uint16_t redundancy = distribution.getRedundancy();
@@ -123,25 +123,31 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
std::vector<std::unique_ptr<BucketCopy> > newCopies;
std::vector<MergeMetaData> nodes;
- for (unsigned short node : getNodes()) {
- const BucketCopy* copy = entry->getNode(node);
+ for (uint32_t i = 0; i < getNodes().size(); ++i) {
+ const BucketCopy* copy = entry->getNode(getNodes()[i]);
if (copy == nullptr) { // New copies?
- newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, node)));
+ newCopies.emplace_back(std::make_unique<BucketCopy>(BucketCopy::recentlyCreatedCopy(0, getNodes()[i])));
copy = newCopies.back().get();
}
- nodes.emplace_back(node, *copy);
+ nodes.emplace_back(getNodes()[i], *copy);
}
_infoBefore = entry.getBucketInfo();
- generateSortedNodeList(_bucketSpace->getDistribution(), clusterState, getBucketId(), _limiter, nodes);
+ generateSortedNodeList(_bucketSpace->getDistribution(),
+ clusterState,
+ getBucketId(),
+ _limiter,
+ nodes);
for (const auto& node : nodes) {
_mnodes.emplace_back(node._nodeIndex, node._sourceOnly);
}
if (_mnodes.size() > 1) {
- auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes,
- _manager->operation_context().generate_unique_timestamp(),
- clusterState.getVersion());
+ auto msg = std::make_shared<api::MergeBucketCommand>(
+ getBucket(),
+ _mnodes,
+ _manager->operation_context().generate_unique_timestamp(),
+ clusterState.getVersion());
const bool may_send_unordered = (_manager->operation_context().distributor_config().use_unordered_merge_chaining()
&& all_involved_nodes_support_unordered_merge_chaining());
if (!may_send_unordered) {
@@ -163,7 +169,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
sender.sendToNode(lib::NodeType::STORAGE, _mnodes[0].index, msg);
- _sentMessageTime = _manager->node_context().clock().getMonotonicTime();
+ _sentMessageTime = _manager->node_context().clock().getTimeInSeconds();
} else {
LOGBP(debug,
"Unable to merge bucket %s, since only one copy is available. System state %s",
@@ -178,20 +184,28 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (auto mnode : _mnodes) {
- const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
+ for (size_t i = 0; i < _mnodes.size(); ++i) {
+ const BucketCopy* copyBefore(_infoBefore.getNode(_mnodes[i].index));
if (!copyBefore) {
continue;
}
- const BucketCopy* copyAfter(currentState->getNode(mnode.index));
+ const BucketCopy* copyAfter(currentState->getNode(_mnodes[i].index));
if (!copyAfter) {
LOG(debug, "Copy of %s on node %u removed during merge. Was %s",
- getBucketId().toString().c_str(), mnode.index, copyBefore->toString().c_str());
+ getBucketId().toString().c_str(),
+ _mnodes[i].index,
+ copyBefore->toString().c_str());
continue;
}
- if (mnode.sourceOnly && !copyBefore->consistentWith(*copyAfter)){
- LOG(debug, "Source-only copy of %s on node %u changed from %s to %s during the course of the merge. Failing it.",
- getBucketId().toString().c_str(), mnode.index, copyBefore->toString().c_str(), copyAfter->toString().c_str());
+ if (_mnodes[i].sourceOnly
+ && !copyBefore->consistentWith(*copyAfter))
+ {
+ LOG(debug, "Source-only copy of %s on node %u changed from "
+ "%s to %s during the course of the merge. Failing it.",
+ getBucketId().toString().c_str(),
+ _mnodes[i].index,
+ copyBefore->toString().c_str(),
+ copyAfter->toString().c_str());
return true;
}
}
@@ -206,22 +220,25 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (auto & mnode : _mnodes) {
- const uint16_t nodeIndex = mnode.index;
+ for (uint32_t i = 0; i < _mnodes.size(); ++i) {
+ const uint16_t nodeIndex = _mnodes[i].index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
continue; // No point in deleting what's not even there now.
}
- if (mnode.sourceOnly) {
+ if (_mnodes[i].sourceOnly) {
sourceOnlyNodes.push_back(nodeIndex);
}
}
LOG(debug, "Attempting to delete %zu source only copies for %s",
- sourceOnlyNodes.size(), getBucketId().toString().c_str());
+ sourceOnlyNodes.size(),
+ getBucketId().toString().c_str());
if (!sourceOnlyNodes.empty()) {
- _removeOperation = std::make_unique<RemoveBucketOperation>(_manager->node_context(), BucketAndNodes(getBucket(), sourceOnlyNodes));
+ _removeOperation = std::make_unique<RemoveBucketOperation>(
+ _manager->node_context(),
+ BucketAndNodes(getBucket(), sourceOnlyNodes));
// Must not send removes to source only copies if something has caused
// pending load to the copy after the merge was sent!
if (_removeOperation->isBlocked(_manager->operation_context(), sender.operation_sequencer())) {
@@ -251,7 +268,8 @@ MergeOperation::deleteSourceOnlyNodes(
}
void
-MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
+MergeOperation::onReceive(DistributorStripeMessageSender& sender,
+ const std::shared_ptr<api::StorageReply> & msg)
{
if (_removeOperation) {
if (_removeOperation->onReceiveInternal(msg)) {
@@ -269,14 +287,18 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha
}
auto& reply = dynamic_cast<api::MergeBucketReply&>(*msg);
- LOG(debug, "Merge operation for bucket %s finished", getBucketId().toString().c_str());
+ LOG(debug,
+ "Merge operation for bucket %s finished",
+ getBucketId().toString().c_str());
api::ReturnCode result = reply.getResult();
_ok = result.success();
if (_ok) {
- BucketDatabase::Entry entry(_bucketSpace->getBucketDatabase().get(getBucketId()));
+ BucketDatabase::Entry entry(
+ _bucketSpace->getBucketDatabase().get(getBucketId()));
if (!entry.valid()) {
- LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str());
+ LOG(debug, "Bucket %s no longer exists after merge",
+ getBucketId().toString().c_str());
done(); // Nothing more we can do.
return;
}
@@ -293,8 +315,11 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha
return;
} else if (result.isBusy()) {
} else if (result.isCriticalForMaintenance()) {
- LOGBP(warning, "Merging failed for %s: %s with error '%s'",
- getBucketId().toString().c_str(), msg->toString().c_str(), msg->getResult().toString().c_str());
+ LOGBP(warning,
+ "Merging failed for %s: %s with error '%s'",
+ getBucketId().toString().c_str(),
+ msg->toString().c_str(),
+ msg->getResult().toString().c_str());
} else {
LOG(debug, "Merge failed for %s with non-critical failure: %s",
getBucketId().toString().c_str(), result.toString().c_str());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 44449633559..5416df3a43d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -20,7 +20,7 @@ class MergeOperation : public IdealStateOperation
protected:
bool sourceOnlyCopyChangedDuringMerge(const BucketDatabase::Entry&) const;
- vespalib::steady_time _sentMessageTime;
+ framework::SecondTime _sentMessageTime;
std::vector<api::MergeBucketCommand::Node> _mnodes;
std::unique_ptr<RemoveBucketOperation> _removeOperation;
BucketInfo _infoBefore;
@@ -30,7 +30,7 @@ public:
MergeOperation(const BucketAndNodes& nodes, uint16_t maxNodes = 16)
: IdealStateOperation(nodes),
- _sentMessageTime(),
+ _sentMessageTime(0),
_limiter(maxNodes)
{}
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 4d82de170ae..a48fb53a7ce 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -12,7 +12,7 @@ LOG_SETUP(".distributor.callback");
namespace storage::distributor {
Operation::Operation()
- : _startTime()
+ : _startTime(0)
{
}
@@ -21,23 +21,19 @@ Operation::~Operation() = default;
std::string
Operation::getStatus() const
{
- return vespalib::make_string("%s (started %s)", getName(), vespalib::to_string(_startTime).c_str());
+ return vespalib::make_string("%s (started %s)",
+ getName(), _startTime.toString().c_str());
}
void
-Operation::start(DistributorStripeMessageSender& sender, vespalib::system_time startTime)
+Operation::start(DistributorStripeMessageSender& sender,
+ framework::MilliSecTime startTime)
{
_startTime = startTime;
onStart(sender);
}
void
-Operation::start(DistributorStripeMessageSender& sender)
-{
- start(sender, vespalib::system_time());
-}
-
-void
Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCommand& target)
{
target.getTrace().setLevel(source.getTrace().getLevel());
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 8bb81b8d365..e24aa976221 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -45,25 +45,24 @@ public:
onReceive(sender, msg);
}
- [[nodiscard]] virtual const char* getName() const noexcept = 0;
+ virtual const char* getName() const noexcept = 0;
- [[nodiscard]] virtual std::string getStatus() const;
+ virtual std::string getStatus() const;
- [[nodiscard]] virtual std::string toString() const {
+ virtual std::string toString() const {
return std::string(getName());
}
/**
Starts the callback, sending any messages etc. Sets _startTime to current time
*/
- virtual void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime);
- void start(DistributorStripeMessageSender& sender);
+ virtual void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime);
/**
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- [[nodiscard]] virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
+ virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
return false;
}
@@ -78,6 +77,11 @@ public:
virtual void on_throttled();
/**
+ Returns the timestamp on which the first message was sent from this callback.
+ */
+ framework::MilliSecTime getStartTime() const { return _startTime; }
+
+ /**
Transfers message settings such as priority, timeout, etc. from one message to another.
*/
static void copyMessageSettings(const api::StorageCommand& source,
@@ -93,7 +97,7 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
- vespalib::system_time _startTime;
+ framework::MilliSecTime _startTime;
};
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index c86254cb69a..c03b211d1aa 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -10,6 +10,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlstream.hpp>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <climits>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".pendingclusterstate");
@@ -249,7 +250,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
api::ReturnCode result(reply->getResult());
if (!result.success()) {
- _delayedRequests.emplace_back(_clock.getMonotonicTime() + 100ms, bucketSpaceAndNode);
+ framework::MilliSecTime resendTime(_clock);
+ resendTime += framework::MilliSecTime(100);
+ _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode);
_sentMessages.erase(iter);
update_reply_failure_statistics(result, bucketSpaceAndNode);
return true;
@@ -270,9 +273,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
void
PendingClusterState::resendDelayedMessages() {
if (_delayedRequests.empty()) return; // Don't fetch time if not needed
- vespalib::steady_time currentTime = _clock.getMonotonicTime();
+ framework::MilliSecTime currentTime(_clock);
while (!_delayedRequests.empty()
- && (currentTime >= _delayedRequests.front().first))
+ && currentTime >= _delayedRequests.front().first)
{
requestNode(_delayedRequests.front().second);
_delayedRequests.pop_front();
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 8af08e1ba4d..24b31e45cbb 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -193,7 +193,8 @@ private:
struct BucketSpaceAndNode {
document::BucketSpace bucketSpace;
uint16_t node;
- BucketSpaceAndNode(document::BucketSpace bucketSpace_, uint16_t node_)
+ BucketSpaceAndNode(document::BucketSpace bucketSpace_,
+ uint16_t node_)
: bucketSpace(bucketSpace_),
node(node_)
{
@@ -217,7 +218,7 @@ private:
void update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply);
using SentMessages = std::map<uint64_t, BucketSpaceAndNode>;
- using DelayedRequests = std::deque<std::pair<vespalib::steady_time , BucketSpaceAndNode>>;
+ using DelayedRequests = std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode>>;
using PendingTransitions = std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash>;
using NodeFeatures = vespalib::hash_map<uint16_t, NodeSupportedFeatures>;
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 8618d570685..533493a79a2 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -3,6 +3,7 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <map>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".pendingmessages");
@@ -14,7 +15,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr, u
vespalib::make_string("Pending messages to storage nodes (stripe %u)", stripe_index)),
_component(cr, "pendingmessagetracker"),
_nodeInfo(_component.getClock()),
- _nodeBusyDuration(60s),
+ _nodeBusyDuration(60),
_deferred_read_tasks(),
_lock()
{
@@ -37,7 +38,7 @@ vespalib::string
PendingMessageTracker::MessageEntry::toHtml() const {
vespalib::asciistream ss;
ss << "<li><i>Node " << nodeIdx << "</i>: "
- << "<b>" << vespalib::to_string(timeStamp) << "</b> "
+ << "<b>" << framework::MilliSecTime(timeStamp.count()).toString() << "</b> "
<< api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n";
return ss.str();
}
@@ -45,7 +46,7 @@ PendingMessageTracker::MessageEntry::toHtml() const {
PendingMessageTracker::TimePoint
PendingMessageTracker::currentTime() const
{
- return _component.getClock().getSystemTime();
+ return TimePoint(_component.getClock().getTimeInMillis().getTime());
}
namespace {
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index fb672d5ee31..93238b5a83f 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -68,7 +68,13 @@ public:
virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0;
};
- using TimePoint = vespalib::system_time;
+ /**
+ * Time point represented as the millisecond interval from the framework
+ * clock's epoch to a given point in time. Note that it'd be more
+ * semantically correct to use std::chrono::time_point, but it is bound
+ * to specific chrono clock types, their epochs and duration resolution.
+ */
+ using TimePoint = std::chrono::milliseconds;
PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index);
~PendingMessageTracker() override;
@@ -113,8 +119,8 @@ public:
*/
std::vector<uint64_t> clearMessagesForNode(uint16_t node);
- void setNodeBusyDuration(vespalib::duration duration) noexcept {
- _nodeBusyDuration = duration;
+ void setNodeBusyDuration(std::chrono::seconds secs) noexcept {
+ _nodeBusyDuration = secs;
}
void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
@@ -130,7 +136,7 @@ private:
MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority,
uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept;
- [[nodiscard]] vespalib::string toHtml() const;
+ vespalib::string toHtml() const;
};
struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {};
@@ -181,7 +187,7 @@ private:
Messages _messages;
framework::Component _component;
NodeInfo _nodeInfo;
- vespalib::duration _nodeBusyDuration;
+ std::chrono::seconds _nodeBusyDuration;
DeferredBucketTaskMap _deferred_read_tasks;
// Since distributor is currently single-threaded, this will only
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
index f69f9e3d427..5a584f7c332 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -440,7 +440,9 @@ StripeBucketDBUpdater::handleSingleBucketInfoFailure(
req.targetNode, repl->getResult().toString().c_str());
if (req.bucket.getBucketId() != document::BucketId(0)) {
- _delayedRequests.emplace_back(_node_ctx.clock().getMonotonicTime() + 100ms, req);
+ framework::MilliSecTime sendTime(_node_ctx.clock());
+ sendTime += framework::MilliSecTime(100);
+ _delayedRequests.emplace_back(sendTime, req);
}
}
@@ -450,7 +452,7 @@ StripeBucketDBUpdater::resendDelayedMessages()
if (_delayedRequests.empty()) {
return; // Don't fetch time if not needed
}
- vespalib::steady_time currentTime(_node_ctx.clock().getMonotonicTime());
+ framework::MilliSecTime currentTime(_node_ctx.clock());
while (!_delayedRequests.empty()
&& currentTime >= _delayedRequests.front().first)
{
@@ -642,7 +644,7 @@ void
StripeBucketDBUpdater::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const
{
for (const auto& entry : _delayedRequests) {
- entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", vespalib::count_ms(vespalib::to_utc(entry.first).time_since_epoch())));
+ entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
}
}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 6339283f963..2f6e665be14 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -231,7 +231,7 @@ private:
using DbGuards = std::unordered_map<document::BucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard>,
document::BucketSpace::hash>;
- using DelayedRequestsQueue = std::deque<std::pair<vespalib::steady_time, BucketRequest>>;
+ using DelayedRequestsQueue = std::deque<std::pair<framework::MilliSecTime, BucketRequest>>;
const DistributorNodeContext& _node_ctx;
DistributorStripeOperationContext& _op_ctx;
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
index 8b6ade7e7d1..a0613c60fa4 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
@@ -13,9 +13,9 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
class ThrottlingOperation : public Operation
{
public:
- ThrottlingOperation(Operation::SP operation,
+ ThrottlingOperation(const Operation::SP& operation,
ThrottlingOperationStarter& operationStarter)
- : _operation(std::move(operation)),
+ : _operation(operation),
_operationStarter(operationStarter)
{}
@@ -30,21 +30,24 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
void onClose(DistributorStripeMessageSender& sender) override {
_operation->onClose(sender);
}
- [[nodiscard]] const char* getName() const noexcept override {
+ const char* getName() const noexcept override {
return _operation->getName();
}
- [[nodiscard]] std::string getStatus() const override {
+ std::string getStatus() const override {
return _operation->getStatus();
}
- [[nodiscard]] std::string toString() const override {
+ std::string toString() const override {
return _operation->toString();
}
- void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime) override {
+ void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime) override {
_operation->start(sender, startTime);
}
void receive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override {
_operation->receive(sender, msg);
}
+ framework::MilliSecTime getStartTime() const {
+ return _operation->getStartTime();
+ }
void onStart(DistributorStripeMessageSender&) override {
// Should never be called directly on the throttled operation
// instance, but rather on its wrapped implementation.
@@ -58,7 +61,7 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
OperationStarter& _starterImpl;
public:
- explicit ThrottlingOperationStarter(OperationStarter& starterImpl)
+ ThrottlingOperationStarter(OperationStarter& starterImpl)
: _starterImpl(starterImpl),
_minPending(0),
_maxPending(UINT32_MAX),
@@ -68,9 +71,9 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
- [[nodiscard]] bool may_allow_operation_with_priority(Priority priority) const noexcept override;
+ bool may_allow_operation_with_priority(Priority priority) const noexcept override;
- [[nodiscard]] bool canStart(uint32_t currentOperationCount, Priority priority) const;
+ bool canStart(uint32_t currentOperationCount, Priority priority) const;
void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) {
_minPending = minPending;
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index b0702ac7bf0..54dedbebbfe 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -531,7 +531,8 @@ TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBu
}
if (has_bucket_ownership_transfer && _maintenance_safe_time_delay.count() > 0) {
OwnershipTransferSafeTimePointCalculator safe_time_calc(_maintenance_safe_time_delay);
- const auto now = _component.getClock().getSystemTime();
+ using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint;
+ const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime()));
_maintenance_safe_time_point = safe_time_calc.safeTimePoint(now);
// All stripes are in a waiting pattern and will observe this on their next tick.
// Memory visibility enforced by all stripes being held under a mutex by our caller.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ec22d7c064e..0680c10ab29 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -9,6 +9,7 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/storage/common/bucket_resolver.h>
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageserver/configurable_bucket_resolver.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h>
@@ -48,14 +49,13 @@ CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageRepl
}
namespace {
+ vespalib::string getNodeId(StorageComponent& sc) {
+ vespalib::asciistream ost;
+ ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
+ return ost.str();
+ }
-vespalib::string getNodeId(StorageComponent& sc) {
- vespalib::asciistream ost;
- ost << sc.cluster_context().cluster_name() << "/" << sc.getNodeType() << "/" << sc.getIndex();
- return ost.str();
-}
-
-vespalib::duration TEN_MINUTES = 600s;
+ framework::SecondTime TEN_MINUTES(600);
}
@@ -151,7 +151,8 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
std::shared_ptr<api::StorageCommand> originalCommand;
{
std::lock_guard lock(_messageBusSentLock);
- auto iter(_messageBusSent.find(reply->getContext().value.UINT64));
+ using MessageMap = std::map<api::StorageMessage::Id, api::StorageCommand::SP>;
+ MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64));
if (iter != _messageBusSent.end()) {
originalCommand.swap(iter->second);
_messageBusSent.erase(iter);
@@ -192,13 +193,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space(
namespace {
struct PlaceHolderBucketResolver : public BucketResolver {
- [[nodiscard]] document::Bucket bucketFromId(const document::DocumentId &) const override {
- return {FixedBucketSpaces::default_space(), document::BucketId(0)};
+ document::Bucket bucketFromId(const document::DocumentId &) const override {
+ return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0));
}
- [[nodiscard]] document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
+ document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
return FixedBucketSpaces::default_space();
}
- [[nodiscard]] vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
+ vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
assert(bucketSpace == FixedBucketSpaces::default_space());
return FixedBucketSpaces::to_string(bucketSpace);
}
@@ -437,7 +438,7 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(msg);
+ process(std::move(msg));
}
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
@@ -450,7 +451,7 @@ CommunicationManager::onUp(const std::shared_ptr<api::StorageMessage> & msg)
{
MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString());
if (msg->getType().isReply()) {
- const auto & m = static_cast<const api::StorageReply&>(*msg);
+ const api::StorageReply & m = static_cast<const api::StorageReply&>(*msg);
if (m.getResult().failed()) {
LOG(debug, "Request %s failed: %s", msg->getType().toString().c_str(), m.getResult().toString().c_str());
}
@@ -603,7 +604,7 @@ CommunicationManager::sendDirectRPCReply(
request.addReturnString(m.data(), m.size());
if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) {
- auto& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, false);
request.addReturnString(ns.str().c_str());
@@ -692,9 +693,9 @@ CommunicationManager::run(framework::ThreadHandle& thread)
process(msg);
}
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- for (auto it(_earlierGenerations.begin());
+ for (EarlierProtocols::iterator it(_earlierGenerations.begin());
!_earlierGenerations.empty() &&
- ((it->first + TEN_MINUTES) < _component.getClock().getMonotonicTime());
+ ((it->first + TEN_MINUTES) < _component.getClock().getTimeInSeconds());
it = _earlierGenerations.begin())
{
_earlierGenerations.erase(it);
@@ -717,10 +718,10 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string&
void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo>& repo) {
if (_mbus) {
- vespalib::steady_time now(_component.getClock().getMonotonicTime());
+ framework::SecondTime now(_component.getClock().getTimeInSeconds());
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo);
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
- _earlierGenerations.emplace_back(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol));
+ _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)));
}
if (_message_codec_provider) {
_message_codec_provider->update_atomically(repo);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index e83a6517c45..6f953411cac 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -72,6 +72,9 @@ class CommunicationManager final
public MessageDispatcher
{
private:
+ CommunicationManager(const CommunicationManager&);
+ CommunicationManager& operator=(const CommunicationManager&);
+
StorageComponent _component;
CommunicationManagerMetrics _metrics;
@@ -82,7 +85,7 @@ private:
Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
- using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>;
+ using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;
using EarlierProtocols = std::vector<EarlierProtocol>;
std::mutex _earlierGenerationsLock;
EarlierProtocols _earlierGenerations;
@@ -123,8 +126,6 @@ private:
friend struct CommunicationManagerTest;
public:
- CommunicationManager(const CommunicationManager&) = delete;
- CommunicationManager& operator=(const CommunicationManager&) = delete;
CommunicationManager(StorageComponentRegister& compReg,
const config::ConfigUri & configUri);
~CommunicationManager() override;
diff --git a/storage/src/vespa/storage/storageserver/opslogger.cpp b/storage/src/vespa/storage/storageserver/opslogger.cpp
index e5785968eb1..03322cb55fd 100644
--- a/storage/src/vespa/storage/storageserver/opslogger.cpp
+++ b/storage/src/vespa/storage/storageserver/opslogger.cpp
@@ -77,7 +77,7 @@ OpsLogger::onPutReply(const std::shared_ptr<api::PutReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << vespalib::to_string(_component.getClock().getSystemTime())
+ ost << _component.getClock().getTimeInSeconds().getTime()
<< "\tPUT\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -94,7 +94,7 @@ OpsLogger::onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << vespalib::to_string(_component.getClock().getSystemTime())
+ ost << _component.getClock().getTimeInSeconds().getTime()
<< "\tUPDATE\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -111,7 +111,7 @@ OpsLogger::onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << vespalib::to_string(_component.getClock().getSystemTime())
+ ost << _component.getClock().getTimeInSeconds().getTime()
<< "\tREMOVE\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
@@ -128,7 +128,7 @@ OpsLogger::onGetReply(const std::shared_ptr<api::GetReply>& msg)
{
if (_targetFile == nullptr) return false;
std::ostringstream ost;
- ost << vespalib::to_string(_component.getClock().getSystemTime())
+ ost << _component.getClock().getTimeInSeconds().getTime()
<< "\tGET\t" << msg->getDocumentId() << "\t"
<< msg->getResult() << "\n";
{
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 647cba52bfc..81961370ed3 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -9,14 +9,16 @@
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/metrictimer.h>
#include <vespa/metrics/valuemetric.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/string_escape.h>
#include <vespa/vespalib/util/stringfmt.h>
+
#include <fstream>
-#include <ranges>
+#include <unistd.h>
#include <vespa/log/log.h>
LOG_SETUP(".state.manager");
@@ -69,7 +71,7 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_requested_almost_immediate_node_state_replies(false)
{
_nodeState->setMinUsedBits(58);
- _nodeState->setStartTimestamp(_component.getClock().getSystemTime());
+ _nodeState->setStartTimestamp(_component.getClock().getTimeInSeconds().getTime());
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
}
@@ -133,9 +135,9 @@ StateManager::reportHtmlStatus(std::ostream& out,
<< "<h1>System state history</h1>\n"
<< "<table border=\"1\"><tr>"
<< "<th>Received at time</th><th>State</th></tr>\n";
- for (const auto & it : std::ranges::reverse_view(_systemStateHistory)) {
- out << "<tr><td>" << vespalib::to_string(vespalib::to_utc(it.first)) << "</td><td>"
- << xml_content_escaped(it.second->getBaselineClusterState()->toString()) << "</td></tr>\n";
+ for (auto it = _systemStateHistory.rbegin(); it != _systemStateHistory.rend(); ++it) {
+ out << "<tr><td>" << it->first << "</td><td>"
+ << xml_content_escaped(it->second->getBaselineClusterState()->toString()) << "</td></tr>\n";
}
out << "</table>\n";
}
@@ -144,7 +146,7 @@ StateManager::reportHtmlStatus(std::ostream& out,
lib::Node
StateManager::thisNode() const
{
- return { _component.getNodeType(), _component.getIndex() };
+ return lib::Node(_component.getNodeType(), _component.getIndex());
}
lib::NodeState::CSP
@@ -296,7 +298,7 @@ StateManager::enableNextClusterState()
_reported_host_info_cluster_state_version = _systemState->getVersion();
} // else: reported version updated upon explicit activation edge
_nextSystemState.reset();
- _systemStateHistory.emplace_back(_component.getClock().getMonotonicTime(), _systemState);
+ _systemStateHistory.emplace_back(_component.getClock().getTimeInMillis(), _systemState);
}
namespace {
@@ -390,7 +392,8 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd)
{
bool sentReply = false;
if (cmd->getSourceIndex() != 0xffff) {
- sentReply = sendGetNodeStateReplies(vespalib::steady_time::max(), cmd->getSourceIndex());
+ sentReply = sendGetNodeStateReplies(framework::MilliSecTime(0),
+ cmd->getSourceIndex());
}
std::shared_ptr<api::GetNodeStateReply> reply;
{
@@ -401,13 +404,16 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd)
&& (*cmd->getExpectedState() == *_nodeState || sentReply)
&& is_up_to_date)
{
- vespalib::duration timeout = cmd->getTimeout();
+ int64_t msTimeout = vespalib::count_ms(cmd->getTimeout());
LOG(debug, "Received get node state request with timeout of "
- "%f seconds. Scheduling to be answered in "
- "%f seconds unless a node state change "
+ "%" PRId64 " milliseconds. Scheduling to be answered in "
+ "%" PRId64 " milliseconds unless a node state change "
"happens before that time.",
- vespalib::to_s(timeout), vespalib::to_s(timeout)*0.8);
- TimeStateCmdPair pair(_component.getClock().getMonotonicTime() + timeout, cmd);
+ msTimeout, msTimeout * 800 / 1000);
+ TimeStateCmdPair pair(
+ _component.getClock().getTimeInMillis()
+ + framework::MilliSecTime(msTimeout * 800 / 1000),
+ cmd);
_queuedStateRequests.emplace_back(std::move(pair));
} else {
LOG(debug, "Answered get node state request right away since it "
@@ -491,14 +497,13 @@ StateManager::tick() {
bool almost_immediate_replies = _requested_almost_immediate_node_state_replies.load(std::memory_order_relaxed);
if (almost_immediate_replies) {
_requested_almost_immediate_node_state_replies.store(false, std::memory_order_relaxed);
- sendGetNodeStateReplies();
- } else {
- sendGetNodeStateReplies(_component.getClock().getMonotonicTime());
}
+ framework::MilliSecTime time(almost_immediate_replies ? framework::MilliSecTime(0) : _component.getClock().getTimeInMillis());
+ sendGetNodeStateReplies(time);
}
bool
-StateManager::sendGetNodeStateReplies(vespalib::steady_time olderThanTime, uint16_t node)
+StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, uint16_t node)
{
std::vector<std::shared_ptr<api::GetNodeStateReply>> replies;
{
@@ -506,8 +511,9 @@ StateManager::sendGetNodeStateReplies(vespalib::steady_time olderThanTime, uint1
for (auto it = _queuedStateRequests.begin(); it != _queuedStateRequests.end();) {
if (node != 0xffff && node != it->second->getSourceIndex()) {
++it;
- } else if (it->first < olderThanTime) {
- LOG(debug, "Sending reply to msg with id %" PRIu64, it->second->getMsgId());
+ } else if (!olderThanTime.isSet() || it->first < olderThanTime) {
+ LOG(debug, "Sending reply to msg with id %" PRIu64,
+ it->second->getMsgId());
replies.emplace_back(std::make_shared<api::GetNodeStateReply>(*it->second, *_nodeState));
auto eraseIt = it++;
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 3605a0b1605..74b59875ff8 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -42,8 +42,8 @@ class StateManager : public NodeStateUpdater,
private vespalib::JsonStreamTypes
{
using ClusterStateBundle = lib::ClusterStateBundle;
- using TimeStateCmdPair = std::pair<vespalib::steady_time, api::GetNodeStateCommand::SP>;
- using TimeSysStatePair = std::pair<vespalib::steady_time, std::shared_ptr<const ClusterStateBundle>>;
+ using TimeStateCmdPair = std::pair<framework::MilliSecTime, api::GetNodeStateCommand::SP>;
+ using TimeSysStatePair = std::pair<framework::MilliSecTime, std::shared_ptr<const ClusterStateBundle>>;
struct StateManagerMetrics;
@@ -109,7 +109,7 @@ private:
void notifyStateListeners();
bool sendGetNodeStateReplies(
- vespalib::steady_time olderThanTime = vespalib::steady_time::max(),
+ framework::MilliSecTime olderThanTime = framework::MilliSecTime(0),
uint16_t index = 0xffff);
void mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex> &, uint16_t controller_index);
diff --git a/storage/src/vespa/storage/storageserver/statereporter.cpp b/storage/src/vespa/storage/storageserver/statereporter.cpp
index 373cd186708..b2337ae1223 100644
--- a/storage/src/vespa/storage/storageserver/statereporter.cpp
+++ b/storage/src/vespa/storage/storageserver/statereporter.cpp
@@ -29,7 +29,9 @@ StateReporter::StateReporter(
_component.registerStatusPage(*this);
}
-StateReporter::~StateReporter() = default;
+StateReporter::~StateReporter()
+{
+}
vespalib::string
StateReporter::getReportContentType(
@@ -82,7 +84,7 @@ StateReporter::getMetrics(const vespalib::string &consumer)
snapshot.reset(0);
_manager.getMetricSnapshot(guard, interval).addToSnapshot(
- snapshot, vespalib::count_s(_component.getClock().getSystemTime().time_since_epoch()));
+ snapshot, _component.getClock().getTimeInSeconds().getTime());
vespalib::asciistream json;
vespalib::JsonStream stream(json);
@@ -104,7 +106,7 @@ StateReporter::getHealth() const
lib::NodeState cns(*_component.getStateUpdater().getCurrentNodeState());
bool up = cns.getState().oneOf("u");
std::string message = up ? "" : "Node state: " + cns.toString(true);
- return { up, message };
+ return vespalib::HealthProducer::Health(up, message);
}
void
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 5ece2a12f71..3987827a264 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -372,7 +372,7 @@ StorageNode::shutdown()
_chain->flush();
}
- if ( !_pidFile.empty() ) {
+ if (_pidFile != "") {
LOG(debug, "Removing pid file");
removePidFile(_pidFile);
}
@@ -510,8 +510,10 @@ StorageNode::updateMetrics(const MetricLockGuard &) {
}
void
-StorageNode::waitUntilInitialized(vespalib::duration timeout) {
- vespalib::steady_time doom = vespalib::steady_clock::now();
+StorageNode::waitUntilInitialized(uint32_t timeout) {
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout));
while (true) {
{
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
@@ -519,7 +521,7 @@ StorageNode::waitUntilInitialized(vespalib::duration timeout) {
if (nodeState.getState() == lib::State::UP) break;
}
std::this_thread::sleep_for(10ms);
- if (vespalib::steady_clock::now() >= doom) {
+ if (clock.getTimeInMillis() >= endTime) {
std::ostringstream ost;
ost << "Storage server not initialized after waiting timeout of "
<< timeout << " seconds.";
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 19b930c184f..0e420f206e2 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -78,7 +78,7 @@ public:
virtual const lib::NodeType& getNodeType() const = 0;
bool attemptedStopped() const;
void notifyDoneInitializing() override;
- void waitUntilInitialized(vespalib::duration timeout = 15s);
+ void waitUntilInitialized(uint32_t timeoutSeconds = 15);
void updateMetrics(const MetricLockGuard & guard) override;
/** Updates the document type repo. */
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index 6d36abc896e..91f304ad9a0 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -121,9 +121,12 @@ Visitor::VisitorTarget::metaForMessageId(uint64_t msgId)
void
Visitor::VisitorTarget::discardQueuedMessages()
{
- for (const auto & entry : _queuedMessages) {
- LOG(spam, "Erasing queued message with id %" PRIu64, entry.second);
- releaseMetaForMessageId(entry.second);
+ for (MessageQueue::iterator
+ it(_queuedMessages.begin()), e(_queuedMessages.end());
+ it != e; ++it)
+ {
+ LOG(spam, "Erasing queued message with id %" PRIu64, it->second);
+ releaseMetaForMessageId(it->second);
}
_queuedMessages.clear();
}
@@ -307,14 +310,17 @@ Visitor::getStateName(VisitorState s)
return "COMPLETED";
default:
assert(!"Unknown visitor state");
- return nullptr;
+ return NULL;
}
}
Visitor::VisitorState
Visitor::transitionTo(VisitorState newState)
{
- LOG(debug, "Visitor '%s' state transition %s -> %s", _id.c_str(), getStateName(_state), getStateName(newState));
+ LOG(debug, "Visitor '%s' state transition %s -> %s",
+ _id.c_str(),
+ getStateName(_state),
+ getStateName(newState));
VisitorState oldState = _state;
_state = newState;
return oldState;
@@ -333,10 +339,12 @@ Visitor::mayTransitionToCompleted() const
void
Visitor::forceClose()
{
- for (auto * state : _bucketStates) {
+ for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
+ it != _bucketStates.end(); ++it)
+ {
// Reset iterator id so no destroy iterator will be sent
- state->setIteratorId(spi::IteratorId(0));
- delete state;
+ (*it)->setIteratorId(spi::IteratorId(0));
+ delete *it;
}
_bucketStates.clear();
transitionTo(STATE_COMPLETED);
@@ -350,7 +358,7 @@ Visitor::sendReplyOnce()
std::shared_ptr<api::StorageReply> reply(_initiatingCmd->makeReply());
_hitCounter->updateVisitorStatistics(_visitorStatistics);
- dynamic_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
+ static_cast<api::CreateVisitorReply*>(reply.get())->setVisitorStatistics(_visitorStatistics);
if (shouldAddMbusTrace()) {
_trace.moveTraceTo(reply->getTrace());
}
@@ -365,15 +373,17 @@ void
Visitor::finalize()
{
if (_state != STATE_COMPLETED) {
- LOG(error, "Attempting to finalize non-completed visitor %s", _id.c_str());
+ LOG(error, "Attempting to finalize non-completed visitor %s",
+ _id.c_str());
assert(false);
}
assert(_bucketStates.empty());
if (_result.success()) {
- if (_messageSession->pending() > 0) {
+ if (_messageSession->pending() > 0)
+ {
_result = api::ReturnCode(api::ReturnCode::ABORTED);
- try {
+ try{
abortedVisiting();
} catch (std::exception& e) {
LOG(warning, "Visitor %s had a problem in abortVisiting(). As "
@@ -394,31 +404,43 @@ Visitor::finalize()
void
Visitor::discardAllNoPendingBucketStates()
{
- for (auto it = _bucketStates.begin(); it !=_bucketStates.end();) {
+ for (BucketStateList::iterator
+ it(_bucketStates.begin()), e(_bucketStates.end());
+ it != e;)
+ {
BucketIterationState& bstate(**it);
if (bstate.hasPendingControlCommand() || bstate.hasPendingIterators()) {
- LOG(debug, "Visitor '%s' not discarding bucket state %s since it has pending operations",
- _id.c_str(), bstate.toString().c_str());
+ LOG(debug,
+ "Visitor '%s' not discarding bucket state %s "
+ "since it has pending operations",
+ _id.c_str(),
+ bstate.toString().c_str());
++it;
continue;
}
- LOG(debug, "Visitor '%s' discarding bucket state %s", _id.c_str(), bstate.toString().c_str());
+ LOG(debug, "Visitor '%s' discarding bucket state %s",
+ _id.c_str(), bstate.toString().c_str());
delete *it;
it = _bucketStates.erase(it);
}
}
void
-Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError)
+Visitor::fail(const api::ReturnCode& reason,
+ bool overrideExistingError)
{
assert(_state != STATE_COMPLETED);
if (_result.getResult() < reason.getResult() || overrideExistingError) {
- LOG(debug, "Setting result of visitor '%s' to %s", _id.c_str(), reason.toString().c_str());
+ LOG(debug, "Setting result of visitor '%s' to %s",
+ _id.c_str(), reason.toString().c_str());
_result = reason;
}
if (_visitorTarget.hasQueuedMessages()) {
- LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s since visitor has failed",
- _id.c_str(), _visitorTarget._queuedMessages.size(), _controlDestination->toString().c_str());
+ LOG(debug, "Visitor '%s' dropping %zu queued messages bound to %s "
+ "since visitor has failed",
+ _id.c_str(),
+ _visitorTarget._queuedMessages.size(),
+ _controlDestination->toString().c_str());
_visitorTarget.discardQueuedMessages();
}
discardAllNoPendingBucketStates();
@@ -426,7 +448,8 @@ Visitor::fail(const api::ReturnCode& reason, bool overrideExistingError)
}
bool
-Visitor::shouldReportProblemToClient(const api::ReturnCode& code, size_t retryCount)
+Visitor::shouldReportProblemToClient(const api::ReturnCode& code,
+ size_t retryCount) const
{
// Report _once_ per message if we reach a certain retry threshold.
if (retryCount == TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY) {
@@ -498,7 +521,7 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId,
_visitorOptions._fromTime = fromTimestamp;
_visitorOptions._toTime = toTimestamp;
_currentBucket = 0;
- _hitCounter = std::make_unique<HitCounter>();
+ _hitCounter.reset(new HitCounter());
_messageSession = std::move(messageSession);
_documentPriority = documentPriority;
@@ -589,7 +612,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
uint64_t messageId = reply->getContext().value.UINT64;
uint32_t removed = _visitorTarget._pendingMessages.erase(messageId);
- LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(), reply->toString().c_str(), messageId);
+ LOG(spam, "Visitor '%s' reply %s for message ID %" PRIu64, _id.c_str(),
+ reply->toString().c_str(), messageId);
assert(removed == 1);
(void) removed;
@@ -610,16 +634,20 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
metrics.visitorDestinationFailureReplies.inc();
if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) {
- LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str());
- api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ LOG(debug, "Aborting visitor as we failed to talk to controller: %s",
+ reply->getError(0).toString().c_str());
+ api::ReturnCode returnCode(
+ static_cast<api::ReturnCode::Result>(
+ reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
fail(returnCode, true);
close();
return;
}
- api::ReturnCode returnCode(static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
- reply->getError(0).getMessage());
+ api::ReturnCode returnCode(
+ static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
+ reply->getError(0).getMessage());
const bool should_fail = remap_docapi_message_error_code(returnCode);
if (should_fail) {
// Abort - something is wrong with target.
@@ -629,7 +657,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
}
if (failed()) {
- LOG(debug, "Failed to send message from visitor '%s', due to %s. Not resending since visitor has failed",
+ LOG(debug, "Failed to send message from visitor '%s', due to "
+ "%s. Not resending since visitor has failed",
_id.c_str(), returnCode.toString().c_str());
return;
}
@@ -680,7 +709,8 @@ Visitor::onCreateIteratorReply(
if (reply->getResult().failed()) {
LOG(debug, "Failed to create iterator for bucket %s: %s",
- bucketId.toString().c_str(), reply->getResult().toString().c_str());
+ bucketId.toString().c_str(),
+ reply->getResult().toString().c_str());
fail(reply->getResult());
delete *it;
_bucketStates.erase((++it).base());
@@ -688,14 +718,17 @@ Visitor::onCreateIteratorReply(
}
bucketState.setIteratorId(reply->getIteratorId());
if (failed()) {
- LOG(debug, "Create iterator for bucket %s is OK, but visitor has failed: %s",
- bucketId.toString().c_str(), _result.toString().c_str());
+ LOG(debug, "Create iterator for bucket %s is OK, "
+ "but visitor has failed: %s",
+ bucketId.toString().c_str(),
+ _result.toString().c_str());
delete *it;
_bucketStates.erase((++it).base());
return;
}
- LOG(debug, "Visitor '%s' starting to visit bucket %s.", _id.c_str(), bucketId.toString().c_str());
+ LOG(debug, "Visitor '%s' starting to visit bucket %s.",
+ _id.c_str(), bucketId.toString().c_str());
auto cmd = std::make_shared<GetIterCommand>(bucket, bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
@@ -704,10 +737,13 @@ Visitor::onCreateIteratorReply(
}
void
-Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThreadMetrics& metrics)
+Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply,
+ VisitorThreadMetrics& metrics)
{
LOG(debug, "Visitor '%s' got get iter reply for bucket %s: %s",
- _id.c_str(), reply->getBucketId().toString().c_str(), reply->getResult().toString().c_str());
+ _id.c_str(),
+ reply->getBucketId().toString().c_str(),
+ reply->getResult().toString().c_str());
auto it = _bucketStates.rbegin();
// New requests will be pushed on end of list.. So searching
@@ -727,8 +763,10 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThrea
!reply->getResult().isShutdownRelated() &&
!reply->getResult().isBucketDisappearance())
{
- LOG(warning, "Failed to talk to persistence layer for bucket %s. Aborting visitor '%s': %s",
- reply->getBucketId().toString().c_str(), _id.c_str(), reply->getResult().toString().c_str());
+ LOG(warning, "Failed to talk to persistence layer for bucket "
+ "%s. Aborting visitor '%s': %s",
+ reply->getBucketId().toString().c_str(),
+ _id.c_str(), reply->getResult().toString().c_str());
}
fail(reply->getResult());
BucketIterationState& bucketState(**it);
@@ -745,14 +783,17 @@ Visitor::onGetIterReply(const std::shared_ptr<GetIterReply>& reply, VisitorThrea
bucketState.setCompleted(reply->isCompleted());
--bucketState._pendingIterators;
if (!reply->getEntries().empty()) {
- LOG(debug, "Processing documents in handle given from bucket %s.", reply->getBucketId().toString().c_str());
+ LOG(debug, "Processing documents in handle given from bucket %s.",
+ reply->getBucketId().toString().c_str());
// While handling documents we should not keep locks, such
// that visitor may process several things at once.
if (isRunning()) {
MBUS_TRACE(reply->getTrace(), 5,
vespalib::make_string("Visitor %s handling block of %zu documents.",
_id.c_str(), reply->getEntries().size()));
- LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), reply->getEntries().size());
+ LOG(debug, "Visitor %s handling block of %zu documents.",
+ _id.c_str(),
+ reply->getEntries().size());
try {
framework::MilliSecTimer processingTimer(_component.getClock());
handleDocuments(reply->getBucketId(), reply->getEntries(), *_hitCounter);
@@ -872,11 +913,15 @@ Visitor::continueVisitor()
}
}
- LOG(debug, "No pending messages, tagging visitor '%s' complete", _id.c_str());
+ LOG(debug, "No pending messages, tagging visitor '%s' complete",
+ _id.c_str());
transitionTo(STATE_COMPLETED);
} else {
- LOG(debug, "Visitor %s waiting for all commands to be replied to (pending=%zu, queued=%zu)",
- _id.c_str(), _visitorTarget._pendingMessages.size(), _visitorTarget._queuedMessages.size());
+ LOG(debug, "Visitor %s waiting for all commands to be replied to "
+ "(pending=%zu, queued=%zu)",
+ _id.c_str(),
+ _visitorTarget._pendingMessages.size(),
+ _visitorTarget._queuedMessages.size());
}
return false;
} else {
@@ -936,14 +981,14 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
<< (_visitorOptions._visitRemoves ? "true" : "false")
<< "</td></tr>\n";
out << "<tr><td>Control destination</td><td>";
- if (_controlDestination) {
+ if (_controlDestination.get()) {
out << xml_content_escaped(_controlDestination->toString());
} else {
out << "nil";
}
out << "</td></tr>\n";
out << "<tr><td>Data destination</td><td>";
- if (_dataDestination) {
+ if (_dataDestination.get()) {
out << xml_content_escaped(_dataDestination->toString());
} else {
out << "nil";
@@ -1033,13 +1078,17 @@ Visitor::getStatus(std::ostream& out, bool verbose) const
bool
Visitor::getIterators()
{
- LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, _currentBucket = %d",
- _id.c_str(), _buckets.size(), _bucketStates.size(), _currentBucket);
+ LOG(debug, "getIterators, visitor %s, _buckets = %zu , _bucketStates = %zu, "
+ "_currentBucket = %d",
+ _id.c_str(), _buckets.size(),
+ _bucketStates.size(), _currentBucket);
// Don't send any further GetIters if we're closing
if (!isRunning()) {
if (hasPendingIterators()) {
- LOG(debug, "Visitor has failed but waiting for %zu buckets to finish processing", _bucketStates.size());
+ LOG(debug, "Visitor has failed but waiting for %zu "
+ "buckets to finish processing",
+ _bucketStates.size());
return true;
} else {
return false;
@@ -1048,10 +1097,13 @@ Visitor::getIterators()
// Go through buckets found. Take the first that doesn't have requested
// state and request a new piece.
- for (auto it = _bucketStates.begin();it != _bucketStates.end();) {
+ for (std::list<BucketIterationState*>::iterator it = _bucketStates.begin();
+ it != _bucketStates.end();)
+ {
assert(*it);
BucketIterationState& bucketState(**it);
- if ((bucketState._pendingIterators >= _visitorOptions._maxParallelOneBucket)
+ if ((bucketState._pendingIterators
+ >= _visitorOptions._maxParallelOneBucket)
|| bucketState.hasPendingControlCommand())
{
++it;
@@ -1066,17 +1118,20 @@ Visitor::getIterators()
}
try{
completedBucket(bucketState.getBucketId(), *_hitCounter);
- _visitorStatistics.setBucketsVisited(_visitorStatistics.getBucketsVisited() + 1);
+ _visitorStatistics.setBucketsVisited(
+ _visitorStatistics.getBucketsVisited() + 1);
} catch (std::exception& e) {
std::ostringstream ost;
- ost << "Visitor fail to run completedBucket() notification: " << e.what();
+ ost << "Visitor fail to run completedBucket() notification: "
+ << e.what();
reportProblem(ost.str());
}
delete *it;
it = _bucketStates.erase(it);
continue;
}
- auto cmd = std::make_shared<GetIterCommand>(bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
+ auto cmd = std::make_shared<GetIterCommand>(
+ bucketState.getBucket(), bucketState.getIteratorId(), _docBlockSize);
cmd->getTrace().setLevel(_traceLevel);
cmd->setPriority(_priority);
_messageHandler->send(cmd, *this);
@@ -1088,7 +1143,7 @@ Visitor::getIterators()
}
// If there aren't anymore buckets to iterate, we're done
- if (_bucketStates.empty() && _currentBucket >= _buckets.size()) {
+ if (_bucketStates.size() == 0 && _currentBucket >= _buckets.size()) {
LOG(debug, "No more buckets to visit for visitor '%s'.", _id.c_str());
return false;
}
@@ -1102,13 +1157,17 @@ Visitor::getIterators()
_currentBucket < _buckets.size())
{
document::Bucket bucket(_bucketSpace, _buckets[_currentBucket]);
- auto newBucketState = std::make_unique<BucketIterationState>(*this, *_messageHandler, bucket);
+ std::unique_ptr<BucketIterationState> newBucketState(
+ new BucketIterationState(*this, *_messageHandler, bucket));
LOG(debug, "Visitor '%s': Sending create iterator for bucket %s.",
_id.c_str(), bucket.getBucketId().toString().c_str());
- spi::Selection selection = spi::Selection(spi::DocumentSelection(_documentSelectionString));
- selection.setFromTimestamp(spi::Timestamp(_visitorOptions._fromTime.getTime()));
- selection.setToTimestamp(spi::Timestamp(_visitorOptions._toTime.getTime()));
+ spi::Selection selection
+ = spi::Selection(spi::DocumentSelection(_documentSelectionString));
+ selection.setFromTimestamp(
+ spi::Timestamp(_visitorOptions._fromTime.getTime()));
+ selection.setToTimestamp(
+ spi::Timestamp(_visitorOptions._toTime.getTime()));
auto cmd = std::make_shared<CreateIteratorCommand>(bucket, selection,_visitorOptions._fieldSet,
_visitorOptions._visitRemoves
@@ -1125,7 +1184,8 @@ Visitor::getIterators()
}
if (sentCount == 0) {
if (LOG_WOULD_LOG(debug)) {
- LOG(debug, "Enough iterators being processed. Doing nothing for visitor '%s' bucketStates = %zu.",
+ LOG(debug, "Enough iterators being processed. Doing nothing for "
+ "visitor '%s' bucketStates = %zu.",
_id.c_str(), _bucketStates.size());
for (const auto& state : _bucketStates) {
LOG(debug, "Existing: %s", state->toString().c_str());
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 9b6d8e348b9..0737c5612c0 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -136,24 +136,28 @@ private:
{}
/** Sends DestroyIterator over _messageHandler if _iteratorId != 0 */
- ~BucketIterationState() override;
+ ~BucketIterationState();
void setCompleted(bool completed = true) { _completed = completed; }
- [[nodiscard]] bool isCompleted() const { return _completed; }
+ bool isCompleted() const { return _completed; }
- [[nodiscard]] document::Bucket getBucket() const { return _bucket; }
- [[nodiscard]] document::BucketId getBucketId() const { return _bucket.getBucketId(); }
+ document::Bucket getBucket() const { return _bucket; }
+ document::BucketId getBucketId() const { return _bucket.getBucketId(); }
void setIteratorId(spi::IteratorId iteratorId) {
_iteratorId = iteratorId;
}
- [[nodiscard]] spi::IteratorId getIteratorId() const { return _iteratorId; }
+ spi::IteratorId getIteratorId() const { return _iteratorId; }
- [[nodiscard]] bool hasPendingControlCommand() const {
+ void setPendingControlCommand() {
+ _iteratorId = spi::IteratorId(0);
+ }
+
+ bool hasPendingControlCommand() const {
return _iteratorId == spi::IteratorId(0);
}
- [[nodiscard]] bool hasPendingIterators() const { return _pendingIterators > 0; }
+ bool hasPendingIterators() const { return _pendingIterators > 0; }
void print(std::ostream& out, bool, const std::string& ) const override {
out << "BucketIterationState("
@@ -243,10 +247,12 @@ private:
MessageMeta releaseMetaForMessageId(uint64_t msgId);
void reinsertMeta(MessageMeta);
- [[nodiscard]] bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
+ bool hasQueuedMessages() const { return !_queuedMessages.empty(); }
void discardQueuedMessages();
- [[nodiscard]] uint32_t getMemoryUsage() const noexcept { return _memoryUsage; }
+ uint32_t getMemoryUsage() const noexcept {
+ return _memoryUsage;
+ }
VisitorTarget();
~VisitorTarget();
@@ -320,9 +326,9 @@ protected:
std::string _documentSelectionString;
vdslib::VisitorStatistics _visitorStatistics;
- [[nodiscard]] bool isCompletedCalled() const { return _calledCompletedVisitor; }
+ bool isCompletedCalled() const { return _calledCompletedVisitor; }
- [[nodiscard]] uint32_t traceLevel() const noexcept { return _traceLevel; }
+ uint32_t traceLevel() const noexcept { return _traceLevel; }
/**
* Attempts to add the given trace message to the internal, memory bounded
@@ -333,7 +339,7 @@ protected:
*/
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
- [[nodiscard]] const vdslib::Parameters& visitor_parameters() const noexcept;
+ const vdslib::Parameters& visitor_parameters() const noexcept;
// Possibly modifies the ReturnCode parameter in-place if its return code should
// be changed based on visitor subclass-specific behavior.
@@ -411,7 +417,7 @@ public:
* The consistency level provided here is propagated through the SPI
* Context object for createIterator calls.
*/
- [[nodiscard]] virtual spi::ReadConsistency getRequiredReadConsistency() const {
+ virtual spi::ReadConsistency getRequiredReadConsistency() const {
return spi::ReadConsistency::STRONG;
}
@@ -422,7 +428,8 @@ public:
/**
* Used to silence transient errors that can happen during normal operation.
*/
- [[nodiscard]] static bool shouldReportProblemToClient(const api::ReturnCode&, size_t retryCount) ;
+ bool shouldReportProblemToClient(const api::ReturnCode&,
+ size_t retryCount) const;
/** Called to send report to client of potential non-critical problems. */
void reportProblem(const std::string& problem);
@@ -485,16 +492,18 @@ public:
void getStatus(std::ostream& out, bool verbose) const;
- void setMaxParallel(uint32_t maxParallel) { _visitorOptions._maxParallel = maxParallel; }
- void setMaxParallelPerBucket(uint32_t max) { _visitorOptions._maxParallelOneBucket = max; }
+ void setMaxParallel(uint32_t maxParallel)
+ { _visitorOptions._maxParallel = maxParallel; }
+ void setMaxParallelPerBucket(uint32_t max)
+ { _visitorOptions._maxParallelOneBucket = max; }
/**
* Sends a message to the data handler for this visitor.
*/
void sendMessage(std::unique_ptr<documentapi::DocumentMessage> documentMessage);
- [[nodiscard]] bool isRunning() const { return _state == STATE_RUNNING; }
- [[nodiscard]] bool isCompleted() const { return _state == STATE_COMPLETED; }
+ bool isRunning() const { return _state == STATE_RUNNING; }
+ bool isCompleted() const { return _state == STATE_COMPLETED; }
private:
/**
@@ -533,9 +542,11 @@ private:
void sendReplyOnce();
- [[nodiscard]] bool hasFailedVisiting() const { return _result.failed(); }
- [[nodiscard]] bool hasPendingIterators() const { return !_bucketStates.empty(); }
- [[nodiscard]] bool mayTransitionToCompleted() const;
+ bool hasFailedVisiting() const { return _result.failed(); }
+
+ bool hasPendingIterators() const { return !_bucketStates.empty(); }
+
+ bool mayTransitionToCompleted() const;
void discardAllNoPendingBucketStates();
@@ -554,7 +565,9 @@ private:
*
* Precondition: attach() must have been called on `this`.
*/
- [[nodiscard]] bool shouldAddMbusTrace() const noexcept { return _traceLevel != 0; }
+ bool shouldAddMbusTrace() const noexcept {
+ return _traceLevel != 0;
+ }
/**
* Set internal state to the given state value.
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 07938002746..a03b9a9a8a3 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -187,8 +187,9 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
for (int32_t i=0; i<config->visitorthreads; ++i) {
_visitorThread.emplace_back(
// Naked new due to a lot of private inheritance in VisitorThread and VisitorManager
- std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory,
- _visitorFactories, *_metrics->threads[i], *this)),
+ std::shared_ptr<VisitorThread>(
+ new VisitorThread(i, _componentRegister, _messageSessionFactory,
+ _visitorFactories, *_metrics->threads[i], *this)),
std::map<api::VisitorId, std::string>());
}
}
@@ -449,7 +450,8 @@ VisitorManager::processReply(const std::shared_ptr<api::StorageReply>& reply)
}
void
-VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor)
+VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
+ Visitor& visitor)
{
assert(cmd->getType() == api::MessageType::INTERNAL);
// Only add to internal state if not destroy iterator command, as
@@ -458,7 +460,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& v
if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) {
MessageInfo inf;
inf.id = visitor.getVisitorId();
- inf.timestamp = _component.getClock().getSystemTime();
+ inf.timestamp = _component.getClock().getTimeInSeconds().getTime();
inf.timeout = cmd->getTimeout();
if (cmd->getAddress()) {
@@ -621,7 +623,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
out << "<tr>"
<< "<td>" << entry.first << "</td>"
<< "<td>" << entry.second.id << "</td>"
- << "<td>" << vespalib::to_string(entry.second.timestamp) << "</td>"
+ << "<td>" << entry.second.timestamp << "</td>"
<< "<td>" << vespalib::count_ms(entry.second.timeout) << "</td>"
<< "<td>" << xml_content_escaped(entry.second.destination) << "</td>"
<< "</tr>\n";
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 3e331e1c9a2..33703b392bc 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -57,7 +57,7 @@ private:
struct MessageInfo {
api::VisitorId id;
- vespalib::system_time timestamp;
+ time_t timestamp;
vespalib::duration timeout;
std::string destination;
};
@@ -168,7 +168,9 @@ private:
* by the formula: fixed + variable * ((255 - priority) / 255)
*/
uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const {
- return _maxFixedConcurrentVisitors + static_cast<uint32_t>(_maxVariableConcurrentVisitors * ((255.0 - cmd.getPriority()) / 255.0));
+ return _maxFixedConcurrentVisitors + static_cast<uint32_t>(
+ _maxVariableConcurrentVisitors
+ * ((255.0 - cmd.getPriority()) / 255.0));
}
void updateMetrics(const MetricLockGuard &) override;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index e3ebef3a3ef..55ef83ba658 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -126,10 +126,10 @@ VisitorThread::shutdown()
if (event._message.get()) {
if (!event._message->getType().isReply()
&& (event._message->getType() != api::MessageType::INTERNAL
- || dynamic_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
+ || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
- dynamic_cast<api::StorageCommand&>(*event._message).makeReply());
+ static_cast<api::StorageCommand&>(*event._message).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
_messageSender.send(reply);
}
@@ -197,7 +197,7 @@ VisitorThread::run(framework::ThreadHandle& thread)
// disappear when no visiting is done)
if (entry._message.get() &&
(entry._message->getType() != api::MessageType::INTERNAL
- || dynamic_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
+ || static_cast<api::InternalCommand&>(*entry._message).getType() != PropagateVisitorConfig::ID))
{
entry._timer.stop(_metrics.averageQueueWaitingTime);
}
@@ -290,7 +290,7 @@ VisitorThread::close()
} else {
_metrics.completedVisitors.inc(1);
}
- vespalib::steady_time currentTime(_component.getClock().getMonotonicTime());
+ framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
trimRecentlyCompletedList(currentTime);
_recentlyCompleted.emplace_back(_currentlyRunningVisitor->first, currentTime);
_visitors.erase(_currentlyRunningVisitor);
@@ -298,9 +298,9 @@ VisitorThread::close()
}
void
-VisitorThread::trimRecentlyCompletedList(vespalib::steady_time currentTime)
+VisitorThread::trimRecentlyCompletedList(framework::SecondTime currentTime)
{
- vespalib::steady_time recentLimit(currentTime - 30s);
+ framework::SecondTime recentLimit(currentTime - framework::SecondTime(30));
// Dump all elements that aren't recent anymore
while (!_recentlyCompleted.empty()
&& _recentlyCompleted.front().second < recentLimit)
@@ -313,7 +313,8 @@ void
VisitorThread::handleNonExistingVisitorCall(const Event& entry, ReturnCode& code)
{
// Get current time. Set the time that is the oldest still recent.
- trimRecentlyCompletedList(_component.getClock().getMonotonicTime());
+ framework::SecondTime currentTime(_component.getClock().getTimeInSeconds());
+ trimRecentlyCompletedList(currentTime);
// Go through all recent visitors. Ignore request if recent
for (const auto& e : _recentlyCompleted) {
@@ -343,7 +344,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
auto it = _visitorFactories.find(str);
if (it == _visitorFactories.end()) {
error << "Visitor library " << str << " not found.";
- return {};
+ return std::shared_ptr<Visitor>();
}
auto libIter = _libs.find(str);
@@ -362,7 +363,7 @@ VisitorThread::createVisitor(vespalib::stringref libName,
} catch (std::exception& e) {
error << "Failed to create visitor instance of type " << libName
<< ": " << e.what();
- return {};
+ return std::shared_ptr<Visitor>();
}
}
@@ -689,7 +690,7 @@ VisitorThread::getStatus(vespalib::asciistream& out,
}
for (const auto& cv : _recentlyCompleted) {
out << "<li> Visitor " << cv.first << " done at "
- << vespalib::to_string(vespalib::to_utc(cv.second)) << "\n";
+ << cv.second.getTime() << "\n";
}
out << "</ul>\n";
out << "<h3>Current queue size: " << _queue.size() << "</h3>\n";
@@ -735,10 +736,12 @@ VisitorThread::getStatus(vespalib::asciistream& out,
if (_visitors.empty()) {
out << "None\n";
}
- for (const auto & v : _visitors) {
- out << "<a href=\"?visitor=" << v.first
+ for (VisitorMap::const_iterator it = _visitors.begin();
+ it != _visitors.end(); ++it)
+ {
+ out << "<a href=\"?visitor=" << it->first
<< (verbose ? "&verbose" : "") << "\">Visitor "
- << v.first << "</a><br>\n";
+ << it->first << "</a><br>\n";
}
}
}
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 56e40328fda..226e7c0631b 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -38,7 +38,7 @@ class VisitorThread : public framework::Runnable,
using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>;
VisitorMap _visitors;
- std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted;
+ std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted;
struct Event {
enum class Type {
@@ -118,7 +118,7 @@ private:
*/
Event popNextQueuedEventIfAvailable();
void tick();
- void trimRecentlyCompletedList(vespalib::steady_time currentTime);
+ void trimRecentlyCompletedList(framework::SecondTime currentTime);
void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code);
std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName,
diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h
index d234f432f2b..0ca28f8114d 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h
@@ -67,13 +67,9 @@ public:
framework::SecondTime getTimeInSeconds() const override {
return getTimeInMicros().getSeconds();
}
- vespalib::system_time getSystemTime() const override {
+ framework::MonotonicTimePoint getMonotonicTime() const override {
// For simplicity, assume fake monotonic time follows fake wall clock.
- return vespalib::system_time(std::chrono::microseconds(getTimeInMicros().getTime()));
- }
- vespalib::steady_time getMonotonicTime() const override {
- // For simplicity, assume fake monotonic time follows fake wall clock.
- return vespalib::steady_time(std::chrono::microseconds(getTimeInMicros().getTime()));
+ return MonotonicTimePoint(std::chrono::microseconds(getTimeInMicros().getTime()));
}
};
diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp
index df6115aa416..0303481feb5 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp
+++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp
@@ -5,36 +5,27 @@
namespace storage::framework::defaultimplementation {
-MicroSecTime
-RealClock::getTimeInMicros() const {
+MicroSecTime RealClock::getTimeInMicros() const {
struct timeval mytime;
gettimeofday(&mytime, 0);
return MicroSecTime(mytime.tv_sec * 1000000llu + mytime.tv_usec);
}
-MilliSecTime
-RealClock::getTimeInMillis() const {
+MilliSecTime RealClock::getTimeInMillis() const {
struct timeval mytime;
gettimeofday(&mytime, 0);
return MilliSecTime(
mytime.tv_sec * 1000llu + mytime.tv_usec / 1000);
}
-SecondTime
-RealClock::getTimeInSeconds() const {
+SecondTime RealClock::getTimeInSeconds() const {
struct timeval mytime;
gettimeofday(&mytime, 0);
return SecondTime(mytime.tv_sec);
}
-vespalib::steady_time
-RealClock::getMonotonicTime() const {
- return vespalib::steady_clock::now();
-}
-
-vespalib::system_time
-RealClock::getSystemTime() const {
- return vespalib::system_clock::now();
+MonotonicTimePoint RealClock::getMonotonicTime() const {
+ return std::chrono::steady_clock::now();
}
}
diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
index de176a3e402..a4b80a990c9 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
@@ -17,8 +17,7 @@ struct RealClock : public Clock {
MicroSecTime getTimeInMicros() const override;
MilliSecTime getTimeInMillis() const override;
SecondTime getTimeInSeconds() const override;
- vespalib::steady_time getMonotonicTime() const override;
- vespalib::system_time getSystemTime() const override;
+ MonotonicTimePoint getMonotonicTime() const override;
};
}
diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
index d228dace1ed..bd4afa6c9ad 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
@@ -21,14 +21,14 @@ namespace storage::framework::defaultimplementation {
class TestComponentRegister {
ComponentRegisterImpl::UP _compReg;
- FakeClock _clock;
- ThreadPoolImpl _threadPool;
+ FakeClock _clock;
+ ThreadPoolImpl _threadPool;
public:
- explicit TestComponentRegister(ComponentRegisterImpl::UP compReg);
- virtual ~TestComponentRegister();
+ TestComponentRegister(ComponentRegisterImpl::UP compReg);
+ ~TestComponentRegister();
- virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; }
+ ComponentRegisterImpl& getComponentRegister() { return *_compReg; }
FakeClock& getClock() { return _clock; }
ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; }
FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); }
diff --git a/storage/src/vespa/storageframework/generic/clock/clock.h b/storage/src/vespa/storageframework/generic/clock/clock.h
index e1f8419f069..c9b8f652bfe 100644
--- a/storage/src/vespa/storageframework/generic/clock/clock.h
+++ b/storage/src/vespa/storageframework/generic/clock/clock.h
@@ -28,8 +28,7 @@ struct Clock {
virtual SecondTime getTimeInSeconds() const = 0;
// Time point resolution is intentionally not defined here.
- virtual vespalib::steady_time getMonotonicTime() const = 0;
- virtual vespalib::system_time getSystemTime() const = 0;
+ virtual MonotonicTimePoint getMonotonicTime() const = 0;
};
}
diff --git a/storage/src/vespa/storageframework/generic/clock/time.h b/storage/src/vespa/storageframework/generic/clock/time.h
index 882ff58fb74..372110a1374 100644
--- a/storage/src/vespa/storageframework/generic/clock/time.h
+++ b/storage/src/vespa/storageframework/generic/clock/time.h
@@ -10,8 +10,8 @@ namespace vespalib {
namespace storage::framework {
-using MonotonicTimePoint = vespalib::steady_time;
-using MonotonicDuration = vespalib::duration;
+using MonotonicTimePoint = std::chrono::steady_clock::time_point;
+using MonotonicDuration = std::chrono::steady_clock::duration;
struct Clock;
@@ -111,6 +111,9 @@ struct SecondTime : public Time<SecondTime, 1000000> {
explicit SecondTime(uint64_t t = 0) : Time<SecondTime, 1000000>(t) {}
explicit SecondTime(const Clock& clock)
: Time<SecondTime, 1000000>(getRawMicroTime(clock) / 1000000) {}
+
+ [[nodiscard]] MilliSecTime getMillis() const;
+ [[nodiscard]] MicroSecTime getMicros() const;
};
/**
@@ -128,6 +131,7 @@ struct MilliSecTime : public Time<MilliSecTime, 1000> {
explicit MilliSecTime(const Clock& clock)
: Time<MilliSecTime, 1000>(getRawMicroTime(clock) / 1000) {}
+ [[nodiscard]] SecondTime getSeconds() const { return SecondTime(getTime() / 1000); }
[[nodiscard]] MicroSecTime getMicros() const;
};
@@ -150,6 +154,14 @@ struct MicroSecTime : public Time<MicroSecTime, 1> {
[[nodiscard]] SecondTime getSeconds() const { return SecondTime(getTime() / 1000000); }
};
+inline MilliSecTime SecondTime::getMillis() const {
+ return MilliSecTime(getTime() * 1000);
+}
+
+inline MicroSecTime SecondTime::getMicros() const {
+ return MicroSecTime(getTime() * 1000 * 1000);
+}
+
inline MicroSecTime MilliSecTime::getMicros() const {
return MicroSecTime(getTime() * 1000);
}
@@ -168,6 +180,13 @@ operator + (MilliSecTime a, MilliSecTime b) {
return result;
}
+inline SecondTime
+operator + (SecondTime a, SecondTime b) {
+ SecondTime result(a);
+ result += b;
+ return result;
+}
+
inline MicroSecTime
operator - (MicroSecTime a, MicroSecTime b) {
MicroSecTime result(a);
@@ -175,4 +194,11 @@ operator - (MicroSecTime a, MicroSecTime b) {
return result;
}
+inline SecondTime
+operator - (SecondTime a, SecondTime b) {
+ SecondTime result(a);
+ result -= b;
+ return result;
+}
+
}
diff --git a/vdslib/src/tests/state/nodestatetest.cpp b/vdslib/src/tests/state/nodestatetest.cpp
index c854adf3915..000542e77fe 100644
--- a/vdslib/src/tests/state/nodestatetest.cpp
+++ b/vdslib/src/tests/state/nodestatetest.cpp
@@ -20,7 +20,7 @@ TEST(NodeStateTest, test_parsing)
{
NodeState ns = NodeState("t:4");
EXPECT_EQ(std::string("s:u t:4"), ns.toString());
- EXPECT_EQ(4s, ns.getStartTimestamp().time_since_epoch());
+ EXPECT_EQ(uint64_t(4), ns.getStartTimestamp());
}
{
NodeState ns = NodeState("s:u c:2.4 b:12");
diff --git a/vdslib/src/vespa/vdslib/state/nodestate.cpp b/vdslib/src/vespa/vdslib/state/nodestate.cpp
index 0dd7f5abb4c..a7c5476456a 100644
--- a/vdslib/src/vespa/vdslib/state/nodestate.cpp
+++ b/vdslib/src/vespa/vdslib/state/nodestate.cpp
@@ -8,8 +8,9 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <sstream>
-
+#include <cmath>
#include <vespa/log/log.h>
+
LOG_SETUP(".vdslib.nodestate");
namespace storage::lib {
@@ -18,16 +19,16 @@ NodeState::NodeState(const NodeState &) = default;
NodeState & NodeState::operator = (const NodeState &) = default;
NodeState::NodeState(NodeState &&) noexcept = default;
NodeState & NodeState::operator = (NodeState &&) noexcept = default;
-NodeState::~NodeState() = default;
+NodeState::~NodeState() { }
NodeState::NodeState()
- : _type(nullptr),
- _state(nullptr),
+ : _type(0),
+ _state(0),
_description(""),
_capacity(1.0),
_initProgress(0.0),
_minUsedBits(16),
- _startTimestamp()
+ _startTimestamp(0)
{
setState(State::UP);
}
@@ -35,12 +36,12 @@ NodeState::NodeState()
NodeState::NodeState(const NodeType& type, const State& state,
vespalib::stringref description, double capacity)
: _type(&type),
- _state(nullptr),
+ _state(0),
_description(description),
_capacity(1.0),
_initProgress(0.0),
_minUsedBits(16),
- _startTimestamp()
+ _startTimestamp(0)
{
setState(state);
if (type == NodeType::STORAGE) {
@@ -55,24 +56,25 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type)
_capacity(1.0),
_initProgress(0.0),
_minUsedBits(16),
- _startTimestamp()
+ _startTimestamp(0)
{
vespalib::StringTokenizer st(serialized, " \t\f\r\n");
st.removeEmptyTokens();
- for (auto it : st)
+ for (vespalib::StringTokenizer::Iterator it = st.begin();
+ it != st.end(); ++it)
{
- std::string::size_type index = it.find(':');
+ std::string::size_type index = it->find(':');
if (index == std::string::npos) {
throw vespalib::IllegalArgumentException(
- "Token " + it + " does not contain ':': " + serialized,
+ "Token " + *it + " does not contain ':': " + serialized,
VESPA_STRLOC);
}
- std::string key = it.substr(0, index);
- std::string value = it.substr(index + 1);
- if (!key.empty()) switch (key[0]) {
+ std::string key = it->substr(0, index);
+ std::string value = it->substr(index + 1);
+ if (key.size() > 0) switch (key[0]) {
case 'b':
- if (_type != nullptr && *type != NodeType::STORAGE) break;
+ if (_type != 0 && *type != NodeType::STORAGE) break;
if (key.size() > 1) break;
try{
setMinUsedBits(boost::lexical_cast<uint32_t>(value));
@@ -89,7 +91,7 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type)
continue;
case 'c':
if (key.size() > 1) break;
- if (_type != nullptr && *type != NodeType::STORAGE) break;
+ if (_type != 0 && *type != NodeType::STORAGE) break;
try{
setCapacity(boost::lexical_cast<double>(value));
} catch (...) {
@@ -113,7 +115,7 @@ NodeState::NodeState(vespalib::stringref serialized, const NodeType* type)
case 't':
if (key.size() > 1) break;
try{
- setStartTimestamp(vespalib::system_time(std::chrono::seconds(boost::lexical_cast<uint64_t>(value))));
+ setStartTimestamp(boost::lexical_cast<uint64_t>(value));
} catch (...) {
throw vespalib::IllegalArgumentException(
"Illegal start timestamp '" + value + "'. Start "
@@ -163,7 +165,7 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix,
SeparatorPrinter sep;
// Always give node state if not part of a system state
// to prevent empty serialization
- if (*_state != State::UP || prefix.empty()) {
+ if (*_state != State::UP || prefix.size() == 0) {
out << sep << prefix << "s:";
out << _state->serialize();
}
@@ -176,8 +178,8 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix,
if (*_state == State::INITIALIZING) {
out << sep << prefix << "i:" << _initProgress;
}
- if (_startTimestamp != vespalib::system_time()) {
- out << sep << prefix << "t:" << vespalib::count_s(_startTimestamp.time_since_epoch());
+ if (_startTimestamp != 0) {
+ out << sep << prefix << "t:" << _startTimestamp;
}
if (includeDescription && ! _description.empty()) {
out << sep << prefix << "m:"
@@ -188,7 +190,7 @@ NodeState::serialize(vespalib::asciistream & out, vespalib::stringref prefix,
void
NodeState::setState(const State& state)
{
- if (_type != nullptr) {
+ if (_type != 0) {
// We don't know whether you want to store reported, wanted or
// current node state, so we must accept any.
if (!state.validReportedNodeState(*_type)
@@ -223,7 +225,7 @@ NodeState::setCapacity(vespalib::Double capacity)
"must be a positive floating point number";
throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC);
}
- if (_type != nullptr && *_type != NodeType::STORAGE) {
+ if (_type != 0 && *_type != NodeType::STORAGE) {
throw vespalib::IllegalArgumentException(
"Capacity only make sense for storage nodes.", VESPA_STRLOC);
}
@@ -243,7 +245,7 @@ NodeState::setInitProgress(vespalib::Double initProgress)
}
void
-NodeState::setStartTimestamp(vespalib::system_time startTimestamp)
+NodeState::setStartTimestamp(uint64_t startTimestamp)
{
_startTimestamp = startTimestamp;
}
@@ -268,10 +270,10 @@ NodeState::print(std::ostream& out, bool verbose,
if (*_state == State::INITIALIZING) {
out << ", init progress " << _initProgress;
}
- if (_startTimestamp != vespalib::system_time()) {
- out << ", start timestamp " << vespalib::to_string(_startTimestamp);
+ if (_startTimestamp != 0) {
+ out << ", start timestamp " << _startTimestamp;
}
- if (!_description.empty()) {
+ if (_description.size() > 0) {
out << ": " << _description;
}
}
@@ -315,7 +317,7 @@ NodeState::similarTo(const NodeState& other) const
void
NodeState::verifySupportForNodeType(const NodeType& type) const
{
- if (_type != nullptr && *_type == type) return;
+ if (_type != 0 && *_type == type) return;
if (!_state->validReportedNodeState(type)
&& !_state->validWantedNodeState(type))
{
@@ -355,8 +357,8 @@ NodeState::getTextualDifference(const NodeState& other) const {
}
}
if (_startTimestamp != other._startTimestamp) {
- source << ", start timestamp " << vespalib::to_string(_startTimestamp);
- target << ", start timestamp " << vespalib::to_string(other._startTimestamp);
+ source << ", start timestamp " << _startTimestamp;
+ target << ", start timestamp " << other._startTimestamp;
}
if (source.str().length() < 2 || target.str().length() < 2) {
diff --git a/vdslib/src/vespa/vdslib/state/nodestate.h b/vdslib/src/vespa/vdslib/state/nodestate.h
index 4fb035b6dcd..541395e15cb 100644
--- a/vdslib/src/vespa/vdslib/state/nodestate.h
+++ b/vdslib/src/vespa/vdslib/state/nodestate.h
@@ -13,7 +13,6 @@
#include "state.h"
#include <vespa/document/util/printable.h>
#include <vespa/vespalib/objects/floatingpointtype.h>
-#include <vespa/vespalib/util/time.h>
#include <memory>
namespace storage::lib {
@@ -26,7 +25,7 @@ class NodeState : public document::Printable
vespalib::Double _capacity;
vespalib::Double _initProgress;
uint32_t _minUsedBits;
- vespalib::system_time _startTimestamp;
+ uint64_t _startTimestamp;
public:
using CSP = std::shared_ptr<const NodeState>;
@@ -44,8 +43,8 @@ public:
vespalib::stringref description = "",
double capacity = 1.0);
/** Set type if you want to verify that content fit with the given type. */
- explicit NodeState(vespalib::stringref serialized, const NodeType* nodeType = nullptr);
- ~NodeState() override;
+ NodeState(vespalib::stringref serialized, const NodeType* nodeType = 0);
+ ~NodeState();
/**
* Setting prefix to something implies using this function to write a
@@ -55,27 +54,26 @@ public:
void serialize(vespalib::asciistream & out, vespalib::stringref prefix = "",
bool includeDescription = true) const;
- [[nodiscard]] const State& getState() const { return *_state; }
- [[nodiscard]] vespalib::Double getCapacity() const { return _capacity; }
- [[nodiscard]] uint32_t getMinUsedBits() const { return _minUsedBits; }
- [[nodiscard]] vespalib::Double getInitProgress() const { return _initProgress; }
- [[nodiscard]] const vespalib::string& getDescription() const { return _description; }
- [[nodiscard]] vespalib::system_time getStartTimestamp() const { return _startTimestamp; }
+ const State& getState() const { return *_state; }
+ vespalib::Double getCapacity() const { return _capacity; }
+ uint32_t getMinUsedBits() const { return _minUsedBits; }
+ vespalib::Double getInitProgress() const { return _initProgress; }
+ const vespalib::string& getDescription() const { return _description; }
+ uint64_t getStartTimestamp() const { return _startTimestamp; }
void setState(const State& state);
void setCapacity(vespalib::Double capacity);
void setMinUsedBits(uint32_t usedBits);
void setInitProgress(vespalib::Double initProgress);
- void setStartTimestamp(vespalib::system_time startTimestamp);
+ void setStartTimestamp(uint64_t startTimestamp);
void setDescription(vespalib::stringref desc) { _description = desc; }
void print(std::ostream& out, bool verbose,
const std::string& indent) const override;
bool operator==(const NodeState& other) const;
- bool operator!=(const NodeState& other) const {
- return !(operator==(other));
- }
- [[nodiscard]] bool similarTo(const NodeState& other) const;
+ bool operator!=(const NodeState& other) const
+ { return !(operator==(other)); }
+ bool similarTo(const NodeState& other) const;
/**
* Verify that the contents of this object fits with the given nodetype.