summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-12-03 21:45:53 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-12-04 18:26:06 +0000
commitb8e151a435ccec1ecc03d98bac5b59f4f14514be (patch)
tree140efda301a7e5adc407c44061ba5b0bb41dd212 /storageapi
parent7700f411ea6f4a3e7c0599fae239ec84c18c0038 (diff)
timeout as duration
Conflicts: messagebus/src/vespa/messagebus/testlib/testserver.cpp
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp34
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp4
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp4
-rw-r--r--storageapi/src/vespa/storageapi/message/visitor.cpp18
-rw-r--r--storageapi/src/vespa/storageapi/message/visitor.h31
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp7
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagecommand.h6
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h2
9 files changed, 48 insertions, 60 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index b1a754bbbab..dbd79e4fcca 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -522,7 +522,7 @@ TEST_P(StorageProtocolTest, create_visitor) {
cmd->getBuckets() = buckets;
cmd->setFieldSet("foo,bar,vekterli");
cmd->setVisitInconsistentBuckets();
- cmd->setQueueTimeout(100);
+ cmd->setQueueTimeout(100ms);
cmd->setPriority(149);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("library", cmd2->getLibraryName());
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 466ff85f398..b90153c9517 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -44,7 +44,7 @@ ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const
api::GetCommand::UP msg(
new api::GetCommand(bucket, did, headerOnly ? "[header]" : "[all]", beforeTimestamp));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -64,7 +64,7 @@ ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const
api::Timestamp timestamp(SH::getLong(buf));
api::RemoveCommand::UP msg(new api::RemoveCommand(bucket, did, timestamp));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -88,7 +88,7 @@ ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const
}
api::RevertCommand::UP msg(new api::RevertCommand(bucket, tokens));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -104,7 +104,7 @@ ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const
document::Bucket bucket = getBucket(buf);
api::CreateBucketCommand::UP msg(new api::CreateBucketCommand(bucket));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -138,7 +138,7 @@ ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const
api::MergeBucketCommand::UP msg(
new api::MergeBucketCommand(bucket, nodes, timestamp));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -187,7 +187,7 @@ ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const
onDecodeDiffEntry(buf, entries[i]);
}
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -259,7 +259,7 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const
entries[i]._bodyBlob.size());
}
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -291,7 +291,7 @@ ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd,
entry._info = getBucketInfo(buf);
}
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -326,7 +326,7 @@ ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd,
api::NotifyBucketChangeReply::UP msg(new api::NotifyBucketChangeReply(
static_cast<const api::NotifyBucketChangeCommand&>(cmd)));
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -350,7 +350,7 @@ ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const
msg->setMinByteSize(SH::getInt(buf));
msg->setMinDocCount(SH::getInt(buf));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -403,7 +403,7 @@ ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand&
buf.putBoolean(msg.visitRemoves());
buf.putBoolean(msg.getFieldSet() == "[header]");
buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(msg.getQueueTimeout());
+ buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
uint32_t size = msg.getParameters().getSerializedSize();
char* docBuffer = buf.allocate(size);
@@ -449,12 +449,12 @@ ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const
if (SH::getBoolean(buf)) {
msg->setVisitInconsistentBuckets();
}
- msg->setQueueTimeout(SH::getInt(buf));
+ msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
msg->getParameters().deserialize(getTypeRepo(), buf);
onDecodeCommand(buf, *msg);
msg->setVisitorDispatcherVersion(42);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -471,7 +471,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const
vespalib::stringref instanceId = SH::getString(buf);
api::DestroyVisitorCommand::UP msg(new api::DestroyVisitorCommand(instanceId));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -485,7 +485,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf
{
api::DestroyVisitorReply::UP msg(new api::DestroyVisitorReply(static_cast<const api::DestroyVisitorCommand&>(cmd)));
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void
@@ -505,7 +505,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const
api::RemoveLocationCommand::UP msg;
msg.reset(new api::RemoveLocationCommand(documentSelection, bucket));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -519,7 +519,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf
{
api::RemoveLocationReply::UP msg(new api::RemoveLocationReply(static_cast<const api::RemoveLocationCommand&>(cmd)));
onDecodeBucketInfoReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
// Utility functions for serialization
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
index 32680b24683..b0a1685ed8c 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
@@ -132,7 +132,7 @@ ProtocolSerialization5_1::onEncode(
buf.putBoolean(msg.visitRemoves());
buf.putString(msg.getFieldSet());
buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(msg.getQueueTimeout());
+ buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
uint32_t size = msg.getParameters().getSerializedSize();
char* docBuffer = buf.allocate(size);
@@ -181,7 +181,7 @@ ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const
if (SH::getBoolean(buf)) {
msg->setVisitInconsistentBuckets();
}
- msg->setQueueTimeout(SH::getInt(buf));
+ msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
msg->getParameters().deserialize(getTypeRepo(), buf);
onDecodeCommand(buf, *msg);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index d0446f52893..bf56dd56db6 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -1162,7 +1162,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorComman
ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId());
ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size());
ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size());
- ctrl_meta->set_queue_timeout(msg.getQueueTimeout());
+ ctrl_meta->set_queue_timeout(vespalib::count_ms(msg.getQueueTimeout()));
ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount());
ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor());
@@ -1211,7 +1211,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu
cmd->setControlDestination(ctrl_meta.control_destination());
cmd->setDataDestination(ctrl_meta.data_destination());
cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
- cmd->setQueueTimeout(ctrl_meta.queue_timeout());
+ cmd->setQueueTimeout(std::chrono::milliseconds(ctrl_meta.queue_timeout()));
cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl
diff --git a/storageapi/src/vespa/storageapi/message/visitor.cpp b/storageapi/src/vespa/storageapi/message/visitor.cpp
index dbda2d0d0c2..aeb58f30fb4 100644
--- a/storageapi/src/vespa/storageapi/message/visitor.cpp
+++ b/storageapi/src/vespa/storageapi/message/visitor.cpp
@@ -34,7 +34,7 @@ CreateVisitorCommand::CreateVisitorCommand(document::BucketSpace bucketSpace,
_visitRemoves(false),
_fieldSet("[all]"),
_visitInconsistentBuckets(false),
- _queueTimeout(2000),
+ _queueTimeout(2000ms),
_maxPendingReplyCount(2),
_version(50),
_maxBucketsPerVisitor(1)
@@ -82,15 +82,12 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose,
out << ") {";
out << "\n" << indent << " Library name: '" << _libName << "'";
out << "\n" << indent << " Instance Id: '" << _instanceId << "'";
- out << "\n" << indent << " Control Destination: '"
- << _controlDestination << "'";
- out << "\n" << indent << " Data Destination: '"
- << _dataDestination << "'";
+ out << "\n" << indent << " Control Destination: '" << _controlDestination << "'";
+ out << "\n" << indent << " Data Destination: '" << _dataDestination << "'";
out << "\n" << indent << " Doc Selection: '" << _docSelection << "'";
- out << "\n" << indent << " Max pending: '"
- << _maxPendingReplyCount << "'";
- out << "\n" << indent << " Timeout: " << getTimeout();
- out << "\n" << indent << " Queue timeout: " << _queueTimeout << " ms";
+ out << "\n" << indent << " Max pending: '" << _maxPendingReplyCount << "'";
+ out << "\n" << indent << " Timeout: " << vespalib::count_ms(getTimeout()) << " ms";
+ out << "\n" << indent << " Queue timeout: " << vespalib::count_ms(_queueTimeout) << " ms";
out << "\n" << indent << " VisitorDispatcher version: '" << _version << "'";
if (visitRemoves()) {
out << "\n" << indent << " Visiting remove entries too";
@@ -109,8 +106,7 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose,
}
out << "\n" << indent << " ";
_params.print(out, verbose, indent + " ");
- out << "\n" << indent << " Max buckets: '"
- << _maxBucketsPerVisitor << "'";
+ out << "\n" << indent << " Max buckets: '" << _maxBucketsPerVisitor << "'";
out << "\n" << indent << "} : ";
StorageCommand::print(out, verbose, indent);
} else if (_buckets.size() == 2) {
diff --git a/storageapi/src/vespa/storageapi/message/visitor.h b/storageapi/src/vespa/storageapi/message/visitor.h
index f7dcaa63b20..7189cc67195 100644
--- a/storageapi/src/vespa/storageapi/message/visitor.h
+++ b/storageapi/src/vespa/storageapi/message/visitor.h
@@ -44,7 +44,7 @@ private:
vespalib::string _fieldSet;
bool _visitInconsistentBuckets;
- uint32_t _queueTimeout;
+ duration _queueTimeout;
uint32_t _maxPendingReplyCount;
uint32_t _version;
@@ -61,22 +61,17 @@ public:
~CreateVisitorCommand();
void setVisitorCmdId(uint32_t id) { _visitorCmdId = id; }
- void setControlDestination(vespalib::stringref d)
- { _controlDestination = d; }
+ void setControlDestination(vespalib::stringref d) { _controlDestination = d; }
void setDataDestination(vespalib::stringref d) { _dataDestination = d; }
void setParameters(const vdslib::Parameters& params) { _params = params; }
- void setMaximumPendingReplyCount(uint32_t count)
- { _maxPendingReplyCount = count; }
- void setFieldSet(vespalib::stringref fieldSet)
- { _fieldSet = fieldSet; }
+ void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
+ void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; }
void setVisitRemoves(bool value = true) { _visitRemoves = value; }
- void setVisitInconsistentBuckets(bool visitInconsistent = true)
- { _visitInconsistentBuckets = visitInconsistent; }
- void addBucketToBeVisited(const document::BucketId& id)
- { _buckets.push_back(id); }
+ void setVisitInconsistentBuckets(bool visitInconsistent = true) { _visitInconsistentBuckets = visitInconsistent; }
+ void addBucketToBeVisited(const document::BucketId& id) { _buckets.push_back(id); }
void setVisitorId(const VisitorId id) { _visitorId = id; }
void setInstanceId(vespalib::stringref id) { _instanceId = id; }
- void setQueueTimeout(uint32_t milliSecs) { _queueTimeout = milliSecs; }
+ void setQueueTimeout(duration milliSecs) { _queueTimeout = milliSecs; }
void setFromTime(Timestamp ts) { _fromTime = ts; }
void setToTime(Timestamp ts) { _toTime = ts; }
@@ -86,24 +81,20 @@ public:
document::Bucket getBucket() const override;
const vespalib::string & getLibraryName() const { return _libName; }
const vespalib::string & getInstanceId() const { return _instanceId; }
- const vespalib::string & getControlDestination() const
- { return _controlDestination; }
+ const vespalib::string & getControlDestination() const { return _controlDestination; }
const vespalib::string & getDataDestination() const { return _dataDestination; }
const vespalib::string & getDocumentSelection() const { return _docSelection; }
const vdslib::Parameters& getParameters() const { return _params; }
vdslib::Parameters& getParameters() { return _params; }
- uint32_t getMaximumPendingReplyCount() const
- { return _maxPendingReplyCount; }
- const std::vector<document::BucketId>& getBuckets() const
- { return _buckets; }
+ uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
+ const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
Timestamp getFromTime() const { return _fromTime; }
Timestamp getToTime() const { return _toTime; }
std::vector<document::BucketId>& getBuckets() { return _buckets; }
bool visitRemoves() const { return _visitRemoves; }
const vespalib::string& getFieldSet() const { return _fieldSet; }
bool visitInconsistentBuckets() const { return _visitInconsistentBuckets; }
- // In millisec
- uint32_t getQueueTimeout() const { return _queueTimeout; }
+ duration getQueueTimeout() const { return _queueTimeout; }
void setVisitorDispatcherVersion(uint32_t version) { _version = version; }
uint32_t getVisitorDispatcherVersion() const { return _version; }
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
index fe33066872a..d9bbf34141a 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storagecommand.h"
-#include <limits>
#include <vespa/vespalib/util/exceptions.h>
#include <ostream>
@@ -20,13 +19,13 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p)
// Default timeout is unlimited. Set from mbus message. Some internal
// use want unlimited timeout, (such as readbucketinfo, repair bucket
// etc)
- _timeout(std::numeric_limits<uint32_t>().max()),
+ _timeout(duration::max()),
_sourceIndex(0xFFFF)
{
setPriority(p);
}
-StorageCommand::~StorageCommand() { }
+StorageCommand::~StorageCommand() = default;
void
StorageCommand::print(std::ostream& out, bool verbose,
@@ -36,7 +35,7 @@ StorageCommand::print(std::ostream& out, bool verbose,
out << "StorageCommand(" << _type.getName();
if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority);
if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex;
- out << ", timeout = " << _timeout << " ms";
+ out << ", timeout = " << vespalib::count_ms(_timeout) << " ms";
out << ")";
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
index 2885dac3b91..c835168c5b7 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
@@ -19,7 +19,7 @@ namespace storage::api {
class StorageReply;
class StorageCommand : public StorageMessage {
- uint32_t _timeout; /** Timeout of command in milliseconds */
+ duration _timeout; /** Timeout of command in milliseconds */
/** Sets what node this message origins from. 0xFFFF is unset. */
uint16_t _sourceIndex;
@@ -37,9 +37,9 @@ public:
uint16_t getSourceIndex() const { return _sourceIndex; }
/** Set timeout in milliseconds. */
- void setTimeout(uint32_t milliseconds) { _timeout = milliseconds; }
+ void setTimeout(duration milliseconds) { _timeout = milliseconds; }
/** Get timeout in milliseconds. */
- uint32_t getTimeout() const { return _timeout; }
+ duration getTimeout() const { return _timeout; }
/** Used to set a new id so the message can be resent. */
void setNewId() { StorageMessage::setNewMsgId(); }
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index 8c2338a020c..e119884bd1f 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -70,6 +70,8 @@ public: \
namespace storage::api {
+using duration = vespalib::duration;
+
/**
* @class MessageType
* @ingroup messageapi