diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-03 21:45:53 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-04 18:26:06 +0000 |
commit | b8e151a435ccec1ecc03d98bac5b59f4f14514be (patch) | |
tree | 140efda301a7e5adc407c44061ba5b0bb41dd212 /storageapi | |
parent | 7700f411ea6f4a3e7c0599fae239ec84c18c0038 (diff) |
timeout as duration
Conflicts:
messagebus/src/vespa/messagebus/testlib/testserver.cpp
Diffstat (limited to 'storageapi')
9 files changed, 48 insertions, 60 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index b1a754bbbab..dbd79e4fcca 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -522,7 +522,7 @@ TEST_P(StorageProtocolTest, create_visitor) { cmd->getBuckets() = buckets; cmd->setFieldSet("foo,bar,vekterli"); cmd->setVisitInconsistentBuckets(); - cmd->setQueueTimeout(100); + cmd->setQueueTimeout(100ms); cmd->setPriority(149); auto cmd2 = copyCommand(cmd); EXPECT_EQ("library", cmd2->getLibraryName()); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 466ff85f398..b90153c9517 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -44,7 +44,7 @@ ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const api::GetCommand::UP msg( new api::GetCommand(bucket, did, headerOnly ? "[header]" : "[all]", beforeTimestamp)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -64,7 +64,7 @@ ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const api::Timestamp timestamp(SH::getLong(buf)); api::RemoveCommand::UP msg(new api::RemoveCommand(bucket, did, timestamp)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -88,7 +88,7 @@ ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const } api::RevertCommand::UP msg(new api::RevertCommand(bucket, tokens)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -104,7 +104,7 @@ ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const document::Bucket bucket = getBucket(buf); api::CreateBucketCommand::UP msg(new api::CreateBucketCommand(bucket)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -138,7 +138,7 @@ ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const api::MergeBucketCommand::UP msg( new api::MergeBucketCommand(bucket, nodes, timestamp)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -187,7 +187,7 @@ ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const onDecodeDiffEntry(buf, entries[i]); } onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -259,7 +259,7 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const entries[i]._bodyBlob.size()); } onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -291,7 +291,7 @@ ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd, entry._info = getBucketInfo(buf); } onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -326,7 +326,7 @@ ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd, api::NotifyBucketChangeReply::UP msg(new api::NotifyBucketChangeReply( static_cast<const api::NotifyBucketChangeCommand&>(cmd))); onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -350,7 +350,7 @@ ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const msg->setMinByteSize(SH::getInt(buf)); msg->setMinDocCount(SH::getInt(buf)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -403,7 +403,7 @@ ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand& buf.putBoolean(msg.visitRemoves()); buf.putBoolean(msg.getFieldSet() == "[header]"); buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(msg.getQueueTimeout()); + buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); uint32_t size = msg.getParameters().getSerializedSize(); char* docBuffer = buf.allocate(size); @@ -449,12 +449,12 @@ ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const if (SH::getBoolean(buf)) { msg->setVisitInconsistentBuckets(); } - msg->setQueueTimeout(SH::getInt(buf)); + msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); msg->getParameters().deserialize(getTypeRepo(), buf); onDecodeCommand(buf, *msg); msg->setVisitorDispatcherVersion(42); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -471,7 +471,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const vespalib::stringref instanceId = SH::getString(buf); api::DestroyVisitorCommand::UP msg(new api::DestroyVisitorCommand(instanceId)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -485,7 +485,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf { api::DestroyVisitorReply::UP msg(new api::DestroyVisitorReply(static_cast<const api::DestroyVisitorCommand&>(cmd))); onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void @@ -505,7 +505,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const api::RemoveLocationCommand::UP msg; msg.reset(new api::RemoveLocationCommand(documentSelection, bucket)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -519,7 +519,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf { api::RemoveLocationReply::UP msg(new api::RemoveLocationReply(static_cast<const api::RemoveLocationCommand&>(cmd))); onDecodeBucketInfoReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } // Utility functions for serialization diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp index 32680b24683..b0a1685ed8c 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp @@ -132,7 +132,7 @@ ProtocolSerialization5_1::onEncode( buf.putBoolean(msg.visitRemoves()); buf.putString(msg.getFieldSet()); buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(msg.getQueueTimeout()); + buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); uint32_t size = msg.getParameters().getSerializedSize(); char* docBuffer = buf.allocate(size); @@ -181,7 +181,7 @@ ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const if (SH::getBoolean(buf)) { msg->setVisitInconsistentBuckets(); } - msg->setQueueTimeout(SH::getInt(buf)); + msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); msg->getParameters().deserialize(getTypeRepo(), buf); onDecodeCommand(buf, *msg); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index d0446f52893..bf56dd56db6 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -1162,7 +1162,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorComman ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); - ctrl_meta->set_queue_timeout(msg.getQueueTimeout()); + ctrl_meta->set_queue_timeout(vespalib::count_ms(msg.getQueueTimeout())); ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); @@ -1211,7 +1211,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu cmd->setControlDestination(ctrl_meta.control_destination()); cmd->setDataDestination(ctrl_meta.data_destination()); cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); - cmd->setQueueTimeout(ctrl_meta.queue_timeout()); + cmd->setQueueTimeout(std::chrono::milliseconds(ctrl_meta.queue_timeout())); cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl diff --git a/storageapi/src/vespa/storageapi/message/visitor.cpp b/storageapi/src/vespa/storageapi/message/visitor.cpp index dbda2d0d0c2..aeb58f30fb4 100644 --- a/storageapi/src/vespa/storageapi/message/visitor.cpp +++ b/storageapi/src/vespa/storageapi/message/visitor.cpp @@ -34,7 +34,7 @@ CreateVisitorCommand::CreateVisitorCommand(document::BucketSpace bucketSpace, _visitRemoves(false), _fieldSet("[all]"), _visitInconsistentBuckets(false), - _queueTimeout(2000), + _queueTimeout(2000ms), _maxPendingReplyCount(2), _version(50), _maxBucketsPerVisitor(1) @@ -82,15 +82,12 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose, out << ") {"; out << "\n" << indent << " Library name: '" << _libName << "'"; out << "\n" << indent << " Instance Id: '" << _instanceId << "'"; - out << "\n" << indent << " Control Destination: '" - << _controlDestination << "'"; - out << "\n" << indent << " Data Destination: '" - << _dataDestination << "'"; + out << "\n" << indent << " Control Destination: '" << _controlDestination << "'"; + out << "\n" << indent << " Data Destination: '" << _dataDestination << "'"; out << "\n" << indent << " Doc Selection: '" << _docSelection << "'"; - out << "\n" << indent << " Max pending: '" - << _maxPendingReplyCount << "'"; - out << "\n" << indent << " Timeout: " << getTimeout(); - out << "\n" << indent << " Queue timeout: " << _queueTimeout << " ms"; + out << "\n" << indent << " Max pending: '" << _maxPendingReplyCount << "'"; + out << "\n" << indent << " Timeout: " << vespalib::count_ms(getTimeout()) << " ms"; + out << "\n" << indent << " Queue timeout: " << vespalib::count_ms(_queueTimeout) << " ms"; out << "\n" << indent << " VisitorDispatcher version: '" << _version << "'"; if (visitRemoves()) { out << "\n" << indent << " Visiting remove entries too"; @@ -109,8 +106,7 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose, } out << "\n" << indent << " "; _params.print(out, verbose, indent + " "); - out << "\n" << indent << " Max buckets: '" - << _maxBucketsPerVisitor << "'"; + out << "\n" << indent << " Max buckets: '" << _maxBucketsPerVisitor << "'"; out << "\n" << indent << "} : "; StorageCommand::print(out, verbose, indent); } else if (_buckets.size() == 2) { diff --git a/storageapi/src/vespa/storageapi/message/visitor.h b/storageapi/src/vespa/storageapi/message/visitor.h index f7dcaa63b20..7189cc67195 100644 --- a/storageapi/src/vespa/storageapi/message/visitor.h +++ b/storageapi/src/vespa/storageapi/message/visitor.h @@ -44,7 +44,7 @@ private: vespalib::string _fieldSet; bool _visitInconsistentBuckets; - uint32_t _queueTimeout; + duration _queueTimeout; uint32_t _maxPendingReplyCount; uint32_t _version; @@ -61,22 +61,17 @@ public: ~CreateVisitorCommand(); void setVisitorCmdId(uint32_t id) { _visitorCmdId = id; } - void setControlDestination(vespalib::stringref d) - { _controlDestination = d; } + void setControlDestination(vespalib::stringref d) { _controlDestination = d; } void setDataDestination(vespalib::stringref d) { _dataDestination = d; } void setParameters(const vdslib::Parameters& params) { _params = params; } - void setMaximumPendingReplyCount(uint32_t count) - { _maxPendingReplyCount = count; } - void setFieldSet(vespalib::stringref fieldSet) - { _fieldSet = fieldSet; } + void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; } + void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; } void setVisitRemoves(bool value = true) { _visitRemoves = value; } - void setVisitInconsistentBuckets(bool visitInconsistent = true) - { _visitInconsistentBuckets = visitInconsistent; } - void addBucketToBeVisited(const document::BucketId& id) - { _buckets.push_back(id); } + void setVisitInconsistentBuckets(bool visitInconsistent = true) { _visitInconsistentBuckets = visitInconsistent; } + void addBucketToBeVisited(const document::BucketId& id) { _buckets.push_back(id); } void setVisitorId(const VisitorId id) { _visitorId = id; } void setInstanceId(vespalib::stringref id) { _instanceId = id; } - void setQueueTimeout(uint32_t milliSecs) { _queueTimeout = milliSecs; } + void setQueueTimeout(duration milliSecs) { _queueTimeout = milliSecs; } void setFromTime(Timestamp ts) { _fromTime = ts; } void setToTime(Timestamp ts) { _toTime = ts; } @@ -86,24 +81,20 @@ public: document::Bucket getBucket() const override; const vespalib::string & getLibraryName() const { return _libName; } const vespalib::string & getInstanceId() const { return _instanceId; } - const vespalib::string & getControlDestination() const - { return _controlDestination; } + const vespalib::string & getControlDestination() const { return _controlDestination; } const vespalib::string & getDataDestination() const { return _dataDestination; } const vespalib::string & getDocumentSelection() const { return _docSelection; } const vdslib::Parameters& getParameters() const { return _params; } vdslib::Parameters& getParameters() { return _params; } - uint32_t getMaximumPendingReplyCount() const - { return _maxPendingReplyCount; } - const std::vector<document::BucketId>& getBuckets() const - { return _buckets; } + uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; } + const std::vector<document::BucketId>& getBuckets() const { return _buckets; } Timestamp getFromTime() const { return _fromTime; } Timestamp getToTime() const { return _toTime; } std::vector<document::BucketId>& getBuckets() { return _buckets; } bool visitRemoves() const { return _visitRemoves; } const vespalib::string& getFieldSet() const { return _fieldSet; } bool visitInconsistentBuckets() const { return _visitInconsistentBuckets; } - // In millisec - uint32_t getQueueTimeout() const { return _queueTimeout; } + duration getQueueTimeout() const { return _queueTimeout; } void setVisitorDispatcherVersion(uint32_t version) { _version = version; } uint32_t getVisitorDispatcherVersion() const { return _version; } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp index fe33066872a..d9bbf34141a 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storagecommand.h" -#include <limits> #include <vespa/vespalib/util/exceptions.h> #include <ostream> @@ -20,13 +19,13 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p) // Default timeout is unlimited. Set from mbus message. Some internal // use want unlimited timeout, (such as readbucketinfo, repair bucket // etc) - _timeout(std::numeric_limits<uint32_t>().max()), + _timeout(duration::max()), _sourceIndex(0xFFFF) { setPriority(p); } -StorageCommand::~StorageCommand() { } +StorageCommand::~StorageCommand() = default; void StorageCommand::print(std::ostream& out, bool verbose, @@ -36,7 +35,7 @@ StorageCommand::print(std::ostream& out, bool verbose, out << "StorageCommand(" << _type.getName(); if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority); if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex; - out << ", timeout = " << _timeout << " ms"; + out << ", timeout = " << vespalib::count_ms(_timeout) << " ms"; out << ")"; } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h index 2885dac3b91..c835168c5b7 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h @@ -19,7 +19,7 @@ namespace storage::api { class StorageReply; class StorageCommand : public StorageMessage { - uint32_t _timeout; /** Timeout of command in milliseconds */ + duration _timeout; /** Timeout of command in milliseconds */ /** Sets what node this message origins from. 0xFFFF is unset. */ uint16_t _sourceIndex; @@ -37,9 +37,9 @@ public: uint16_t getSourceIndex() const { return _sourceIndex; } /** Set timeout in milliseconds. */ - void setTimeout(uint32_t milliseconds) { _timeout = milliseconds; } + void setTimeout(duration milliseconds) { _timeout = milliseconds; } /** Get timeout in milliseconds. */ - uint32_t getTimeout() const { return _timeout; } + duration getTimeout() const { return _timeout; } /** Used to set a new id so the message can be resent. */ void setNewId() { StorageMessage::setNewMsgId(); } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index 8c2338a020c..e119884bd1f 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -70,6 +70,8 @@ public: \ namespace storage::api { +using duration = vespalib::duration; + /** * @class MessageType * @ingroup messageapi |