diff options
Diffstat (limited to 'storage')
4 files changed, 131 insertions, 5 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 52a8bfc41b6..33baa572c64 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> #include <tests/distributor/distributortestutil.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> @@ -8,15 +10,19 @@ #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/vdslib/distribution/distribution.h> -#include <vespa/document/test/make_document_bucket.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; using namespace ::testing; namespace storage::distributor { +namespace { +vespalib::string _g_storage("storage"); +} + struct MergeOperationTest : Test, DistributorTestUtil { OperationSequencer _operation_sequencer; @@ -252,8 +258,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { // at will. auto msg = std::make_shared<api::SetBucketStateCommand>( makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE); - vespalib::string storage("storage"); - msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1)); + msg->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); pending_message_tracker().insert(msg); sendReply(op); @@ -457,4 +462,42 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { _sender.getLastCommand(true)); } +TEST_F(MergeOperationTest, merge_operation_is_blocked_by_request_bucket_info_to_any_node_in_chain) { + getClock().setAbsoluteTimeInSeconds(10); + document::BucketId bucket_id(16, 1); + addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + enableDistributorClusterState("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + // Not initially blocked + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); + + auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>( + makeBucketSpace(), std::vector<document::BucketId>({bucket_id})); + info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); // 1 is in chain + pending_message_tracker().insert(info_cmd); + + // Now blocked by info request + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); +} + +TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info_to_unrelated_bucket) { + getClock().setAbsoluteTimeInSeconds(10); + document::BucketId bucket_id(16, 1); + document::BucketId other_bucket_id(16, 2); + addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + enableDistributorClusterState("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>( + makeBucketSpace(), std::vector<document::BucketId>({other_bucket_id})); + info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); + pending_message_tracker().insert(info_cmd); + + // Not blocked; bucket info request is for another bucket + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 72365c61597..20ffd216e3d 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -2,16 +2,20 @@ #include <vespa/document/base/testdocman.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> #include <tests/common/dummystoragelink.h> +#include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; using namespace ::testing; namespace storage::distributor { @@ -517,4 +521,72 @@ TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_stat EXPECT_EQ(state, TaskRunState::Aborted); } +TEST_F(PendingMessageTrackerTest, request_bucket_info_with_no_buckets_tracked_as_null_bucket) { + Fixture f; + auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, lib::ClusterState(), ""); + msg->setAddress(makeStorageAddress(2)); + f.tracker().insert(msg); + + // Tracked as null bucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); + EXPECT_EQ("Request bucket info -> 2\n", enumerator.str()); + } + + // Nothing to a specific bucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator); + EXPECT_EQ("", enumerator.str()); + } + + auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply()); + f.tracker().reply(*reply); + + // No longer tracked as null bucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); + EXPECT_EQ("", enumerator.str()); + } +} + +TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_superbucket) { + Fixture f; + document::BucketId bucket(16, 1234); + auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), std::vector<document::BucketId>({bucket})); + msg->setAddress(makeStorageAddress(3)); + f.tracker().insert(msg); + + // Not tracked as null bucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); + EXPECT_EQ("", enumerator.str()); + } + // Tracked for superbucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator); + EXPECT_EQ("Request bucket info -> 3\n", enumerator.str()); + } + // Not tracked for other buckets + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 2345)), enumerator); + EXPECT_EQ("", enumerator.str()); + } + + auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply()); + f.tracker().reply(*reply); + + // No longer tracked for specified bucket + { + OperationEnumerator enumerator; + f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator); + EXPECT_EQ("", enumerator.str()); + } +} + } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index f11d1c26da2..ad5467d9156 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -223,6 +223,12 @@ IdealStateOperation::shouldBlockThisOperation(uint32_t messageType, return true; } } + // Also block on pending bucket-specific RequestBucketInfo since this usually + // means there's a semi-completed merge being processed for the bucket, but + // there will not be a pending merge command for it at the time. + if (messageType == api::MessageType::REQUESTBUCKETINFO_ID) { + return true; + } return false; } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index f84eafe8d85..cb76b6d50b2 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -90,8 +90,13 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) { std::lock_guard guard(_lock); if (msg->getAddress()) { + // TODO STRIPE reevaluate if getBucket() on RequestBucketInfo msgs should transparently return superbucket..! + document::Bucket bucket = (msg->getType() != api::MessageType::REQUESTBUCKETINFO) + ? msg->getBucket() + : document::Bucket(msg->getBucket().getBucketSpace(), + dynamic_cast<api::RequestBucketInfoCommand&>(*msg).super_bucket_id()); _messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(), - msg->getBucket(), msg->getAddress()->getIndex()); + bucket, msg->getAddress()->getIndex()); _nodeInfo.incPending(msg->getAddress()->getIndex()); @@ -119,7 +124,7 @@ PendingMessageTracker::reply(const api::StorageReply& r) if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) { _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration); } - LOG(debug, "Erased message with id %" PRIu64, msgId); + LOG(debug, "Erased message with id %" PRIu64 " for bucket %s", msgId, bucket.toString().c_str()); msgs.erase(msgId); auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket); // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker. |