summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-05-21 13:16:08 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-05-21 13:23:15 +0000
commit: 20cd5dda7e3adda277b0d116075c89c3886dc26e (patch)
tree: 8650aa281eca845d3eee8d7f84f6bb939f366bff /storage
parent: fb212d0abb8ed773106cbdadd68e9800b257e44a (diff)
Avoid resurrecting replicas for nodes that are unavailable in pending state
We previously only checked for node availability in the _active_ state, without looking at the pending state. This opened up a race condition where a reply from a previously DB-pruned node could bring a replica back into the DB iff it was received during a pending state window. Treat Maintenance, not just Down, as unavailable in this case. Also move all `PutOperation` tests to GTest.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/CMakeLists.txt 2
-rw-r--r-- storage/src/tests/distributor/bucketdbupdatertest.cpp 29
-rw-r--r-- storage/src/tests/distributor/putoperationtest.cpp 514
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.cpp 10
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.h 5
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 5
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 2
-rw-r--r-- storage/src/vespa/storage/distributor/distributorcomponent.cpp 37
-rw-r--r-- storage/src/vespa/storage/distributor/distributorcomponent.h 5
-rw-r--r-- storage/src/vespa/storage/distributor/distributorinterface.h 1
10 files changed, 278 insertions, 332 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index fdccf9b1394..245d54e8e69 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -24,7 +24,6 @@ vespa_add_library(storage_testdistributor TEST
ownership_transfer_safe_time_point_calculator_test.cpp
pendingmessagetrackertest.cpp
persistence_metrics_set_test.cpp
- putoperationtest.cpp
removebucketoperationtest.cpp
removelocationtest.cpp
removeoperationtest.cpp
@@ -50,6 +49,7 @@ vespa_add_library(storage_gtestdistributor TEST
bucketdatabasetest.cpp
bucketdbupdatertest.cpp
mapbucketdatabasetest.cpp
+ putoperationtest.cpp
# Fixture etc. dupes with non-gtest runner :
distributortestutil.cpp
bucket_db_prune_elision_test.cpp
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 9d063c82c69..1cfc1692edb 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -2596,7 +2596,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
- const auto& req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
+ const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]);
auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
@@ -2605,7 +2605,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
for (uint32_t sb = 0; sb < superbuckets; ++sb) {
for (uint64_t i = 0; i < sub_buckets; ++i) {
document::BucketId bucket(48, (i << 32ULL) | sb);
- vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10,1,1)));
+ vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1)));
}
}
}
@@ -2626,4 +2626,29 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
EXPECT_EQ(size_t(0), mutable_global_db().size());
}
+TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
+ auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
+ auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
+
+ lib::ClusterStateBundle initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}});
+ set_cluster_state_bundle(initial_bundle);
+
+ auto* state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+ ASSERT_TRUE(state != nullptr);
+ EXPECT_EQ(*initial_default, *state);
+
+ state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+ ASSERT_TRUE(state != nullptr);
+ EXPECT_EQ(*initial_baseline, *state);
+
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0));
+
+ state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+ EXPECT_TRUE(state == nullptr);
+
+ state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+ EXPECT_TRUE(state == nullptr);
+}
+
}
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index aa5fd80df91..66ef13310a4 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -1,7 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/config/helper/configgetter.h>
-#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storage/distributor/distributor.h>
@@ -12,9 +10,8 @@
#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
-#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/config/helper/configgetter.hpp>
#include <iomanip>
using std::shared_ptr;
@@ -29,79 +26,26 @@ using namespace storage::lib;
using namespace std::literals::string_literals;
using document::test::makeDocumentBucket;
-namespace storage {
-
-namespace distributor {
-
-class PutOperationTest : public CppUnit::TestFixture,
- public DistributorTestUtil {
- CPPUNIT_TEST_SUITE(PutOperationTest);
- CPPUNIT_TEST(testSimple);
- CPPUNIT_TEST(testBucketDatabaseGetsSpecialEntryWhenCreateBucketSent);
- CPPUNIT_TEST(testSendInlineSplitBeforePutIfBucketTooLarge);
- CPPUNIT_TEST(testDoNotSendInlineSplitIfNotConfigured);
- CPPUNIT_TEST(testNodeRemovedOnReply);
- CPPUNIT_TEST(testDoNotSendCreateBucketIfAlreadyPending);
- CPPUNIT_TEST(testMultipleCopies);
- CPPUNIT_TEST(testMultipleCopiesEarlyReturnPrimaryNotRequired);
- CPPUNIT_TEST(testMultipleCopiesEarlyReturnPrimaryRequired);
- CPPUNIT_TEST(testMultipleCopiesEarlyReturnPrimaryRequiredNotDone);
- CPPUNIT_TEST_IGNORED(testDoNotRevertOnFailureAfterEarlyReturn);
- CPPUNIT_TEST(testStorageFailed);
- CPPUNIT_TEST(testRevertSuccessfulCopiesWhenOneFails);
- CPPUNIT_TEST(testNoRevertIfRevertDisabled);
- CPPUNIT_TEST(testNoStorageNodes);
- CPPUNIT_TEST(testUpdateCorrectBucketOnRemappedPut);
- CPPUNIT_TEST(testTargetNodes);
- CPPUNIT_TEST(testDoNotResurrectDownedNodesInBucketDB);
- CPPUNIT_TEST(sendToRetiredNodesIfNoUpNodesAvailable);
- CPPUNIT_TEST(replicaImplicitlyActivatedWhenActivationIsNotDisabled);
- CPPUNIT_TEST(replicaNotImplicitlyActivatedWhenActivationIsDisabled);
- CPPUNIT_TEST_SUITE_END();
-
- std::shared_ptr<const DocumentTypeRepo> _repo;
- const DocumentType* _html_type;
- std::unique_ptr<Operation> op;
+using namespace ::testing;
-protected:
- void testSimple();
- void testBucketDatabaseGetsSpecialEntryWhenCreateBucketSent();
- void testSendInlineSplitBeforePutIfBucketTooLarge();
- void testDoNotSendInlineSplitIfNotConfigured();
- void testNodeRemovedOnReply();
- void testDoNotSendCreateBucketIfAlreadyPending();
- void testStorageFailed();
- void testNoReply();
- void testMultipleCopies();
- void testRevertSuccessfulCopiesWhenOneFails();
- void testNoRevertIfRevertDisabled();
- void testInconsistentChecksum();
- void testNoStorageNodes();
- void testMultipleCopiesEarlyReturnPrimaryNotRequired();
- void testMultipleCopiesEarlyReturnPrimaryRequired();
- void testMultipleCopiesEarlyReturnPrimaryRequiredNotDone();
- void testDoNotRevertOnFailureAfterEarlyReturn();
- void testUpdateCorrectBucketOnRemappedPut();
- void testBucketNotFound();
- void testTargetNodes();
- void testDoNotResurrectDownedNodesInBucketDB();
- void sendToRetiredNodesIfNoUpNodesAvailable();
- void replicaImplicitlyActivatedWhenActivationIsNotDisabled();
- void replicaNotImplicitlyActivatedWhenActivationIsDisabled();
-
- void doTestCreationWithBucketActivationDisabled(bool disabled);
+namespace storage::distributor {
+class PutOperationTest : public Test,
+ public DistributorTestUtil
+{
public:
- void setUp() override {
- _repo.reset(
- new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>
- ::getConfig("config-doctypes",
- FileSpec(TEST_PATH("config-doctypes.cfg")))));
- _html_type = _repo->getDocumentType("text/html");
+ document::TestDocMan _testDocMan;
+ std::unique_ptr<Operation> op;
+
+ ~PutOperationTest() override;
+
+ void do_test_creation_with_bucket_activation_disabled(bool disabled);
+
+ void SetUp() override {
createLinks();
};
- void tearDown() override {
+ void TearDown() override {
close();
}
@@ -113,7 +57,7 @@ public:
= api::ReturnCode::OK,
api::BucketInfo info = api::BucketInfo(1,2,3,4,5))
{
- CPPUNIT_ASSERT(!_sender.commands.empty());
+ ASSERT_FALSE(_sender.commands.empty());
if (idx == -1) {
idx = _sender.commands.size() - 1;
} else if (static_cast<size_t>(idx) >= _sender.commands.size()) {
@@ -138,30 +82,25 @@ public:
op->start(_sender, framework::MilliSecTime(0));
}
- Document::SP createDummyDocument(const char* ns,
- const char* id) const
- {
- return Document::SP(
- new Document(*_html_type,
- DocumentId(DocIdString(ns, id))));
+ const document::DocumentType& doc_type() const {
+ return *_testDocMan.getTypeRepo().getDocumentType("testdoctype1");
+ }
+ Document::SP createDummyDocument(const char* ns, const char* id) const {
+ return std::make_shared<Document>(doc_type(), DocumentId(DocIdString(ns, id)));
}
- std::shared_ptr<api::PutCommand> createPut(
- const Document::SP doc) const
- {
- return std::shared_ptr<api::PutCommand>(
- new api::PutCommand(makeDocumentBucket(document::BucketId(0)), doc, 100));
+ std::shared_ptr<api::PutCommand> createPut(Document::SP doc) const {
+ return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100);
}
};
-CPPUNIT_TEST_SUITE_REGISTRATION(PutOperationTest);
+PutOperationTest::~PutOperationTest() = default;
document::BucketId
PutOperationTest::createAndSendSampleDocument(uint32_t timeout) {
Document::SP
- doc(new Document(*_html_type,
- DocumentId(DocIdString("test", "test"))));
+ doc(new Document(doc_type(), DocumentId(DocIdString("test", "test"))));
document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId());
addIdealNodes(id);
@@ -186,44 +125,36 @@ typedef bool RequirePrimaryWritten;
}
-void
-PutOperationTest::testSimple()
-{
+TEST_F(PutOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
createAndSendSampleDocument(180);
- CPPUNIT_ASSERT_EQUAL(std::string("Put(BucketId(0x4000000000008b13), "
- "doc:test:test, timestamp 100, size 33) => 0"),
- _sender.getCommands(true, true));
+ ASSERT_EQ("Put(BucketId(0x4000000000008b13), "
+ "doc:test:test, timestamp 100, size 36) => 0",
+ _sender.getCommands(true, true));
sendReply();
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
}
-void
-PutOperationTest::testBucketDatabaseGetsSpecialEntryWhenCreateBucketSent()
-{
+TEST_F(PutOperationTest, bucket_database_gets_special_entry_when_CreateBucket_sent) {
setupDistributor(2, 1, "storage:1 distributor:1");
Document::SP doc(createDummyDocument("test", "test"));
sendPut(createPut(doc));
// Database updated before CreateBucket is sent
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000008b13) : "
- "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)"),
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+ ASSERT_EQ("BucketId(0x4000000000008b13) : "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)",
+ dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 0,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true));
}
-void
-PutOperationTest::testSendInlineSplitBeforePutIfBucketTooLarge()
-{
+TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setupDistributor(1, 1, "storage:1 distributor:1");
getConfig().setSplitCount(1024);
getConfig().setSplitSize(1000000);
@@ -232,19 +163,16 @@ PutOperationTest::testSendInlineSplitBeforePutIfBucketTooLarge()
sendPut(createPut(createDummyDocument("test", "uri")));
- CPPUNIT_ASSERT_EQUAL(
- std::string("SplitBucketCommand(BucketId(0x4000000000002a52)Max doc count: "
- "1024, Max total doc size: 1000000) Reasons to start: "
- "[Splitting bucket because its maximum size (10000 b, 10000 docs, 10000 meta, 10000 b total) is "
- "higher than the configured limit of (1000000, 1024)] => 0,"
- "Put(BucketId(0x4000000000002a52), doc:test:uri, timestamp 100, "
- "size 32) => 0"),
- _sender.getCommands(true, true));
+ ASSERT_EQ("SplitBucketCommand(BucketId(0x4000000000002a52)Max doc count: "
+ "1024, Max total doc size: 1000000) Reasons to start: "
+ "[Splitting bucket because its maximum size (10000 b, 10000 docs, 10000 meta, 10000 b total) is "
+ "higher than the configured limit of (1000000, 1024)] => 0,"
+ "Put(BucketId(0x4000000000002a52), doc:test:uri, timestamp 100, "
+ "size 35) => 0",
+ _sender.getCommands(true, true));
}
-void
-PutOperationTest::testDoNotSendInlineSplitIfNotConfigured()
-{
+TEST_F(PutOperationTest, do_not_send_inline_split_if_not_configured) {
setupDistributor(1, 1, "storage:1 distributor:1");
getConfig().setSplitCount(1024);
getConfig().setDoInlineSplit(false);
@@ -253,94 +181,79 @@ PutOperationTest::testDoNotSendInlineSplitIfNotConfigured()
sendPut(createPut(createDummyDocument("test", "uri")));
- CPPUNIT_ASSERT_EQUAL(
- std::string(
- "Put(BucketId(0x4000000000002a52), doc:test:uri, timestamp 100, "
- "size 32) => 0"),
- _sender.getCommands(true, true));
+ ASSERT_EQ("Put(BucketId(0x4000000000002a52), doc:test:uri, timestamp 100, "
+ "size 35) => 0",
+ _sender.getCommands(true, true));
}
-void
-PutOperationTest::testNodeRemovedOnReply()
-{
+TEST_F(PutOperationTest, node_removed_on_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
createAndSendSampleDocument(180);
- CPPUNIT_ASSERT_EQUAL(
- std::string("Put(BucketId(0x4000000000008b13), "
- "doc:test:test, timestamp 100, size 33) => 1,"
- "Put(BucketId(0x4000000000008b13), "
- "doc:test:test, timestamp 100, size 33) => 0"),
- _sender.getCommands(true, true));
+ ASSERT_EQ("Put(BucketId(0x4000000000008b13), "
+ "doc:test:test, timestamp 100, size 36) => 1,"
+ "Put(BucketId(0x4000000000008b13), "
+ "doc:test:test, timestamp 100, size 36) => 0",
+ _sender.getCommands(true, true));
getExternalOperationHandler().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x8b13)), 0);
sendReply(0);
sendReply(1);
- CPPUNIT_ASSERT_EQUAL(std::string(
- "PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(BUCKET_DELETED, "
- "Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000008b13)) was deleted from nodes [0] "
- "after message was sent but before it was done. "
- "Sent to [1,0])"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(BUCKET_DELETED, "
+ "Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000008b13)) was deleted from nodes [0] "
+ "after message was sent but before it was done. "
+ "Sent to [1,0])",
+ _sender.getLastReply());
}
-void
-PutOperationTest::testStorageFailed()
-{
+TEST_F(PutOperationTest, storage_failed) {
setupDistributor(2, 1, "storage:1 distributor:1");
createAndSendSampleDocument(180);
sendReply(-1, api::ReturnCode::INTERNAL_FAILURE);
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(INTERNAL_FAILURE)"),
- _sender.getLastReply(true));
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(INTERNAL_FAILURE)",
+ _sender.getLastReply(true));
}
-void
-PutOperationTest::testMultipleCopies()
-{
+TEST_F(PutOperationTest, multiple_copies) {
setupDistributor(3, 4, "storage:4 distributor:1");
Document::SP doc(createDummyDocument("test", "test"));
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
for (uint32_t i = 0; i < 6; i++) {
sendReply(i);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply(true));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000008b13) : "
- "node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
- "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
- "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)"),
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
-}
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply(true));
+ ASSERT_EQ("BucketId(0x4000000000008b13) : "
+ "node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
+ "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+}
-void
-PutOperationTest::testMultipleCopiesEarlyReturnPrimaryRequired()
-{
+TEST_F(PutOperationTest, multiple_copies_early_return_primary_required) {
setupDistributor(3, 4, "storage:4 distributor:1", 2, true);
sendPut(createPut(createDummyDocument("test", "test")));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
// Reply to 2 CreateBucket, including primary
for (uint32_t i = 0; i < 2; i++) {
@@ -351,23 +264,19 @@ PutOperationTest::testMultipleCopiesEarlyReturnPrimaryRequired()
sendReply(3 + i);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string(
- "PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
}
-void
-PutOperationTest::testMultipleCopiesEarlyReturnPrimaryNotRequired()
-{
+TEST_F(PutOperationTest, multiple_copies_early_return_primary_not_required) {
setupDistributor(3, 4, "storage:4 distributor:1", 2, false);
sendPut(createPut(createDummyDocument("test", "test")));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
// Reply only to 2 nodes (but not the primary)
for (uint32_t i = 1; i < 3; i++) {
@@ -377,22 +286,19 @@ PutOperationTest::testMultipleCopiesEarlyReturnPrimaryNotRequired()
sendReply(3 + i); // Put
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
}
-void
-PutOperationTest::testMultipleCopiesEarlyReturnPrimaryRequiredNotDone()
-{
+TEST_F(PutOperationTest, multiple_copies_early_return_primary_required_not_done) {
setupDistributor(3, 4, "storage:4 distributor:1", 2, true);
sendPut(createPut(createDummyDocument("test", "test")));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
// Reply only to 2 nodes (but not the primary)
sendReply(1);
@@ -400,20 +306,18 @@ PutOperationTest::testMultipleCopiesEarlyReturnPrimaryRequiredNotDone()
sendReply(4);
sendReply(5);
- CPPUNIT_ASSERT_EQUAL(0, (int)_sender.replies.size());
+ ASSERT_EQ(0, _sender.replies.size());
}
-void
-PutOperationTest::testDoNotRevertOnFailureAfterEarlyReturn()
-{
+TEST_F(PutOperationTest, do_not_revert_on_failure_after_early_return) {
setupDistributor(Redundancy(3),NodeCount(4), "storage:4 distributor:1",
ReturnAfter(2), RequirePrimaryWritten(false));
sendPut(createPut(createDummyDocument("test", "test")));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
for (uint32_t i = 0; i < 3; i++) {
sendReply(i); // CreateBucket
@@ -422,28 +326,23 @@ PutOperationTest::testDoNotRevertOnFailureAfterEarlyReturn()
sendReply(3 + i); // Put
}
- CPPUNIT_ASSERT_EQUAL(
- std::string(
- "PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
sendReply(5, api::ReturnCode::INTERNAL_FAILURE);
// Should not be any revert commands sent
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 3,Create bucket => 1,"
- "Create bucket => 0,Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 3,Create bucket => 1,"
+ "Create bucket => 0,Put => 3,Put => 1,Put => 0",
+ _sender.getCommands(true));
}
-void
-PutOperationTest::testRevertSuccessfulCopiesWhenOneFails()
-{
+TEST_F(PutOperationTest, revert_successful_copies_when_one_fails) {
setupDistributor(3, 4, "storage:4 distributor:1");
createAndSendSampleDocument(180);
- CPPUNIT_ASSERT_EQUAL(std::string("Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Put => 3,Put => 1,Put => 0", _sender.getCommands(true));
for (uint32_t i = 0; i < 2; i++) {
sendReply(i);
@@ -451,28 +350,24 @@ PutOperationTest::testRevertSuccessfulCopiesWhenOneFails()
sendReply(2, api::ReturnCode::INTERNAL_FAILURE);
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(doc:test:test, "
- "BucketId(0x0000000000000000), timestamp 100) "
- "ReturnCode(INTERNAL_FAILURE)"),
- _sender.getLastReply(true));
+ ASSERT_EQ("PutReply(doc:test:test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(INTERNAL_FAILURE)",
+ _sender.getLastReply(true));
- CPPUNIT_ASSERT_EQUAL(std::string("Revert => 3,Revert => 1"),
- _sender.getCommands(true, false, 3));
+ ASSERT_EQ("Revert => 3,Revert => 1", _sender.getCommands(true, false, 3));
}
-void
-PutOperationTest::testNoRevertIfRevertDisabled()
-{
+TEST_F(PutOperationTest, no_revert_if_revert_disabled) {
close();
getDirConfig().getConfig("stor-distributormanager")
.set("enable_revert", "false");
- setUp();
+ SetUp();
setupDistributor(3, 4, "storage:4 distributor:1");
createAndSendSampleDocument(180);
- CPPUNIT_ASSERT_EQUAL(std::string("Put => 3,Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Put => 3,Put => 1,Put => 0", _sender.getCommands(true));
for (uint32_t i = 0; i < 2; i++) {
sendReply(i);
@@ -480,26 +375,23 @@ PutOperationTest::testNoRevertIfRevertDisabled()
sendReply(2, api::ReturnCode::INTERNAL_FAILURE);
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(doc:test:test, "
- "BucketId(0x0000000000000000), timestamp 100) "
- "ReturnCode(INTERNAL_FAILURE)"),
- _sender.getLastReply(true));
+ ASSERT_EQ("PutReply(doc:test:test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(INTERNAL_FAILURE)",
+ _sender.getLastReply(true));
- CPPUNIT_ASSERT_EQUAL(std::string(""),
- _sender.getCommands(true, false, 3));
+ ASSERT_EQ("", _sender.getCommands(true, false, 3));
}
-void
-PutOperationTest::testDoNotSendCreateBucketIfAlreadyPending()
-{
+TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) {
setupDistributor(2, 2, "storage:2 distributor:1");
Document::SP doc(createDummyDocument("test", "uri"));
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 1,Create bucket => 0,"
- "Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,"
+ "Put => 1,Put => 0",
+ _sender.getCommands(true));
// Manually shove sent messages into pending message tracker, since
// this isn't done automatically.
@@ -510,37 +402,32 @@ PutOperationTest::testDoNotSendCreateBucketIfAlreadyPending()
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 1,Create bucket => 0,"
- "Put => 1,Put => 0,"
- "Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,"
+ "Put => 1,Put => 0,"
+ "Put => 1,Put => 0",
+ _sender.getCommands(true));
}
-void
-PutOperationTest::testNoStorageNodes()
-{
+TEST_F(PutOperationTest, no_storage_nodes) {
setupDistributor(2, 1, "storage:0 distributor:1");
createAndSendSampleDocument(180);
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(doc:test:test, BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NOT_CONNECTED, "
- "Can't store document: No storage nodes available)"),
- _sender.getLastReply(true));
+ ASSERT_EQ("PutReply(doc:test:test, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NOT_CONNECTED, "
+ "Can't store document: No storage nodes available)",
+ _sender.getLastReply(true));
}
-void
-PutOperationTest::testUpdateCorrectBucketOnRemappedPut()
-{
+TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
setupDistributor(2, 2, "storage:2 distributor:1");
- Document::SP doc(new Document(*_html_type, DocumentId(
+ Document::SP doc(new Document(doc_type(), DocumentId(
UserDocIdString("userdoc:test:13:uri"))));
addNodesToBucketDB(document::BucketId(16,13), "0=0,1=0");
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Put => 0,Put => 1"),
- _sender.getCommands(true));
+ ASSERT_EQ("Put => 0,Put => 1", _sender.getCommands(true));
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.commands[0];
@@ -553,15 +440,14 @@ PutOperationTest::testUpdateCorrectBucketOnRemappedPut()
sendReply(1);
- CPPUNIT_ASSERT_EQUAL(std::string("PutReply(userdoc:test:13:uri, "
- "BucketId(0x0000000000000000), "
- "timestamp 100) ReturnCode(NONE)"),
- _sender.getLastReply());
+ ASSERT_EQ("PutReply(userdoc:test:13:uri, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x440000000000000d) : "
- "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(17, 13)));
+ ASSERT_EQ("BucketId(0x440000000000000d) : "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(document::BucketId(17, 13)));
}
BucketInfo
@@ -612,52 +498,78 @@ PutOperationTest::getNodes(const std::string& infoString) {
return ost.str();
}
-void
-PutOperationTest::testTargetNodes()
-{
+TEST_F(PutOperationTest, target_nodes) {
setupDistributor(2, 6, "storage:6 distributor:1");
// Ideal state of bucket is 1,3.
- CPPUNIT_ASSERT_EQUAL(std::string("target( 1 3 ) create( 1 3 )"), getNodes(""));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 1 3 ) create( 3 )"), getNodes("1-1-true"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 1 3 ) create( 3 )"), getNodes("1-1-false"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 3 4 5 ) create( )"), getNodes("3-1-true,4-1-true,5-1-true"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 3 4 ) create( )"), getNodes("3-2-true,4-2-true,5-1-false"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 1 3 4 ) create( )"), getNodes("3-2-true,4-2-true,1-1-false"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 4 5 ) create( )"), getNodes("4-2-false,5-1-false"));
- CPPUNIT_ASSERT_EQUAL(std::string("target( 1 4 ) create( 1 )"), getNodes("4-1-true"));
+ ASSERT_EQ("target( 1 3 ) create( 1 3 )", getNodes(""));
+ ASSERT_EQ("target( 1 3 ) create( 3 )", getNodes("1-1-true"));
+ ASSERT_EQ("target( 1 3 ) create( 3 )", getNodes("1-1-false"));
+ ASSERT_EQ("target( 3 4 5 ) create( )", getNodes("3-1-true,4-1-true,5-1-true"));
+ ASSERT_EQ("target( 3 4 ) create( )", getNodes("3-2-true,4-2-true,5-1-false"));
+ ASSERT_EQ("target( 1 3 4 ) create( )", getNodes("3-2-true,4-2-true,1-1-false"));
+ ASSERT_EQ("target( 4 5 ) create( )", getNodes("4-2-false,5-1-false"));
+ ASSERT_EQ("target( 1 4 ) create( 1 )", getNodes("4-1-true"));
}
-void
-PutOperationTest::testDoNotResurrectDownedNodesInBucketDB()
-{
- setupDistributor(2, 2, "storage:2 distributor:1");
+TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
+ setupDistributor(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
Document::SP doc(createDummyDocument("test", "uri"));
document::BucketId bId = getExternalOperationHandler().getBucketId(doc->getId());
- addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t");
+ addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Put => 1,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Put => 1,Put => 0,Put => 2", _sender.getCommands(true));
- enableDistributorClusterState("distributor:1 storage:2 .1.s:d");
+ enableDistributorClusterState("distributor:1 storage:3 .1.s:d .2.s:m");
addNodesToBucketDB(bId, "0=1/2/3/t"); // This will actually remove node #1.
- sendReply(0, api::ReturnCode::OK, api::BucketInfo(9,9,9));
- sendReply(1, api::ReturnCode::OK, api::BucketInfo(5,6,7));
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(9, 9, 9));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000002a52) : "
- "node(idx=0,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)"),
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+ ASSERT_EQ("BucketId(0x4000000000002a52) : "
+ "node(idx=0,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
}
-void
-PutOperationTest::sendToRetiredNodesIfNoUpNodesAvailable()
-{
+TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
+ setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = getExternalOperationHandler().getBucketId(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 0,Put => 2", _sender.getCommands(true));
+ // Trigger a pending (but not completed) cluster state transition where content
+ // node 0 is down. This will prune its replica from the DB. We assume that the
+ // downed node managed to send off a reply to the Put before it went down, and
+ // this reply must not recreate the replica in the bucket database. This is because
+ // we have an invariant that the DB shall not contain replicas on nodes that are
+ // not available.
+ // We also let node 2 be in maintenance to ensure we also consider this an unavailable state.
+ // Note that we have to explicitly trigger a transition that requires an async bucket
+ // fetch here; if we just set a node down the cluster state would be immediately applied
+ // and the distributor's "clear pending messages for downed nodes" logic would kick in
+ // and hide the problem.
+ getBucketDBUpdater().onSetSystemState(
+ std::make_shared<api::SetSystemStateCommand>(
+ lib::ClusterState("version:2 distributor:1 storage:4 .0.s:d .2.s:m")));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ ASSERT_EQ("BucketId(0x4000000000002a52) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
+TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setupDistributor(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
Document::SP doc(createDummyDocument("test", "uri"));
@@ -667,41 +579,31 @@ PutOperationTest::sendToRetiredNodesIfNoUpNodesAvailable()
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL("Put => 0,Put => 1"s,
- _sender.getCommands(true));
+ ASSERT_EQ("Put => 0,Put => 1", _sender.getCommands(true));
}
-void
-PutOperationTest::doTestCreationWithBucketActivationDisabled(bool disabled)
-{
+void PutOperationTest::do_test_creation_with_bucket_activation_disabled(bool disabled) { // Shared test body: forwards `disabled` to disableBucketActivationInConfig(), sends a Put and inspects the resulting CreateBucket command.
setupDistributor(Redundancy(2), NodeCount(2), "distributor:1 storage:1");
disableBucketActivationInConfig(disabled);
Document::SP doc(createDummyDocument("test", "uri"));
sendPut(createPut(doc));
- CPPUNIT_ASSERT_EQUAL(std::string("Create bucket => 0,Put => 0"),
- _sender.getCommands(true));
+ ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true)); // a bucket create towards node 0 is expected ahead of the Put
auto cmd = _sender.commands[0];
auto createCmd = std::dynamic_pointer_cast<api::CreateBucketCommand>(cmd);
- CPPUNIT_ASSERT(createCmd.get() != nullptr);
+ ASSERT_TRUE(createCmd.get() != nullptr); // guards the dereference below: commands[0] must really be a CreateBucketCommand
// There's only 1 content node, so if activation were not disabled, it
// should always be activated.
- CPPUNIT_ASSERT_EQUAL(!disabled, createCmd->getActive());
+ ASSERT_EQ(!disabled, createCmd->getActive()); // NOTE: a failing ASSERT_* only returns from this void helper, not from the calling TEST_F; safe while callers invoke the helper as their final statement
}
-void
-PutOperationTest::replicaImplicitlyActivatedWhenActivationIsNotDisabled()
-{
- doTestCreationWithBucketActivationDisabled(false);
-}
-
-void
-PutOperationTest::replicaNotImplicitlyActivatedWhenActivationIsDisabled()
-{
- doTestCreationWithBucketActivationDisabled(true);
+TEST_F(PutOperationTest, replica_implicitly_activated_when_activation_is_not_disabled) {
+ do_test_creation_with_bucket_activation_disabled(false); // disabled=false: the helper asserts that the CreateBucket command has getActive() == true
}
+TEST_F(PutOperationTest, replica_not_implicitly_activated_when_activation_is_disabled) {
+ do_test_creation_with_bucket_activation_disabled(true); // disabled=true: the helper asserts that the CreateBucket command has getActive() == false
}
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 5482696b945..b089d3ab83f 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -71,7 +71,7 @@ BucketOwnership
BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
{
if (hasPendingClusterState()) {
- const lib::ClusterState& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace()));
+ const auto& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace()));
if (!_distributorComponent.ownsBucketInState(state, b)) {
return BucketOwnership::createNotOwnedInState(state);
}
@@ -79,6 +79,13 @@ BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
return BucketOwnership::createOwned();
}
+const lib::ClusterState*
+BucketDBUpdater::pendingClusterStateOrNull(const document::BucketSpace& space) const { // Returns the derived cluster state of `space` taken from the pending state bundle, or nullptr when no cluster state is pending.
+ return (hasPendingClusterState()
+ ? _pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(space).get() // raw pointer via .get(); NOTE(review): presumably only valid while the pending state object is alive - callers should not retain it; confirm
+ : nullptr);
+}
+
void
BucketDBUpdater::sendRequestBucketInfo(
uint16_t node,
@@ -850,6 +857,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
for (uint16_t i = 0; i < e->getNodeCount(); i++) {
Node n(NodeType::STORAGE, e->getNodeRef(i).getNode());
+ // TODO replace with intersection hash set of config and cluster state
if (_state.getNodeState(n).getState().oneOf(_upStates)) {
remainingCopies.push_back(e->getNodeRef(i));
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index e9877d37e67..dbcf6699bdc 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -37,10 +37,13 @@ public:
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
DistributorComponentRegister& compReg);
- ~BucketDBUpdater();
+ ~BucketDBUpdater() override;
void flush();
+ // If there is a pending state, returns ownership state of bucket in it.
+ // Otherwise always returns "is owned", i.e. it must also be checked in the current state.
BucketOwnership checkOwnershipInPendingState(const document::Bucket&) const;
+ const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const; // Derived cluster state for the given bucket space in the pending state, or nullptr if there is no pending cluster state.
void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index c92dfbdc14e..771baa04fca 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -143,6 +143,11 @@ Distributor::checkOwnershipInPendingState(const document::Bucket &b) const
return _bucketDBUpdater.checkOwnershipInPendingState(b);
}
+const lib::ClusterState*
+Distributor::pendingClusterStateOrNull(const document::BucketSpace& space) const { // Thin forwarder: the bucket DB updater decides whether a pending state exists.
+ return _bucketDBUpdater.pendingClusterStateOrNull(space); // result is passed through unchanged, including nullptr
+}
+
void
Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd)
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index cd24b91eba2..3237b4b4d71 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -74,6 +74,8 @@ public:
BucketOwnership checkOwnershipInPendingState(const document::Bucket &bucket) const override;
+ const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
+
/**
* Enables a new cluster state. Called after the bucket db updater has
* retrieved all bucket info related to the change.
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 9bd215b9644..1c84376dea6 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -194,29 +194,28 @@ DistributorComponent::removeNodesFromDB(const document::Bucket &bucket,
}
}
-std::vector<uint16_t>
-DistributorComponent::enumerateDownNodes(
+void
+DistributorComponent::enumerateUnavailableNodes( // Appends to `unavailableNodes` the node index of every candidate copy whose node state in `s` is not one of the storage node up-states.
+ std::vector<uint16_t>& unavailableNodes, // out-param: only appended to, never cleared, so results from several cluster states accumulate (and may contain duplicates)
const lib::ClusterState& s,
- const document::Bucket &bucket,
+ const document::Bucket& bucket, // used for logging only
const std::vector<BucketCopy>& candidates) const
{
- std::vector<uint16_t> downNodes;
+ const auto* up_states = _distributor.getStorageNodeUpStates(); // the states regarded as "available" for storage nodes
for (uint32_t i = 0; i < candidates.size(); ++i) {
const BucketCopy& copy(candidates[i]);
const lib::NodeState& ns(
- s.getNodeState(lib::Node(lib::NodeType::STORAGE,
- copy.getNode())));
- if (ns.getState() == lib::State::DOWN) {
+ s.getNodeState(lib::Node(lib::NodeType::STORAGE, copy.getNode())));
+ if (!ns.getState().oneOf(up_states)) { // any state outside the up-states counts; per the commit intent this covers Maintenance as well as Down
LOG(debug,
"Trying to add a bucket copy to %s whose node is marked as "
- "down in the cluster state: %s. Ignoring it since no zombies "
+ "unavailable in the cluster state: %s. Ignoring it since no zombies "
"are allowed!",
bucket.toString().c_str(),
copy.toString().c_str());
- downNodes.push_back(copy.getNode());
+ unavailableNodes.emplace_back(copy.getNode());
}
}
- return downNodes;
}
void
@@ -236,8 +235,6 @@ DistributorComponent::updateBucketDatabase(
"cluster state '%s' - ignoring!",
bucket.toString().c_str(),
ownership.getNonOwnedState().toString().c_str());
- LOG_BUCKET_OPERATION_NO_LOCK(bucketId, "Ignoring database insert since "
- "we do not own the bucket");
return;
}
@@ -258,22 +255,24 @@ DistributorComponent::updateBucketDatabase(
}
// Ensure that we're not trying to bring any zombie copies into the
- // bucket database (i.e. copies on nodes that are actually down).
- std::vector<uint16_t> downNodes(
- enumerateDownNodes(bucketSpace.getClusterState(), bucket, changedNodes));
+ // bucket database (i.e. copies on nodes that are actually unavailable).
+ std::vector<uint16_t> unavailableNodes;
+ enumerateUnavailableNodes(unavailableNodes, bucketSpace.getClusterState(), bucket, changedNodes);
+ if (auto* pending_state = _distributor.pendingClusterStateOrNull(bucket.getBucketSpace())) {
+ enumerateUnavailableNodes(unavailableNodes, *pending_state, bucket, changedNodes);
+ }
// Optimize for common case where we don't have to create a new
// bucket copy vector
- if (downNodes.empty()) {
+ if (unavailableNodes.empty()) {
dbentry->addNodes(changedNodes, getIdealNodes(bucket));
} else {
std::vector<BucketCopy> upNodes;
for (uint32_t i = 0; i < changedNodes.size(); ++i) {
const BucketCopy& copy(changedNodes[i]);
- if (std::find(downNodes.begin(), downNodes.end(),
- copy.getNode())
- == downNodes.end())
+ if (std::find(unavailableNodes.begin(), unavailableNodes.end(), copy.getNode())
+ == unavailableNodes.end())
{
- upNodes.push_back(copy);
+ upNodes.emplace_back(copy);
}
}
dbentry->addNodes(upNodes, getIdealNodes(bucket));
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index f2aea89d47c..df25efb3b00 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -175,9 +175,10 @@ public:
bool initializing() const;
private:
- std::vector<uint16_t> enumerateDownNodes(
+ void enumerateUnavailableNodes(
+ std::vector<uint16_t>& unavailableNodes,
const lib::ClusterState& s,
- const document::Bucket &bucket,
+ const document::Bucket& bucket,
const std::vector<BucketCopy>& candidates) const;
DistributorInterface& _distributor;
diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h
index 17c300fa0a9..d9f037bb8f1 100644
--- a/storage/src/vespa/storage/distributor/distributorinterface.h
+++ b/storage/src/vespa/storage/distributor/distributorinterface.h
@@ -24,6 +24,7 @@ public:
virtual DistributorMetricSet& getMetrics() = 0;
virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0;
virtual BucketOwnership checkOwnershipInPendingState(const document::Bucket &bucket) const = 0;
+ virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
virtual void notifyDistributionChangeEnabled() = 0;
/**