summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp20
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
3 files changed, 41 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 66ef13310a4..881ccb560b4 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -569,6 +569,26 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
dumpBucket(bucket));
}
+// TODO probably also do this for updates and removes
+// TODO consider if we should use the pending state verbatim for computing targets if it exists
+TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
+ setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
+ auto doc = createDummyDocument("test", "test");
+ auto bucket = getExternalOperationHandler().getBucketId(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+ getBucketDBUpdater().onSetSystemState(
+ std::make_shared<api::SetSystemStateCommand>(
+ lib::ClusterState("version:2 distributor:1 storage:4 .0.s:d .2.s:m")));
+ _sender.clear();
+
+ sendPut(createPut(doc));
+ EXPECT_EQ("", _sender.getCommands(true));
+ EXPECT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(BUSY, "
+ "One or more target content nodes are unavailable in the pending cluster state)",
+ _sender.getLastReply(true));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setupDistributor(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 2b1baa1e0d6..e9348e8e8e1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/distribution/idealnodecalculatorimpl.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.doc.put");
@@ -145,6 +146,17 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
}
+bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
+ auto* pending_state = _manager.getDistributor().pendingClusterStateOrNull(_msg->getBucket().getBucketSpace());
+ if (!pending_state) {
+ return false;
+ }
+ const char* up_states = _manager.getDistributor().getStorageNodeUpStates();
+ return std::any_of(targets.begin(), targets.end(), [pending_state, up_states](const auto& target){
+ return !pending_state->getNodeState(target.getNode()).getState().oneOf(up_states);
+ });
+}
+
void
PutOperation::onStart(DistributorMessageSender& sender)
{
@@ -188,6 +200,13 @@ PutOperation::onStart(DistributorMessageSender& sender)
}
}
+ if (has_unavailable_targets_in_pending_state(targets)) {
+ _tracker.fail(sender, api::ReturnCode(
+ api::ReturnCode::BUSY, "One or more target content nodes are unavailable in "
+ "the pending cluster state"));
+ return;
+ }
+
// Mark any entries we're not feeding to as not trusted.
std::vector<BucketDatabase::Entry> entries;
_bucketSpace.getBucketDatabase().getParents(bid, entries);
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 745a4f57a35..79b3dd74b82 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -49,6 +49,8 @@ private:
bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
+ bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
+
std::shared_ptr<api::PutCommand> _msg;
DistributorComponent& _manager;
DistributorBucketSpace &_bucketSpace;