From 6bd81edbd0a66564569840b8f09981b9fbba4883 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 12 Jan 2023 14:47:08 +0000 Subject: We can only handle unconditional async messages in network thread. --- storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp | 2 ++ storage/src/vespa/storage/persistence/asynchandler.cpp | 6 +++--- storage/src/vespa/storage/persistence/asynchandler.h | 2 +- .../vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 2 +- storage/src/vespa/storageapi/message/persistence.h | 1 + storage/src/vespa/storageapi/messageapi/storagecommand.h | 3 --- storage/src/vespa/storageapi/messageapi/storagemessage.cpp | 6 ------ storage/src/vespa/storageapi/messageapi/storagemessage.h | 4 +--- 8 files changed, 9 insertions(+), 17 deletions(-) (limited to 'storage/src') diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 09ee8e46a1a..d3036a2fad3 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -778,7 +778,9 @@ TEST_P(StorageProtocolTest, update_command_with_condition) { auto update = std::make_shared( _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); auto cmd = std::make_shared(_bucket, update, 14); + EXPECT_FALSE(cmd->hasTestAndSetCondition()); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); + EXPECT_TRUE(cmd->hasTestAndSetCondition()); auto cmd2 = copyCommand(cmd); EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index cc1bd557298..70bfb0000d1 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -337,13 +337,13 @@ AsyncHandler::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP } bool -AsyncHandler::is_async_message(api::MessageType::Id type_id) noexcept +AsyncHandler::is_async_unconditional_message(const api::StorageMessage & cmd) noexcept { - switch (type_id) { + switch (cmd.getType().getId()) { case api::MessageType::PUT_ID: case api::MessageType::UPDATE_ID: case api::MessageType::REMOVE_ID: - return true; + return ! cmd.hasTestAndSetCondition(); default: return false; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index d50ebf36c81..c5122647caa 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -34,7 +34,7 @@ public: MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrackerUP tracker) const; - static bool is_async_message(api::MessageType::Id type_id) noexcept; + static bool is_async_unconditional_message(const api::StorageMessage & cmd) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; static bool tasConditionExists(const api::TestAndSetCommand & cmd); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 13f3ceb328b..e8cc8a499a3 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -1013,7 +1013,7 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { ++iter; } - if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) { + if ((iter != end) && AsyncHandler::is_async_unconditional_message(*(iter->_command))) { // This is executed in the context of an RPC thread, so only do a _non-blocking_ // poll of the throttle policy. auto throttle_token = _owner.operation_throttler().try_acquire_one(); diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h index 1d8cfd9d277..d1709c46a6e 100644 --- a/storage/src/vespa/storageapi/message/persistence.h +++ b/storage/src/vespa/storageapi/message/persistence.h @@ -28,6 +28,7 @@ public: void setCondition(const TestAndSetCondition & condition) { _condition = condition; } const TestAndSetCondition & getCondition() const { return _condition; } + bool hasTestAndSetCondition() const noexcept override { return _condition.isPresent(); } /** * Uniform interface to get document id diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.h b/storage/src/vespa/storageapi/messageapi/storagecommand.h index 30d59e5fe4b..f1b736a28c8 100644 --- a/storage/src/vespa/storageapi/messageapi/storagecommand.h +++ b/storage/src/vespa/storageapi/messageapi/storagecommand.h @@ -39,9 +39,6 @@ public: void setTimeout(duration timeout) { _timeout = timeout; } duration getTimeout() const { return _timeout; } - /** Used to set a new id so the message can be resent. */ - void setNewId() { StorageMessage::setNewMsgId(); } - /** Overload this to get more descriptive message output. */ void print(std::ostream& out, bool verbose, const std::string& indent) const override; diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp index 5ffc0d57ea8..b1d68fd77e3 100644 --- a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -262,12 +262,6 @@ StorageMessage::StorageMessage(const StorageMessage& other, Id id) noexcept StorageMessage::~StorageMessage() = default; -void -StorageMessage::setNewMsgId() noexcept -{ - _msgId = generateMsgId(); -} - vespalib::string StorageMessage::getSummary() const { return toString(); diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.h b/storage/src/vespa/storageapi/messageapi/storagemessage.h index 6990423295b..282f110646d 100644 --- a/storage/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storage/src/vespa/storageapi/messageapi/storagemessage.h @@ -375,9 +375,6 @@ public: Id getMsgId() const noexcept { return _msgId; } - /** Method used by storage commands to set a new id. */ - void setNewMsgId() noexcept; - /** * Set the id of this message. Typically used to set the id to a * unique value previously generated with the generateMsgId method. @@ -429,6 +426,7 @@ public: * method in the MessageHandler interface. */ virtual bool callHandler(MessageHandler&, const StorageMessage::SP&) const = 0; + virtual bool hasTestAndSetCondition() const noexcept { return false; } mbus::Trace && steal_trace() noexcept { return std::move(_trace); } mbus::Trace& getTrace() noexcept { return _trace; } -- cgit v1.2.3