aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp49
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp72
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp9
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.cpp4
5 files changed, 135 insertions, 5 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 52a8bfc41b6..33baa572c64 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
@@ -8,15 +10,19 @@
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
+namespace {
+vespalib::string _g_storage("storage");
+}
+
struct MergeOperationTest : Test, DistributorTestUtil {
OperationSequencer _operation_sequencer;
@@ -252,8 +258,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
// at will.
auto msg = std::make_shared<api::SetBucketStateCommand>(
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
- vespalib::string storage("storage");
- msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
+ msg->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
pending_message_tracker().insert(msg);
sendReply(op);
@@ -457,4 +462,42 @@ TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
_sender.getLastCommand(true));
}
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_request_bucket_info_to_any_node_in_chain) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ // Not initially blocked
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1)); // 1 is in chain
+ pending_message_tracker().insert(info_cmd);
+
+ // Now blocked by info request
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info_to_unrelated_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ document::BucketId bucket_id(16, 1);
+ document::BucketId other_bucket_id(16, 2);
+ addNodesToBucketDB(bucket_id, "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ auto info_cmd = std::make_shared<api::RequestBucketInfoCommand>(
+ makeBucketSpace(), std::vector<document::BucketId>({other_bucket_id}));
+ info_cmd->setAddress(api::StorageMessageAddress::create(&_g_storage, lib::NodeType::STORAGE, 1));
+ pending_message_tracker().insert(info_cmd);
+
+ // Not blocked; bucket info request is for another bucket
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 72365c61597..20ffd216e3d 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -2,16 +2,20 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <tests/common/dummystoragelink.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
using namespace ::testing;
namespace storage::distributor {
@@ -517,4 +521,72 @@ TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_stat
EXPECT_EQ(state, TaskRunState::Aborted);
}
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_no_buckets_tracked_as_null_bucket) {
+ Fixture f;
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, lib::ClusterState(), "");
+ msg->setAddress(makeStorageAddress(2));
+ f.tracker().insert(msg);
+
+ // Tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("Request bucket info -> 2\n", enumerator.str());
+ }
+
+ // Nothing to a specific bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
+TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_superbucket) {
+ Fixture f;
+ document::BucketId bucket(16, 1234);
+ auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), std::vector<document::BucketId>({bucket}));
+ msg->setAddress(makeStorageAddress(3));
+ f.tracker().insert(msg);
+
+ // Not tracked as null bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+ // Tracked for superbucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("Request bucket info -> 3\n", enumerator.str());
+ }
+ // Not tracked for other buckets
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 2345)), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+
+ auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply());
+ f.tracker().reply(*reply);
+
+ // No longer tracked for specified bucket
+ {
+ OperationEnumerator enumerator;
+ f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator);
+ EXPECT_EQ("", enumerator.str());
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index f11d1c26da2..ad5467d9156 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -223,6 +223,12 @@ IdealStateOperation::shouldBlockThisOperation(uint32_t messageType,
return true;
}
}
+ // Also block on pending bucket-specific RequestBucketInfo since this usually
+ // means there's a semi-completed merge being processed for the bucket, but
+ // there will not be a pending merge command for it at the time.
+ if (messageType == api::MessageType::REQUESTBUCKETINFO_ID) {
+ return true;
+ }
return false;
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index f84eafe8d85..cb76b6d50b2 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -90,8 +90,13 @@ PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg)
{
std::lock_guard guard(_lock);
if (msg->getAddress()) {
+ // TODO STRIPE reevaluate if getBucket() on RequestBucketInfo msgs should transparently return superbucket..!
+ document::Bucket bucket = (msg->getType() != api::MessageType::REQUESTBUCKETINFO)
+ ? msg->getBucket()
+ : document::Bucket(msg->getBucket().getBucketSpace(),
+ dynamic_cast<api::RequestBucketInfoCommand&>(*msg).super_bucket_id());
_messages.emplace(currentTime(), msg->getType().getId(), msg->getPriority(), msg->getMsgId(),
- msg->getBucket(), msg->getAddress()->getIndex());
+ bucket, msg->getAddress()->getIndex());
_nodeInfo.incPending(msg->getAddress()->getIndex());
@@ -119,7 +124,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) {
_nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration);
}
- LOG(debug, "Erased message with id %" PRIu64, msgId);
+ LOG(debug, "Erased message with id %" PRIu64 " for bucket %s", msgId, bucket.toString().c_str());
msgs.erase(msgId);
auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket);
// Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker.
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 2323a1ab0a4..0f03c63ca93 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -493,6 +493,8 @@ RequestBucketInfoCommand::print(std::ostream& out, bool verbose,
if (hasSystemState()) {
out << "distributor " << _distributor << " in ";
_state->print(out, verbose, indent + " ");
+ } else if (super_bucket_id().isSet()) {
+ out << ", super bucket " << super_bucket_id() << ". ";
}
if (verbose && !_buckets.empty()) {
out << "\n" << indent << " Specified buckets:\n" << indent << " ";
@@ -529,6 +531,8 @@ RequestBucketInfoReply::print(std::ostream& out, bool verbose,
out << "RequestBucketInfoReply(" << _buckets.size();
if (_full_bucket_fetch) {
out << ", full fetch";
+ } else if (super_bucket_id().isSet()) {
+ out << ", super bucket " << super_bucket_id();
}
if (verbose) {
out << "\n" << indent << " ";