summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-12-05 08:07:39 -0800
committerGitHub <noreply@github.com>2019-12-05 08:07:39 -0800
commita20c602ab5a9ca89c80c0c7da71d623270aef8fc (patch)
treeca44b39db90c505152eaeaa92fc5d911a06bfb1a /storage
parent292bcb689af5faa739a5703f6add35cb229dcef6 (diff)
parent8c5597afbaf6cda6d47c49dfd22877dc70456330 (diff)
Merge pull request #11507 from vespa-engine/balder/use-duration-in-messagebus-and-storageapi-rebased-1
timeout as duration
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp18
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp4
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp36
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp4
-rw-r--r--storage/src/tests/storageserver/documentapiconvertertest.cpp4
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp8
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp4
-rw-r--r--storage/src/tests/visiting/commandqueuetest.cpp79
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp54
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h10
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.h2
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp9
-rw-r--r--storage/src/vespa/storage/visiting/commandqueue.h6
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp47
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h2
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp2
27 files changed, 170 insertions, 194 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index d882d17841e..bd76b559490 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -47,7 +47,7 @@ public:
close();
}
- document::BucketId createAndSendSampleDocument(uint32_t timeout);
+ document::BucketId createAndSendSampleDocument(vespalib::duration timeout);
std::string getNodes(const std::string& infoString);
void sendReply(int idx = -1,
@@ -96,7 +96,7 @@ public:
PutOperationTest::~PutOperationTest() = default;
document::BucketId
-PutOperationTest::createAndSendSampleDocument(uint32_t timeout) {
+PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) {
auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::"));
document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId());
@@ -119,9 +119,11 @@ using RequirePrimaryWritten = bool;
}
+const vespalib::duration TIMEOUT = 180ms;
+
TEST_F(PutOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
"id:test:testdoctype1::, timestamp 100, size 45) => 0",
@@ -182,7 +184,7 @@ TEST_F(PutOperationTest, do_not_send_inline_split_if_not_configured) {
TEST_F(PutOperationTest, node_removed_on_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
"id:test:testdoctype1::, timestamp 100, size 45) => 0,"
@@ -206,7 +208,7 @@ TEST_F(PutOperationTest, node_removed_on_reply) {
TEST_F(PutOperationTest, storage_failed) {
setupDistributor(2, 1, "storage:1 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
sendReply(-1, api::ReturnCode::INTERNAL_FAILURE);
@@ -334,7 +336,7 @@ TEST_F(PutOperationTest, do_not_revert_on_failure_after_early_return) {
TEST_F(PutOperationTest, revert_successful_copies_when_one_fails) {
setupDistributor(3, 4, "storage:4 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true));
@@ -359,7 +361,7 @@ TEST_F(PutOperationTest, no_revert_if_revert_disabled) {
SetUp();
setupDistributor(3, 4, "storage:4 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true));
@@ -404,7 +406,7 @@ TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) {
TEST_F(PutOperationTest, no_storage_nodes) {
setupDistributor(2, 1, "storage:0 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
"timestamp 100) ReturnCode(NOT_CONNECTED, "
"Can't store document: No storage nodes available)",
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 67ef3374633..788ac1960dd 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -266,7 +266,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
makeDocumentBucket(document::BucketId(0)), update, api::Timestamp(0)));
// Misc settings for checking that propagation works.
msg->getTrace().setLevel(6);
- msg->setTimeout(6789);
+ msg->setTimeout(6789ms);
msg->setPriority(99);
if (options._timestampToUpdate) {
msg->setOldTimestamp(options._timestampToUpdate);
@@ -517,7 +517,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
{
// Settings set in sendUpdate().
EXPECT_EQ(6, msg->getTrace().getLevel());
- EXPECT_EQ(6789, msg->getTimeout());
+ EXPECT_EQ(6789ms, msg->getTimeout());
EXPECT_EQ(99, msg->getPriority());
}
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index 3bb86eaebd9..5d7871376cb 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -47,7 +47,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
document::BucketId superBucket,
document::BucketId lastBucket,
uint32_t maxBuckets = 8,
- uint32_t timeoutMS = 500,
+ vespalib::duration timeout = 500ms,
bool visitInconsistentBuckets = false,
bool visitRemoves = false,
std::string libraryName = "dumpvisitor",
@@ -69,7 +69,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
cmd->setMaximumPendingReplyCount(VisitorOperationTest::MAX_PENDING);
cmd->setMaxBucketsPerVisitor(maxBuckets);
- cmd->setTimeout(timeoutMS);
+ cmd->setTimeout(timeout);
if (visitInconsistentBuckets) {
cmd->setVisitInconsistentBuckets();
}
@@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
msg->addBucketToBeVisited(nullId);
msg->setFieldSet("[header]");
msg->setVisitRemoves();
- msg->setTimeout(1234);
+ msg->setTimeout(1234ms);
msg->getTrace().setLevel(7);
auto op = createOpWithDefaultConfig(std::move(msg));
@@ -203,7 +203,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
EXPECT_GT(cvc->getToTime(), 0);
EXPECT_EQ("[header]", cvc->getFieldSet());
EXPECT_TRUE(cvc->visitRemoves());
- EXPECT_EQ(1234, cvc->getTimeout());
+ EXPECT_EQ(1234ms, cvc->getTimeout());
EXPECT_EQ(7, cvc->getTrace().getLevel());
sendReply(*op);
@@ -285,7 +285,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) {
addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20));
+ createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms));
op->start(_sender, framework::MilliSecTime(0));
@@ -331,7 +331,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) {
userid,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -356,7 +356,7 @@ VisitorOperationTest::runVisitor(document::BucketId id,
id,
lastId,
maxBuckets,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -448,13 +448,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) {
addNodesToBucketDB(id, "0=0/0/0/1/2/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("emptybucket",
- id,
- nullId,
- 8,
- 500,
- false,
- true));
+ createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true));
op->start(_sender, framework::MilliSecTime(0));
@@ -534,7 +528,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) {
document::BucketId(16, 1),
nullId,
8,
- 500)); // ms timeout
+ 500ms)); // ms timeout
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
@@ -607,7 +601,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) {
id,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -633,7 +627,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) {
id,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -829,7 +823,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
_sender.clear();
auto op = createOpWithConfig(
- createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500, true),
+ createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true),
VisitorOperation::Config(5, 4));
op->start(_sender, framework::MilliSecTime(0));
@@ -988,7 +982,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent)
id,
nullId,
8,
- 500,
+ 500ms,
inconsistent));
op->start(_sender, framework::MilliSecTime(0));
@@ -1040,13 +1034,13 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) {
addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("foo", id, nullId, 8, 10000));
+ createVisitorCommand("foo", id, nullId, 8, 10000ms));
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
- EXPECT_EQ(5000, cmd.getQueueTimeout());
+ EXPECT_EQ(5000ms, cmd.getQueueTimeout());
}
void
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index f907d0496e6..64306fa7c24 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -858,7 +858,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(0);
- cmd->setTimeout(50);
+ cmd->setTimeout(50ms);
filestorHandler.schedule(cmd, 0);
}
@@ -867,7 +867,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(200);
- cmd->setTimeout(10000);
+ cmd->setTimeout(10000ms);
filestorHandler.schedule(cmd, 0);
}
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
index 16b43828120..bc52d7508dc 100644
--- a/storage/src/tests/storageserver/documentapiconvertertest.cpp
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -195,7 +195,7 @@ TEST_F(DocumentApiConverterTest, create_visitor) {
EXPECT_EQ("myinstance", cmd->getInstanceId());
EXPECT_EQ("control-dest", cmd->getControlDestination());
EXPECT_EQ("data-dest", cmd->getDataDestination());
- EXPECT_EQ(123456u, cmd->getTimeout());
+ EXPECT_EQ(123456ms, cmd->getTimeout());
auto msg = toDocumentAPI<documentapi::CreateVisitorMessage>(*cmd);
EXPECT_EQ(defaultSpaceName, msg->getBucketSpace());
@@ -210,7 +210,7 @@ TEST_F(DocumentApiConverterTest, create_visitor_high_timeout) {
EXPECT_EQ("myinstance", cmd->getInstanceId());
EXPECT_EQ("control-dest", cmd->getControlDestination());
EXPECT_EQ("data-dest", cmd->getDataDestination());
- EXPECT_EQ(std::numeric_limits<int32_t>::max(), cmd->getTimeout());
+ EXPECT_EQ(std::numeric_limits<int32_t>::max(), vespalib::count_ms(cmd->getTimeout()));
}
TEST_F(DocumentApiConverterTest, create_visitor_reply_not_ready) {
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 30ad9b58e9f..178862d8393 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -267,7 +267,7 @@ TEST_F(MergeThrottlerTest, chain) {
}
auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123);
cmd->setPriority(7);
- cmd->setTimeout(54321);
+ cmd->setTimeout(54321ms);
StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
cmd->setAddress(address);
const uint16_t distributorIndex = 123;
@@ -306,7 +306,7 @@ TEST_F(MergeThrottlerTest, chain) {
// Ensure priority, cluster state version and timeout is correctly forwarded
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -332,7 +332,7 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
_topLinks[executorNode]->sendDown(fwd);
}
@@ -359,7 +359,7 @@ TEST_F(MergeThrottlerTest, chain) {
fwd = _bottomLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET);
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
auto reply = std::make_shared<MergeBucketReply>(dynamic_cast<const MergeBucketCommand&>(*fwd));
reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<"));
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index c2074c53dd7..f88b59f50a5 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -246,7 +246,7 @@ void StateManagerTest::mark_reported_node_state_up() {
void StateManagerTest::send_down_get_node_state_request(uint16_t controller_index) {
auto cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::UP));
- cmd->setTimeout(10000000);
+ cmd->setTimeout(10000000ms);
cmd->setSourceIndex(controller_index);
_upper->sendDown(cmd);
}
@@ -320,7 +320,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
force_current_cluster_state_version(12345);
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
- cmd->setTimeout(10000000);
+ cmd->setTimeout(10000000ms);
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
index c152e4c5191..359a242ff5d 100644
--- a/storage/src/tests/visiting/commandqueuetest.cpp
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -16,14 +16,13 @@ namespace storage {
namespace {
std::shared_ptr<api::CreateVisitorCommand> getCommand(
- vespalib::stringref name, int timeout,
+ vespalib::stringref name, vespalib::duration timeout,
uint8_t priority = 0)
{
vespalib::asciistream ost;
- ost << name << " t=" << timeout << " p=" << static_cast<unsigned int>(priority);
+ ost << name << " t=" << vespalib::count_ms(timeout) << " p=" << static_cast<unsigned int>(priority);
// Piggyback name in document selection
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "", "", ost.str()));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "", "", ost.str());
cmd->setQueueTimeout(timeout);
cmd->setPriority(priority);
return cmd;
@@ -43,13 +42,13 @@ TEST(CommandQueueTest, fifo) {
ASSERT_TRUE(queue.empty());
// Use all default priorities, meaning what comes out should be in the same order
// as what went in
- queue.add(getCommand("first", 1));
- queue.add(getCommand("second", 10));
- queue.add(getCommand("third", 5));
- queue.add(getCommand("fourth", 0));
- queue.add(getCommand("fifth", 3));
- queue.add(getCommand("sixth", 14));
- queue.add(getCommand("seventh", 7));
+ queue.add(getCommand("first", 1ms));
+ queue.add(getCommand("second", 10ms));
+ queue.add(getCommand("third", 5ms));
+ queue.add(getCommand("fourth", 0ms));
+ queue.add(getCommand("fifth", 3ms));
+ queue.add(getCommand("sixth", 14ms));
+ queue.add(getCommand("seventh", 7ms));
ASSERT_FALSE(queue.empty());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
@@ -74,16 +73,16 @@ TEST(CommandQueueTest, fifo_with_priorities) {
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 1, 10));
+ queue.add(getCommand("first", 1ms, 10));
EXPECT_EQ("first t=1 p=10", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("second", 10, 22));
- queue.add(getCommand("third", 5, 9));
+ queue.add(getCommand("second", 10ms, 22));
+ queue.add(getCommand("third", 5ms, 9));
EXPECT_EQ("second t=10 p=22", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("fourth", 0, 22));
- queue.add(getCommand("fifth", 3, 22));
+ queue.add(getCommand("fourth", 0ms, 22));
+ queue.add(getCommand("fifth", 3ms, 22));
EXPECT_EQ("fifth t=3 p=22", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("sixth", 14, 50));
- queue.add(getCommand("seventh", 7, 0));
+ queue.add(getCommand("sixth", 14ms, 50));
+ queue.add(getCommand("seventh", 7ms, 0));
EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand()));
@@ -111,19 +110,19 @@ TEST(CommandQueueTest, release_oldest) {
framework::defaultimplementation::FakeClock clock(framework::defaultimplementation::FakeClock::FAKE_ABSOLUTE);
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 10));
- queue.add(getCommand("second", 100));
- queue.add(getCommand("third", 1000));
- queue.add(getCommand("fourth", 5));
- queue.add(getCommand("fifth", 3000));
- queue.add(getCommand("sixth", 400));
- queue.add(getCommand("seventh", 700));
+ queue.add(getCommand("first", 10ms));
+ queue.add(getCommand("second", 100ms));
+ queue.add(getCommand("third", 1000ms));
+ queue.add(getCommand("fourth", 5ms));
+ queue.add(getCommand("fifth", 3000ms));
+ queue.add(getCommand("sixth", 400ms));
+ queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry;
std::list<CommandEntry> timedOut(queue.releaseTimedOut());
ASSERT_TRUE(timedOut.empty());
- clock.addMilliSecondsToTime(400 * 1000);
+ clock.addMilliSecondsToTime(400);
timedOut = queue.releaseTimedOut();
ASSERT_EQ(4, timedOut.size());
std::ostringstream ost;
@@ -144,13 +143,13 @@ TEST(CommandQueueTest, release_lowest_priority) {
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 1, 10));
- queue.add(getCommand("second", 10, 22));
- queue.add(getCommand("third", 5, 9));
- queue.add(getCommand("fourth", 0, 22));
- queue.add(getCommand("fifth", 3, 22));
- queue.add(getCommand("sixth", 14, 50));
- queue.add(getCommand("seventh", 7, 0));
+ queue.add(getCommand("first", 1ms, 10));
+ queue.add(getCommand("second", 10ms, 22));
+ queue.add(getCommand("third", 5ms, 9));
+ queue.add(getCommand("fourth", 0ms, 22));
+ queue.add(getCommand("fifth", 3ms, 22));
+ queue.add(getCommand("sixth", 14ms, 50));
+ queue.add(getCommand("seventh", 7ms, 0));
ASSERT_EQ(7u, queue.size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
@@ -177,13 +176,13 @@ TEST(CommandQueueTest, delete_iterator) {
framework::defaultimplementation::FakeClock clock;
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 10));
- queue.add(getCommand("second", 100));
- queue.add(getCommand("third", 1000));
- queue.add(getCommand("fourth", 5));
- queue.add(getCommand("fifth", 3000));
- queue.add(getCommand("sixth", 400));
- queue.add(getCommand("seventh", 700));
+ queue.add(getCommand("first", 10ms));
+ queue.add(getCommand("second", 100ms));
+ queue.add(getCommand("third", 1000ms));
+ queue.add(getCommand("fourth", 5ms));
+ queue.add(getCommand("fifth", 3000ms));
+ queue.add(getCommand("sixth", 400ms));
+ queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin();
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 1275372b73b..b7eb7fee3ec 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -617,7 +617,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
_top->waitForMessages(i+1, 60);
}
@@ -629,7 +629,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -698,7 +698,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -730,7 +730,7 @@ TEST_F(VisitorManagerTest, abort_on_failed_visitor_info) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -765,7 +765,7 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) {
makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ILLEGAL_PARAMETERS));
@@ -782,8 +782,8 @@ TEST_F(VisitorManagerTest, visitor_queue_timeout) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(1);
- cmd->setTimeout(100 * 1000 * 1000);
+ cmd->setQueueTimeout(1ms);
+ cmd->setTimeout(100 * 1000 * 1000ms);
_top->sendDown(cmd);
_node->getClock().addSecondsToTime(1000);
@@ -807,8 +807,8 @@ TEST_F(VisitorManagerTest, visitor_processing_timeout) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
- cmd->setTimeout(100);
+ cmd->setQueueTimeout(0ms);
+ cmd->setTimeout(100ms);
_top->sendDown(cmd);
// Wait for Put before increasing the clock
@@ -825,7 +825,7 @@ namespace {
uint32_t nextVisitor = 0;
api::StorageMessage::Id
-sendCreateVisitor(uint32_t timeout, DummyStorageLink& top, uint8_t priority = 127) {
+sendCreateVisitor(vespalib::duration timeout, DummyStorageLink& top, uint8_t priority = 127) {
std::ostringstream ost;
ost << "testvis" << ++nextVisitor;
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
@@ -851,25 +851,25 @@ TEST_F(VisitorManagerTest, prioritized_visitor_queing) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- ids[i] = sendCreateVisitor(i, *_top, i);
+ ids[i] = sendCreateVisitor(i*1ms, *_top, i);
}
// Next ones should be queued - (Better not finish before we get here)
// Submit with higher priorities
for (uint32_t i = 0; i < 4; ++i) {
- ids[i + 4] = sendCreateVisitor(1000, *_top, 100 - i);
+ ids[i + 4] = sendCreateVisitor(1000ms, *_top, 100 - i);
}
// Queue is now full with a pri 100 visitor at its end
// Send a lower pri visitor that will be busy-returned immediately
- ids[8] = sendCreateVisitor(1000, *_top, 130);
+ ids[8] = sendCreateVisitor(1000ms, *_top, 130);
uint64_t message_id = 0;
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[8], message_id);
// Send a higher pri visitor that will take the place of pri 100 visitor
- ids[9] = sendCreateVisitor(1000, *_top, 60);
+ ids[9] = sendCreateVisitor(1000ms, *_top, 60);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[4], message_id);
@@ -917,44 +917,44 @@ TEST_F(VisitorManagerTest, prioritized_max_concurrent_visitors) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- ids[i] = sendCreateVisitor(i, *_top, i);
+ ids[i] = sendCreateVisitor(i*1ms, *_top, i);
}
// Low pri messages; get put into queue
for (uint32_t i = 0; i < 6; ++i) {
- ids[i + 4] = sendCreateVisitor(1000, *_top, 203 - i);
+ ids[i + 4] = sendCreateVisitor(1000ms, *_top, 203 - i);
}
// Higher pri message: fits happily into 1 extra concurrent slot
- ids[10] = sendCreateVisitor(1000, *_top, 190);
+ ids[10] = sendCreateVisitor(1000ms, *_top, 190);
// Should punch pri203 msg out of the queue -> busy
- ids[11] = sendCreateVisitor(1000, *_top, 197);
+ ids[11] = sendCreateVisitor(1000ms, *_top, 197);
uint64_t message_id = 0;
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[4], message_id);
// No concurrency slots left for this message -> busy
- ids[12] = sendCreateVisitor(1000, *_top, 204);
+ ids[12] = sendCreateVisitor(1000ms, *_top, 204);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[12], message_id);
// Gets a concurrent slot
- ids[13] = sendCreateVisitor(1000, *_top, 80);
+ ids[13] = sendCreateVisitor(1000ms, *_top, 80);
// Kicks pri 202 out of the queue -> busy
- ids[14] = sendCreateVisitor(1000, *_top, 79);
+ ids[14] = sendCreateVisitor(1000ms, *_top, 79);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[5], message_id);
// Gets a concurrent slot
- ids[15] = sendCreateVisitor(1000, *_top, 63);
+ ids[15] = sendCreateVisitor(1000ms, *_top, 63);
// Very Important Visitor(tm) gets a concurrent slot
- ids[16] = sendCreateVisitor(1000, *_top, 0);
+ ids[16] = sendCreateVisitor(1000ms, *_top, 0);
std::vector<document::Document::SP> docs;
std::vector<document::DocumentId> docIds;
@@ -1018,11 +1018,11 @@ TEST_F(VisitorManagerTest, visitor_queing_zero_queue_size) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- sendCreateVisitor(i, *_top, i);
+ sendCreateVisitor(i * 1ms, *_top, i);
}
// Queue size is zero, all visitors will be busy-returned
for (uint32_t i = 0; i < 5; ++i) {
- sendCreateVisitor(1000, *_top, 100 - i);
+ sendCreateVisitor(1000ms, *_top, 100 - i);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY));
}
for (uint32_t session = 0; session < 4; ++session) {
@@ -1037,8 +1037,8 @@ TEST_F(VisitorManagerTest, status_page) {
_manager->setMaxConcurrentVisitors(1, 1);
_manager->setMaxVisitorQueueSize(6);
// 1 running, 1 queued
- sendCreateVisitor(1000000, *_top, 1);
- sendCreateVisitor(1000000, *_top, 128);
+ sendCreateVisitor(1000000ms, *_top, 1);
+ sendCreateVisitor(1000000ms, *_top, 128);
{
TestVisitorMessageSession& session = getSession(0);
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index 3aadce4d18e..471f8b7bb95 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -8,9 +8,7 @@
#include <vespa/storage/distributor/operations/external/visitororder.h>
#include <vespa/storage/distributor/visitormetricsset.h>
#include <vespa/document/base/exceptions.h>
-#include <vespa/document/select/parser.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <iomanip>
#include <sstream>
#include <vespa/log/log.h>
@@ -76,9 +74,7 @@ VisitorOperation::VisitorOperation(
}
}
-VisitorOperation::~VisitorOperation()
-{
-}
+VisitorOperation::~VisitorOperation() = default;
document::BucketId
VisitorOperation::getLastBucketVisited()
@@ -121,22 +117,21 @@ VisitorOperation::getLastBucketVisited()
return newLastBucket;
}
-uint64_t
+vespalib::duration
VisitorOperation::timeLeft() const noexcept
{
const auto elapsed = _operationTimer.getElapsedTime();
- framework::MilliSecTime timeSpent(
- std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
+
LOG(spam,
- "Checking if visitor has timed out: elapsed=%" PRIu64 " ms, timeout=%u ms",
- timeSpent.getTime(),
- _msg->getTimeout());
+ "Checking if visitor has timed out: elapsed=%ld ms, timeout=%ld ms",
+ vespalib::count_ms(elapsed),
+ vespalib::count_ms(_msg->getTimeout()));
- if (timeSpent.getTime() >= _msg->getTimeout()) {
- return 0;
+ if (elapsed >= _msg->getTimeout()) {
+ return vespalib::duration::zero();
} else {
- return _msg->getTimeout() - timeSpent.getTime();
+ return _msg->getTimeout() - elapsed;
}
}
@@ -581,7 +576,7 @@ VisitorOperation::onStart(DistributorMessageSender& sender)
bool
VisitorOperation::shouldAbortDueToTimeout() const noexcept
{
- return timeLeft() == 0;
+ return timeLeft() <= vespalib::duration::zero();
}
void
@@ -629,8 +624,8 @@ VisitorOperation::startNewVisitors(DistributorMessageSender& sender)
markOperationAsFailed(
api::ReturnCode(api::ReturnCode::ABORTED,
vespalib::make_string(
- "Timeout of %u ms is running out",
- _msg->getTimeout())));
+ "Timeout of %ld ms is running out",
+ vespalib::count_ms(_msg->getTimeout()))));
}
if (maySendNewStorageVisitors()) {
@@ -782,7 +777,7 @@ VisitorOperation::sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
return visitorsSent;
}
-uint32_t
+vespalib::duration
VisitorOperation::computeVisitorQueueTimeoutMs() const noexcept
{
return timeLeft() / 2;
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index ebb5ed4c6aa..fdfe60731f5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -108,17 +108,15 @@ private:
bool allowInconsistencies() const noexcept;
bool shouldAbortDueToTimeout() const noexcept;
bool assignBucketsToNodes(NodeToBucketsMap& nodeToBucketsMap);
- int getNumVisitorsToSendForNode(uint16_t node,
- uint32_t totalBucketsOnNode) const;
- uint32_t computeVisitorQueueTimeoutMs() const noexcept;
+ int getNumVisitorsToSendForNode(uint16_t node, uint32_t totalBucketsOnNode) const;
+ vespalib::duration computeVisitorQueueTimeoutMs() const noexcept;
bool sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
DistributorMessageSender& sender);
void sendStorageVisitor(uint16_t node,
const std::vector<document::BucketId>& buckets,
uint32_t pending,
DistributorMessageSender& sender);
- void markCompleted(const document::BucketId& bid,
- const api::ReturnCode& code);
+ void markCompleted(const document::BucketId& bid, const api::ReturnCode& code);
/**
* Operation failed and we can pin the blame on a specific node. Updates
* internal error code and augments error message with the index of the
@@ -138,7 +136,7 @@ private:
* time point. In case of the current time having passed the timeout
* point, function returns 0.
*/
- uint64_t timeLeft() const noexcept;
+ vespalib::duration timeLeft() const noexcept;
DistributorComponent& _owner;
DistributorBucketSpace &_bucketSpace;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 52a4a5c195c..130e039a43e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -77,7 +77,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
std::shared_ptr<api::JoinBucketsCommand> msg(
new api::JoinBucketsCommand(getBucket()));
msg->getSourceBuckets() = node.second;
- msg->setTimeout(INT_MAX);
+ msg->setTimeout(vespalib::duration::max());
setCommandMeta(*msg);
_tracker.queueCommand(msg, node.first);
}
@@ -90,8 +90,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
- LOG(debug, "Ignored reply since node was max uint16_t for unknown "
- "reasons");
+ LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
return;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 66ce4fc0485..445d0972937 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -155,7 +155,7 @@ MergeOperation::onStart(DistributorMessageSender& sender)
// Set timeout to one hour to prevent hung nodes that manage to keep
// connections open from stalling merges in the cluster indefinitely.
- msg->setTimeout(60 * 60 * 1000);
+ msg->setTimeout(3600s);
setCommandMeta(*msg);
sender.sendToNode(lib::NodeType::STORAGE, _mnodes[0].index, msg);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 1b40f744a80..57f8bc92316 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -20,7 +20,7 @@ SplitOperation::SplitOperation(const std::string& clusterName, const BucketAndNo
_splitCount(splitCount),
_splitSize(splitSize)
{}
-SplitOperation::~SplitOperation() {}
+SplitOperation::~SplitOperation() = default;
void
SplitOperation::onStart(DistributorMessageSender& sender)
@@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorMessageSender& sender)
msg->setMaxSplitBits(_maxBits);
msg->setMinDocCount(_splitCount);
msg->setMinByteSize(_splitSize);
- msg->setTimeout(INT_MAX);
+ msg->setTimeout(vespalib::duration::max());
setCommandMeta(*msg);
_tracker.queueCommand(msg, entry->getNodeRef(i).getNode());
_ok = true;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 8298b126690..62a520abc87 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -230,7 +230,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
distributionHash));
cmd->setPriority(api::StorageMessage::HIGH);
- cmd->setTimeout(INT_MAX);
+ cmd->setTimeout(vespalib::duration::max());
_sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index d1631c50880..f773ee774bb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -355,7 +355,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
}
bool
-FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime)
+FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime)
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
@@ -980,7 +980,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
return lck;
}
- uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
+ std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
if (!messageTimedOutInQueue(m, waitTime)) {
std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
@@ -1004,7 +1004,7 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
api::StorageMessage & m(*iter->_command);
- uint64_t waitTime(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
+ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
document::Bucket bucket(iter->_bucket);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index fd6ab5e8b9a..5fc592e11cb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -325,7 +325,7 @@ private:
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
- static bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime);
+ static bool messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime);
/**
* Creates and returns a reply with api::TIMEOUT return code for msg.
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index 24f4c9cd731..1efb42a7b22 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -16,7 +16,7 @@ MergeStatus::MergeStatus(framework::Clock& clock, const metrics::LoadType& lt,
context(lt, priority, traceLevel)
{}
-MergeStatus::~MergeStatus() {}
+MergeStatus::~MergeStatus() = default;
bool
MergeStatus::removeFromDiff(
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index 932859cd2d0..082ae053ec0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -24,7 +24,7 @@ public:
api::StorageMessage::Id pendingId;
std::shared_ptr<api::GetBucketDiffReply> pendingGetDiff;
std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff;
- uint32_t timeout;
+ vespalib::duration timeout;
framework::MilliSecTimer startTime;
spi::Context context;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 499d7ce15ac..978d434847e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -148,7 +148,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
//TODO: Can it be moved ?
std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand();
- cmd->setTimeout(storMsgPtr->getTimeRemaining().count());
+ cmd->setTimeout(storMsgPtr->getTimeRemaining());
cmd->setTrace(storMsgPtr->getTrace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
@@ -567,7 +567,7 @@ CommunicationManager::sendCommand(
cmd->setContext(mbus::Context(msg->getMsgId()));
cmd->setRetryEnabled(address.retryEnabled());
- cmd->setTimeRemaining(std::chrono::milliseconds(msg->getTimeout()));
+ cmd->setTimeRemaining(msg->getTimeout());
cmd->setTrace(msg->getTrace());
sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
break;
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index c6a16de3282..f0c987ee333 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -20,8 +20,6 @@ LOG_SETUP(".documentapiconverter");
using document::BucketSpace;
-using std::chrono::milliseconds;
-
namespace storage {
DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri,
@@ -140,9 +138,12 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg)
break;
}
- if (toMsg.get() != 0) {
- milliseconds timeout = std::min(milliseconds(INT_MAX), fromMsg.getTimeRemaining());
- toMsg->setTimeout(timeout.count());
+ if (toMsg) {
+ //TODO getTimeRemainingNow ?
+ vespalib::duration cappedTimeout = (fromMsg.getTimeRemaining() < 1ms*INT_MAX)
+ ? fromMsg.getTimeRemaining()
+ : 1ms*INT_MAX;
+ toMsg->setTimeout(cappedTimeout);
toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority()));
toMsg->setLoadType(fromMsg.getLoadType());
@@ -308,8 +309,8 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg)
break;
}
- if (toMsg.get()) {
- toMsg->setTimeRemaining(milliseconds(fromMsg.getTimeout()));
+ if (toMsg) {
+ toMsg->setTimeRemaining(fromMsg.getTimeout());
toMsg->setContext(mbus::Context(fromMsg.getMsgId()));
if (LOG_WOULD_LOG(spam)) {
toMsg->getTrace().setLevel(9);
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 651686a7c6d..c5d7880d966 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -154,7 +154,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
: std::unique_ptr<lib::NodeState>()));
cmd->setPriority(api::StorageMessage::VERYHIGH);
- cmd->setTimeout(req->GetParams()->GetValue(1)._intval32);
+ cmd->setTimeout(std::chrono::milliseconds(req->GetParams()->GetValue(1)._intval32));
if (req->GetParams()->GetNumValues() > 2) {
cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32);
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index af01a880fea..9afc8b2d3a5 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -460,14 +460,15 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd)
&& (*cmd->getExpectedState() == *_nodeState || sentReply)
&& is_up_to_date)
{
+ int64_t msTimeout = vespalib::count_ms(cmd->getTimeout());
LOG(debug, "Received get node state request with timeout of "
- "%u milliseconds. Scheduling to be answered in "
- "%u milliseconds unless a node state change "
+ "%ld milliseconds. Scheduling to be answered in "
+ "%ld milliseconds unless a node state change "
"happens before that time.",
- cmd->getTimeout(), cmd->getTimeout() * 800 / 1000);
+ msTimeout, msTimeout * 800 / 1000);
TimeStatePair pair(
_component.getClock().getTimeInMillis()
- + framework::MilliSecTime(cmd->getTimeout() * 800 / 1000),
+ + framework::MilliSecTime(msTimeout * 800 / 1000),
cmd);
_queuedStateRequests.emplace_back(std::move(pair));
} else {
diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h
index d129506eb64..ce309d55803 100644
--- a/storage/src/vespa/storage/visiting/commandqueue.h
+++ b/storage/src/vespa/storage/visiting/commandqueue.h
@@ -16,6 +16,7 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <vespa/vespalib/util/printable.h>
+#include <vespa/vespalib//util/time.h>
#include <vespa/fastos/timestamp.h>
#include <vespa/storageframework/generic/clock/clock.h>
#include <list>
@@ -141,11 +142,10 @@ CommandQueue<Command>::peekNextCommand() const
template<class Command>
void
-CommandQueue<Command>::add(
- const std::shared_ptr<Command>& cmd)
+CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd)
{
framework::MicroSecTime time(_clock.getTimeInMicros()
- + framework::MicroSecTime(cmd->getQueueTimeout() * 1000000));
+ + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout())));
_commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority()));
}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 6330b580eb9..1a1f1498578 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -309,7 +309,7 @@ VisitorManager::scheduleVisitor(
if (_enforceQueueUse || totCount >= maximumConcurrent(*cmd)) {
api::CreateVisitorCommand::SP failCommand;
- if (cmd->getQueueTimeout() != 0 && _maxVisitorQueueSize > 0) {
+ if (cmd->getQueueTimeout() > vespalib::duration::zero() && _maxVisitorQueueSize > 0) {
if (_visitorQueue.size() < _maxVisitorQueueSize) {
// Still room in the queue
_visitorQueue.add(cmd);
@@ -318,20 +318,15 @@ VisitorManager::scheduleVisitor(
// If tail of priority queue has a lower priority than
// the new visitor, evict it and insert the new one. If
// not, immediately return with a busy reply
- std::shared_ptr<api::CreateVisitorCommand> tail(
- _visitorQueue.peekLowestPriorityCommand());
+ std::shared_ptr<api::CreateVisitorCommand> tail(_visitorQueue.peekLowestPriorityCommand());
// Lower int ==> higher pri
if (cmd->getPriority() < tail->getPriority()) {
- std::pair<api::CreateVisitorCommand::SP,
- time_t> evictCommand(
- _visitorQueue.releaseLowestPriorityCommand());
+ std::pair<api::CreateVisitorCommand::SP, time_t> evictCommand(_visitorQueue.releaseLowestPriorityCommand());
assert(tail == evictCommand.first);
_visitorQueue.add(cmd);
visitorLock.signal();
- framework::MicroSecTime t(
- _component.getClock().getTimeInMicros());
- _metrics->queueEvictedWaitTime.addValue(
- t.getTime() - evictCommand.second);
+ framework::MicroSecTime t(_component.getClock().getTimeInMicros());
+ _metrics->queueEvictedWaitTime.addValue(t.getTime() - evictCommand.second);
failCommand = evictCommand.first;
} else {
failCommand = cmd;
@@ -344,11 +339,10 @@ VisitorManager::scheduleVisitor(
}
visitorLock.unlock();
- if (failCommand.get() != 0) {
- std::shared_ptr<api::CreateVisitorReply> reply(
- new api::CreateVisitorReply(*failCommand));
+ if (failCommand) {
+ auto reply = std::make_shared<api::CreateVisitorReply>(*failCommand);
std::ostringstream ost;
- if (cmd->getQueueTimeout() == 0) {
+ if (cmd->getQueueTimeout() <= vespalib::duration::zero()) {
ost << "Already running the maximum amount ("
<< maximumConcurrent(*failCommand)
<< ") of visitors for this priority ("
@@ -361,11 +355,9 @@ VisitorManager::scheduleVisitor(
<< static_cast<uint32_t>(failCommand->getPriority())
<< "), and maximum queue size is 0.";
} else {
- ost << "Queue is full and a higher priority visitor was received, "
- "taking precedence.";
+ ost << "Queue is full and a higher priority visitor was received, taking precedence.";
}
- reply->setResult(api::ReturnCode(api::ReturnCode::BUSY,
- ost.str()));
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, ost.str()));
send(reply);
}
return false;
@@ -375,13 +367,11 @@ VisitorManager::scheduleVisitor(
}
while (true) {
id = ++_visitorCounter;
- std::map<api::VisitorId, std::string>& usedIds(
- _visitorThread[id % _visitorThread.size()].second);
+ std::map<api::VisitorId, std::string>& usedIds(_visitorThread[id % _visitorThread.size()].second);
if (usedIds.size() == minLoadCount &&
usedIds.find(id) == usedIds.end())
{
- newEntry = _nameToId.insert(NameIdPair(cmd->getInstanceId(),
- id));
+ newEntry = _nameToId.insert(NameIdPair(cmd->getInstanceId(), id));
if (newEntry.second) {
usedIds[id] = cmd->getInstanceId();
}
@@ -391,13 +381,11 @@ VisitorManager::scheduleVisitor(
}
visitorLock.unlock();
if (!newEntry.second) {
- std::shared_ptr<api::CreateVisitorReply> reply(
- new api::CreateVisitorReply(*cmd));
+ auto reply = std::make_shared<api::CreateVisitorReply>(*cmd);
std::ostringstream ost;
ost << "Already running a visitor named " << cmd->getInstanceId()
<< ". Not creating visitor.";
- reply->setResult(api::ReturnCode(api::ReturnCode::EXISTS,
- ost.str()));
+ reply->setResult(api::ReturnCode(api::ReturnCode::EXISTS, ost.str()));
send(reply);
return false;
}
@@ -407,8 +395,7 @@ VisitorManager::scheduleVisitor(
}
bool
-VisitorManager::onCreateVisitor(
- const std::shared_ptr<api::CreateVisitorCommand>& cmd)
+VisitorManager::onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>& cmd)
{
vespalib::MonitorGuard sync(_visitorLock);
scheduleVisitor(cmd, false, sync);
@@ -632,7 +619,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
it->_command);
assert(cmd.get());
out << "<li>" << cmd->getInstanceId() << " - "
- << cmd->getQueueTimeout() << ", remaining timeout "
+ << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout "
<< (it->_time - time.getTime()) / 1000000 << " ms\n";
}
if (_visitorQueue.empty()) {
@@ -657,7 +644,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
<< "<td>" << it->first << "</td>"
<< "<td>" << it->second.id << "</td>"
<< "<td>" << it->second.timestamp << "</td>"
- << "<td>" << it->second.timeout << "</td>"
+ << "<td>" << vespalib::count_ms(it->second.timeout) << "</td>"
<< "<td>" << it->second.destination << "</td>"
<< "</tr>\n";
}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 3a3e743eaf2..3675a824e1d 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -55,7 +55,7 @@ private:
struct MessageInfo {
api::VisitorId id;
time_t timestamp;
- uint64_t timeout;
+ vespalib::duration timeout;
std::string destination;
};
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 142e7a89144..006af5edf7d 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -546,7 +546,7 @@ VisitorThread::onCreateVisitor(
std::move(messageSession),
documentPriority);
visitor->attach(cmd, *controlAddress, *dataAddress,
- framework::MilliSecTime(cmd->getTimeout()));
+ framework::MilliSecTime(vespalib::count_ms(cmd->getTimeout())));
} catch (std::exception& e) {
// We don't handle exceptions from this code, as we've
// added visitor to internal structs we'll end up calling