aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-07 08:33:23 +0100
committerGitHub <noreply@github.com>2023-02-07 08:33:23 +0100
commit491eae8d54a7c0c1b98560f71e5fd5c7519c9512 (patch)
treeb73ce9a3daba86d37e3488ba9ebabaff5b651c95
parent3ed47fbc6265fd9b4f2c6b38f304252af414d00a (diff)
parent1a06962f48e3498ca9c8e5fe96b97b206de0064d (diff)
Merge pull request #25896 from vespa-engine/balder/use-vespalib-system_time-for-operation
Use espalib::system_time instead of framework::MilliSecTime for Opera… MERGEOK
-rw-r--r--storage/src/tests/distributor/bucketstateoperationtest.cpp8
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp24
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp3
-rw-r--r--storage/src/tests/distributor/joinbuckettest.cpp4
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp8
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp9
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp2
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp5
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp6
-rw-r--r--storage/src/tests/distributor/statoperationtest.cpp4
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp86
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp15
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp52
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h18
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.h21
19 files changed, 145 insertions, 155 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp
index 5d11f9653ea..42ee4675e26 100644
--- a/storage/src/tests/distributor/bucketstateoperationtest.cpp
+++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp
@@ -43,7 +43,7 @@ TEST_F(BucketStateOperationTest, activate_single_node) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(1, _sender.commands().size());
@@ -79,7 +79,7 @@ TEST_F(BucketStateOperationTest, activate_and_deactivate_nodes) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(1, _sender.commands().size());
{
@@ -135,7 +135,7 @@ TEST_F(BucketStateOperationTest, do_not_deactivate_if_activate_fails) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(1, _sender.commands().size());
{
@@ -178,7 +178,7 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) {
SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(1, _sender.commands().size());
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 1a104727f43..c2f4836f4cb 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -139,7 +139,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
TEST_F(GarbageCollectionOperationTest, simple_legacy) {
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_FALSE(op->is_two_phase());
ASSERT_EQ(2, _sender.commands().size());
@@ -158,7 +158,7 @@ TEST_F(GarbageCollectionOperationTest, simple_legacy) {
TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) {
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
EXPECT_EQ(0u, gc_removed_documents_metric());
@@ -175,7 +175,7 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
reply_to_nth_request(*op, 0, 1234, 0);
@@ -195,20 +195,20 @@ TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and
// Config enabled, but only 1 node says it supports two-phase RemoveLocation
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_FALSE(op->is_two_phase());
// Node 0 suddenly upgraded...!
set_node_supported_features(0, with_two_phase);
op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_TRUE(op->is_two_phase());
// But doesn't matter if two-phase GC is config-disabled
config_enable_two_phase_gc(false);
op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_FALSE(op->is_two_phase());
}
@@ -216,7 +216,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
enable_two_phase_gc();
auto op = create_op();
op->setPriority(getConfig().getMaintenancePriorities().garbageCollection);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
for (int i : {0, 1}) {
@@ -229,7 +229,7 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -256,7 +256,7 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_un
TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -272,7 +272,7 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res
TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
@@ -314,7 +314,7 @@ struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationT
enable_two_phase_gc();
_op = create_op();
- _op->start(_sender, framework::MilliSecTime(0));
+ _op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
_r1 = make_remove_location_reply(*_sender.command(0));
@@ -367,7 +367,7 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 6b27af63fb7..8d188f6c005 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -4,7 +4,6 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/config/documenttypes_config_fwd.h>
-#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
@@ -62,7 +61,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
msg, metrics().gets,
consistency);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
}
static constexpr uint32_t LastCommand = UINT32_MAX;
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp
index bc87893b610..570fe24679e 100644
--- a/storage/src/tests/distributor/joinbuckettest.cpp
+++ b/storage/src/tests/distributor/joinbuckettest.cpp
@@ -46,7 +46,7 @@ TEST_F(JoinOperationTest, simple) {
document::BucketId(33, 0x100000001)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}});
@@ -103,7 +103,7 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets
document::BucketId(33, 0x100000001)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}));
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}));
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 9e0c89819a7..512c092d8ae 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -71,7 +71,7 @@ MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes, Pr
auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->setPriority(merge_pri);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
return op;
}
@@ -291,7 +291,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
@@ -352,7 +352,7 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
std::string merge(
"MergeBucketCommand(BucketId(0x4000000000000001), to time "
@@ -501,7 +501,7 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
const uint16_t max_merge_size = 2;
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2, 3)), max_merge_size);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
// Must include missing node 0 and not just 2 existing replicas
EXPECT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 53773a55826..735666e5c89 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -76,7 +76,7 @@ public:
getDistributorBucketSpace(),
msg,
metrics().puts);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
}
const document::DocumentType& doc_type() const {
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index 971ff36c833..68d86884036 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "dummy_cluster_context.h"
-#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -37,9 +36,9 @@ TEST_F(RemoveBucketOperationTest, simple) {
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(1,2)));
+ toVector<uint16_t>(1,2)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ("Delete bucket => 1,"
@@ -71,7 +70,7 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) {
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
@@ -106,7 +105,7 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) {
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index 889b5c833af..b19a448199b 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -35,7 +35,7 @@ struct RemoveLocationOperationTest : Test, DistributorStripeTestUtil {
msg,
metrics().removelocations);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
}
};
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index b3104cae623..ed24b7271b8 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -1,6 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <iomanip>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -41,7 +40,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
msg,
metrics().removes);
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
}
void replyToMessage(RemoveOperation& callback,
@@ -55,7 +54,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<api::StorageMessage> msg2 = _sender.command(index);
auto* removec = dynamic_cast<api::RemoveCommand*>(msg2.get());
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
- auto* removeR = static_cast<api::RemoveReply*>(reply.get());
+ auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index 1e951029994..edb392d9532 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -65,7 +65,7 @@ TEST_F(SplitOperationTest, simple) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
{
ASSERT_EQ(1, _sender.commands().size());
@@ -134,7 +134,7 @@ TEST_F(SplitOperationTest, multi_node_failure) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
{
ASSERT_EQ(2, _sender.commands().size());
@@ -218,7 +218,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) {
splitByteSize);
op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(3, _sender.commands().size());
diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp
index e3323d601df..ec0165dde05 100644
--- a/storage/src/tests/distributor/statoperationtest.cpp
+++ b/storage/src/tests/distributor/statoperationtest.cpp
@@ -35,7 +35,7 @@ TEST_F(StatOperationTest, bucket_info) {
std::make_shared<api::StatBucketCommand>(
makeDocumentBucket(document::BucketId(16, 5)), ""));
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ("Statbucket => 0,Statbucket => 1", _sender.getCommands(true));
@@ -76,7 +76,7 @@ TEST_F(StatOperationTest, bucket_list) {
getIdealStateManager(),
node_context().node_index(),
msg);
- op.start(_sender, framework::MilliSecTime(0));
+ op.start(_sender);
ASSERT_EQ(1, _sender.replies().size());
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 6eb15bf05e7..579fd156962 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -154,7 +154,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
return cb;
}
@@ -342,7 +342,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
TEST_F(TwoPhaseUpdateOperationTest, simple) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -359,7 +359,7 @@ TEST_F(TwoPhaseUpdateOperationTest, simple) {
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
@@ -371,7 +371,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -386,7 +386,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -413,7 +413,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -434,7 +434,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -451,7 +451,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_err
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -473,7 +473,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -501,7 +501,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error)
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -526,7 +526,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_st
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0");
@@ -567,7 +567,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0", _sender.getCommands(true));
@@ -578,7 +578,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_upd
TEST_F(TwoPhaseUpdateOperationTest, n_of_m) {
setup_stripe(2, 2, "storage:2 distributor:1", 1);
auto cb = sendUpdate("0=1/2/3,1=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
@@ -606,7 +606,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document)
setup_stripe(3, 3, "storage:3 distributor:1");
// 0,1 in sync. 2 out of sync.
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2",
@@ -639,7 +639,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document)
TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false);
@@ -670,7 +670,7 @@ TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_a
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false);
@@ -697,7 +697,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
@@ -717,7 +717,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError());
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 50);
@@ -737,7 +737,7 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
@@ -766,7 +766,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 100);
@@ -786,7 +786,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_time
TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
checkMessageSettingsPropagatedTo(_sender.command(0));
@@ -805,7 +805,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings
TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) {
setup_stripe(3, 3, "storage:3 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
@@ -832,7 +832,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
setup_stripe(2, 2, "storage:2 distributor:1");
// Update towards inconsistent bucket invokes safe path.
auto cb = sendUpdate("0=1/2/3,1=2/3/4");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -875,7 +875,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
// Newest doc has headerval==110, not 120.
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
@@ -894,7 +894,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_up
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110"));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
@@ -904,7 +904,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco"));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
@@ -924,7 +924,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234"));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
replyToGet(*cb, _sender, 0, 100);
replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
@@ -943,7 +943,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
// Both Gets return nothing at all, nothing at all.
replyToGet(*cb, _sender, 0, 100, false);
replyToGet(*cb, _sender, 1, 110, false);
@@ -964,7 +964,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_aut
.condition("testdoctype1.headerval==120")
.createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
replyToGet(*cb, _sender, 0, 0, false);
replyToGet(*cb, _sender, 1, 0, false);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
@@ -999,7 +999,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
setup_stripe(1, 1, "storage:1 distributor:1");
// Only 1 replica; consistent with itself by definition.
auto cb = sendUpdate("0=1/2/3");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Update => 0", _sender.getCommands(true));
// Close the operation. This should generate a single reply that is
@@ -1017,7 +1017,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
setup_stripe(2, 2, "storage:2 distributor:1");
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Closing the operation should now only return an ABORTED reply for
@@ -1037,7 +1037,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1065,7 +1065,7 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1086,7 +1086,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
@@ -1112,7 +1112,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
replyToGet(*cb, _sender, 0, Timestamp(0), false);
@@ -1131,7 +1131,7 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replic
// No replicas, technically consistent but cannot use fast path.
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0",
_sender.getCommands(true));
}
@@ -1146,7 +1146,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
@@ -1156,7 +1156,7 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
TEST_F(TwoPhaseUpdateOperationTest, operation_is_rejected_in_safe_path_if_feed_is_blocked) {
set_up_distributor_with_feed_blocked_state();
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas to trigger safe path
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
@@ -1275,7 +1275,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
// Add new replica to deterministic test bucket after gets have been sent
BucketId bucket(0x400000000000cac4); // Always the same in the test.
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
@@ -1299,7 +1299,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_cannot_restart_in_fast_path) {
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
@@ -1360,7 +1360,7 @@ TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_ex
cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
configure_stripe(cfg);
auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
@@ -1419,7 +1419,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_is_no_op_without_auto_cre
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4");
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
@@ -1443,7 +1443,7 @@ TEST_F(ThreePhaseUpdateTest, single_full_get_tombstone_sends_puts_with_auto_crea
cfg->set_update_fast_path_restart_enabled(true);
configure_stripe(cfg);
auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
- cb->start(_sender, framework::MilliSecTime(0));
+ cb->start(_sender);
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index f0cb30368cb..d0ae31b9524 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -1,6 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/repo/documenttyperepo.h>
@@ -91,7 +90,7 @@ TEST_F(UpdateOperationTest, simple) {
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0", sender.getCommands(true));
@@ -110,7 +109,7 @@ TEST_F(UpdateOperationTest, not_found) {
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0", sender.getCommands(true));
@@ -125,7 +124,7 @@ TEST_F(UpdateOperationTest, multi_node) {
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -149,7 +148,7 @@ TEST_F(UpdateOperationTest, multi_node_inconsistent_timestamp) {
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -169,7 +168,7 @@ TEST_F(UpdateOperationTest, test_and_set_failures_increment_tas_metric) {
setup_stripe(2, 2, "distributor:1 storage:1");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3"));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0", sender.getCommands(true));
api::ReturnCode result(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "bork bork");
replyToMessage(*cb, sender, 0, 1234, api::BucketInfo(), result);
@@ -198,7 +197,7 @@ TEST_F(UpdateOperationTest, create_if_missing_update_sentinel_timestamp_is_treat
setup_stripe(2, 2, "distributor:1 storage:2");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3", true));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
@@ -220,7 +219,7 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
setup_stripe(2, 3, "distributor:1 storage:3");
std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3", true));
DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(sender);
ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
replyToMessage(*cb, sender, 0, 100); // Newly created
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index 6c597b620dd..ecfa7232def 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -41,7 +41,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil {
document::BucketId nullId;
VisitorOperation::Config defaultConfig;
- api::CreateVisitorCommand::SP
+ static api::CreateVisitorCommand::SP
createVisitorCommand(std::string instanceId,
document::BucketId superBucket,
document::BucketId lastBucket,
@@ -122,7 +122,7 @@ struct VisitorOperationTest : Test, DistributorStripeTestUtil {
*/
std::string runEmptyVisitor(api::CreateVisitorCommand::SP msg) {
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
return _sender.getLastReply();
}
@@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -228,7 +228,7 @@ TEST_F(VisitorOperationTest, shutdown) {
auto op = createOpWithDefaultConfig(std::move(msg));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -296,7 +296,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -346,7 +346,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) {
"dumpvisitor",
"true"));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)) << _sender.getLastReply();
sendReply(*op);
@@ -371,7 +371,7 @@ VisitorOperationTest::runVisitor(document::BucketId id,
"dumpvisitor",
"true"));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
sendReply(*op);
@@ -437,7 +437,7 @@ TEST_F(VisitorOperationTest, bucket_removed_while_visitor_pending) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("removefrombucketdb", id, nullId));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -459,7 +459,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
// Since visitRemoves is true, the empty bucket will be visited
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -474,7 +474,7 @@ TEST_F(VisitorOperationTest, resend_to_other_storage_node_on_failure) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("emptyinconsistent", id, nullId));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -502,7 +502,7 @@ TEST_F(VisitorOperationTest, timeout_only_after_reply_from_all_storage_nodes) {
nullId,
8));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
_sender.getCommands(true));
@@ -539,7 +539,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) {
8,
500ms)); // ms timeout
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
_sender.getCommands(true));
@@ -614,7 +614,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) {
"dumpvisitor",
"true"));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_EQ("Visitor Create => 0", _sender.getCommands(true));
}
@@ -640,7 +640,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) {
"dumpvisitor",
"true"));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
"ReturnCode(WRONG_DISTRIBUTION, distributor:1 storage:1)",
_sender.getLastReply());
@@ -661,7 +661,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) {
createVisitorCommand("multiplebuckets", id, nullId, 31),
VisitorOperation::Config(1, 4));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0,Visitor Create => 0,"
"Visitor Create => 0,Visitor Create => 0",
@@ -708,7 +708,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node) {
createVisitorCommand("multiplebuckets", id, document::BucketId(0x54000000000f0001), 31),
VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode));
- op2->start(_sender, framework::MilliSecTime(0));
+ op2->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -736,7 +736,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_resend_only_failing) {
createVisitorCommand("multiplebuckets", id, nullId, 31),
VisitorOperation::Config(minBucketsPerVisitor, maxVisitorsPerNode));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0,Visitor Create => 0,"
"Visitor Create => 0,Visitor Create => 0",
@@ -775,7 +775,7 @@ TEST_F(VisitorOperationTest, parallel_visitors_to_one_storage_node_one_super_buc
createVisitorCommand("multiplebucketsonesuper", id, nullId),
VisitorOperation::Config(5, 4));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -833,7 +833,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true),
VisitorOperation::Config(5, 4));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 1", _sender.getCommands(true));
@@ -858,7 +858,7 @@ TEST_F(VisitorOperationTest, visit_ideal_node) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinode", id, nullId, 8));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -889,7 +889,7 @@ TEST_F(VisitorOperationTest, no_resending_on_critical_failure) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinodefailurecritical", id, nullId, 8));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -913,7 +913,7 @@ TEST_F(VisitorOperationTest, failure_on_all_nodes) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("multinodefailurecritical", id, nullId, 8));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
@@ -992,7 +992,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent)
500ms,
inconsistent));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
assert(_sender.getCommands(true) == "Visitor Create => 0,Visitor Create => 1");
return op;
@@ -1043,7 +1043,7 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) {
auto op = createOpWithDefaultConfig(
createVisitorCommand("foo", id, nullId, 8, 10000ms));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
@@ -1061,7 +1061,7 @@ VisitorOperationTest::do_visitor_roundtrip_with_statistics(
auto op = createOpWithDefaultConfig(
createVisitorCommand("metricstats", id, nullId));
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
auto reply = cmd.makeReply();
@@ -1109,7 +1109,7 @@ TEST_F(VisitorOperationTest, assigning_put_lock_access_token_sets_special_visito
auto op = createOpWithDefaultConfig(createVisitorCommand("metricstats", id, nullId));
op->assign_put_lock_access_token("its-a me, mario");
- op->start(_sender, framework::MilliSecTime(0));
+ op->start(_sender);
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto cmd = std::dynamic_pointer_cast<api::CreateVisitorCommand>(_sender.command(0));
ASSERT_TRUE(cmd);
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index 81512393c5b..7b7c9f431f7 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -31,7 +31,7 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
std::shared_ptr<Operation> cb = _sentMessageMap.pop(reply->getMsgId());
- if (cb.get() != 0) {
+ if (cb) {
Sender sender(*this, _sender, cb);
cb->receive(sender, reply);
return true;
@@ -41,13 +41,11 @@ OperationOwner::handleReply(const std::shared_ptr<api::StorageReply>& reply)
}
bool
-OperationOwner::start(const std::shared_ptr<Operation>& operation,
- Priority priority)
+OperationOwner::start(const std::shared_ptr<Operation>& operation, Priority)
{
- (void) priority;
LOG(spam, "Starting operation %s", operation->toString().c_str());
Sender sender(*this, _sender, operation);
- operation->start(sender, _clock.getTimeInMillis());
+ operation->start(sender, _clock.getSystemTime());
return true;
}
@@ -63,7 +61,7 @@ OperationOwner::onClose()
while (true) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
- if (cb.get()) {
+ if (cb) {
Sender sender(*this, _sender, std::shared_ptr<Operation>());
cb->onClose(sender);
} else {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 2acd6068e1a..55fe2e039e1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -203,7 +203,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorStripeMessageSender& sen
(_node_ctx, _op_ctx, _bucketSpace, _updateCmd, std::move(entries), _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
- op.start(intermediate, _node_ctx.clock().getTimeInMillis());
+ op.start(intermediate, _node_ctx.clock().getSystemTime());
transitionTo(SendState::UPDATES_SENT);
if (intermediate._reply.get()) {
@@ -223,7 +223,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorStripeMessageSender& sen
GetOperation& op = *get_operation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender);
_replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
- op.start(intermediate, _node_ctx.clock().getTimeInMillis());
+ op.start(intermediate, _node_ctx.clock().getSystemTime());
transitionTo(_use_initial_cheap_metadata_fetch_phase
? SendState::METADATA_GETS_SENT
@@ -322,7 +322,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric);
PutOperation & op = *putOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender);
- op.start(intermediate, _node_ctx.clock().getTimeInMillis());
+ op.start(intermediate, _node_ctx.clock().getSystemTime());
transitionTo(SendState::PUTS_SENT);
LOG(debug, "Update(%s): sending Puts at timestamp %" PRIu64, update_doc_id().c_str(), putTimestamp);
@@ -601,8 +601,7 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
_bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries);
std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now;
- for (uint32_t j = 0; j < entries.size(); ++j) {
- const auto& e = entries[j];
+ for (const auto & e : entries) {
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const auto& copy = e->getNodeRef(i);
replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode());
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index a48fb53a7ce..4d82de170ae 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -12,7 +12,7 @@ LOG_SETUP(".distributor.callback");
namespace storage::distributor {
Operation::Operation()
- : _startTime(0)
+ : _startTime()
{
}
@@ -21,19 +21,23 @@ Operation::~Operation() = default;
std::string
Operation::getStatus() const
{
- return vespalib::make_string("%s (started %s)",
- getName(), _startTime.toString().c_str());
+ return vespalib::make_string("%s (started %s)", getName(), vespalib::to_string(_startTime).c_str());
}
void
-Operation::start(DistributorStripeMessageSender& sender,
- framework::MilliSecTime startTime)
+Operation::start(DistributorStripeMessageSender& sender, vespalib::system_time startTime)
{
_startTime = startTime;
onStart(sender);
}
void
+Operation::start(DistributorStripeMessageSender& sender)
+{
+ start(sender, vespalib::system_time());
+}
+
+void
Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCommand& target)
{
target.getTrace().setLevel(source.getTrace().getLevel());
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index e24aa976221..8bb81b8d365 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -45,24 +45,25 @@ public:
onReceive(sender, msg);
}
- virtual const char* getName() const noexcept = 0;
+ [[nodiscard]] virtual const char* getName() const noexcept = 0;
- virtual std::string getStatus() const;
+ [[nodiscard]] virtual std::string getStatus() const;
- virtual std::string toString() const {
+ [[nodiscard]] virtual std::string toString() const {
return std::string(getName());
}
/**
Starts the callback, sending any messages etc. Sets _startTime to current time
*/
- virtual void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime);
+ virtual void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime);
+ void start(DistributorStripeMessageSender& sender);
/**
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
- virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
+ [[nodiscard]] virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
return false;
}
@@ -77,11 +78,6 @@ public:
virtual void on_throttled();
/**
- Returns the timestamp on which the first message was sent from this callback.
- */
- framework::MilliSecTime getStartTime() const { return _startTime; }
-
- /**
Transfers message settings such as priority, timeout, etc. from one message to another.
*/
static void copyMessageSettings(const api::StorageCommand& source,
@@ -97,7 +93,7 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
- framework::MilliSecTime _startTime;
+ vespalib::system_time _startTime;
};
}
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
index a0613c60fa4..8b6ade7e7d1 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
@@ -13,9 +13,9 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
class ThrottlingOperation : public Operation
{
public:
- ThrottlingOperation(const Operation::SP& operation,
+ ThrottlingOperation(Operation::SP operation,
ThrottlingOperationStarter& operationStarter)
- : _operation(operation),
+ : _operation(std::move(operation)),
_operationStarter(operationStarter)
{}
@@ -30,24 +30,21 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
void onClose(DistributorStripeMessageSender& sender) override {
_operation->onClose(sender);
}
- const char* getName() const noexcept override {
+ [[nodiscard]] const char* getName() const noexcept override {
return _operation->getName();
}
- std::string getStatus() const override {
+ [[nodiscard]] std::string getStatus() const override {
return _operation->getStatus();
}
- std::string toString() const override {
+ [[nodiscard]] std::string toString() const override {
return _operation->toString();
}
- void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime) override {
+ void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime) override {
_operation->start(sender, startTime);
}
void receive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override {
_operation->receive(sender, msg);
}
- framework::MilliSecTime getStartTime() const {
- return _operation->getStartTime();
- }
void onStart(DistributorStripeMessageSender&) override {
// Should never be called directly on the throttled operation
// instance, but rather on its wrapped implementation.
@@ -61,7 +58,7 @@ class ThrottlingOperationStarter : public OperationStarter, public PendingWindow
OperationStarter& _starterImpl;
public:
- ThrottlingOperationStarter(OperationStarter& starterImpl)
+ explicit ThrottlingOperationStarter(OperationStarter& starterImpl)
: _starterImpl(starterImpl),
_minPending(0),
_maxPending(UINT32_MAX),
@@ -71,9 +68,9 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
- bool may_allow_operation_with_priority(Priority priority) const noexcept override;
+ [[nodiscard]] bool may_allow_operation_with_priority(Priority priority) const noexcept override;
- bool canStart(uint32_t currentOperationCount, Priority priority) const;
+ [[nodiscard]] bool canStart(uint32_t currentOperationCount, Priority priority) const;
void setMaxPendingRange(uint32_t minPending, uint32_t maxPending) {
_minPending = minPending;