From d5b2fa84e187bf17b675f4601acdbd9b142fbc78 Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Thu, 3 Dec 2020 16:22:36 +0000 Subject: Remove dependency between ExternalOperationHandler and DistributorComponent. --- storage/src/tests/common/teststorageapp.cpp | 1 + storage/src/tests/common/teststorageapp.h | 11 ++- storage/src/tests/distributor/distributortest.cpp | 16 ++-- .../src/tests/distributor/distributortestutil.cpp | 17 ++-- .../src/tests/distributor/distributortestutil.h | 8 +- .../distributor/externaloperationhandlertest.cpp | 12 +-- storage/src/tests/distributor/getoperationtest.cpp | 4 +- .../distributor/operationtargetresolvertest.cpp | 4 +- storage/src/tests/distributor/putoperationtest.cpp | 26 +++--- .../read_for_write_visitor_operation_test.cpp | 2 +- .../src/tests/distributor/removelocationtest.cpp | 6 +- .../src/tests/distributor/removeoperationtest.cpp | 6 +- .../src/tests/distributor/statecheckerstest.cpp | 24 +++--- .../src/tests/distributor/statoperationtest.cpp | 2 +- .../distributor/twophaseupdateoperationtest.cpp | 6 +- .../src/tests/distributor/updateoperationtest.cpp | 6 +- .../src/tests/distributor/visitoroperationtest.cpp | 4 +- storage/src/vespa/storage/common/CMakeLists.txt | 3 +- storage/src/vespa/storage/common/node_identity.cpp | 16 ++++ storage/src/vespa/storage/common/node_identity.h | 28 +++++++ .../src/vespa/storage/distributor/distributor.cpp | 32 ++++---- .../src/vespa/storage/distributor/distributor.h | 15 ++-- .../distributor/distributor_operation_context.h | 10 ++- .../storage/distributor/distributorcomponent.h | 22 ++++- .../distributor/externaloperationhandler.cpp | 96 ++++++++++++---------- .../storage/distributor/externaloperationhandler.h | 20 +++-- .../storage/storageserver/distributornode.cpp | 2 +- .../vespa/storage/storageserver/storagenode.cpp | 13 +-- .../src/vespa/storage/storageserver/storagenode.h | 14 ++-- 29 files changed, 265 insertions(+), 161 deletions(-) create mode 100644 storage/src/vespa/storage/common/node_identity.cpp create mode 100644 storage/src/vespa/storage/common/node_identity.h (limited to 'storage') diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 81ef5a2e800..04dc1a03dd3 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -41,6 +41,7 @@ TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg, _docMan(), _nodeStateUpdater(type), _configId(configId), + _node_identity("test_cluster", type, index), _initialized(false) { // Use config to adjust values diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 376b1afd9c6..5fa45fe804d 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -18,17 +18,18 @@ #pragma once #include "testnodestateupdater.h" +#include +#include +#include #include #include +#include #include -#include #include #include +#include #include #include -#include -#include -#include #include #include @@ -50,6 +51,7 @@ protected: document::TestDocMan _docMan; TestNodeStateUpdater _nodeStateUpdater; vespalib::string _configId; + NodeIdentity _node_identity; std::atomic _initialized; public: @@ -85,6 +87,7 @@ public: { return _compReg.getDistribution(); } TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; } uint16_t getIndex() const { return _compReg.getIndex(); } + const NodeIdentity& node_identity() const { return _node_identity; } // The storage app also implements the done initializer interface, so it can // be sent to components needing this. diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 93f503c27e4..25221b9cc7d 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -120,14 +120,14 @@ struct DistributorTest : Test, DistributorTestUtil { } } - getExternalOperationHandler().removeNodesFromDB(makeDocumentBucket(document::BucketId(16, 1)), removedNodes); + distributor_component().removeNodesFromDB(makeDocumentBucket(document::BucketId(16, 1)), removedNodes); uint32_t flags(DatabaseUpdate::CREATE_IF_NONEXISTING | (resetTrusted ? DatabaseUpdate::RESET_TRUSTED : 0)); - getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)), - changedNodes, - flags); + distributor_component().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)), + changedNodes, + flags); } std::string retVal = dumpBucket(document::BucketId(16, 1)); @@ -562,8 +562,8 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state std::vector copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); + distributor_component().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies, + DatabaseUpdate::CREATE_IF_NONEXISTING); EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket)); } @@ -575,8 +575,8 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur std::vector copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(bucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); + distributor_component().updateBucketDatabase(makeDocumentBucket(bucket), copies, + DatabaseUpdate::CREATE_IF_NONEXISTING); BucketDatabase::Entry e(getBucket(bucket)); EXPECT_EQ(101234, e->getLastGarbageCollectionTime()); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 17932ccffd7..2d2038b7cd0 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -1,11 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributortestutil.h" +#include +#include +#include #include #include -#include +#include #include -#include -#include using document::test::makeBucketSpace; using document::test::makeDocumentBucket; @@ -26,6 +27,7 @@ DistributorTestUtil::createLinks() _threadPool = framework::TickingThreadPool::createDefault("distributor"); _distributor.reset(new Distributor( _node->getComponentRegister(), + _node->node_identity(), *_threadPool, *this, true, @@ -244,7 +246,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id) void DistributorTestUtil::addIdealNodes(const document::BucketId& id) { - addIdealNodes(*getExternalOperationHandler().getClusterStateBundle().getBaselineClusterState(), id); + addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id); } void @@ -276,7 +278,7 @@ DistributorTestUtil::insertBucketInfo(document::BucketId id, if (active) { info2.setActive(); } - BucketCopy copy(getExternalOperationHandler().getUniqueTimestamp(), node, info2); + BucketCopy copy(distributor_component().getUniqueTimestamp(), node, info2); entry->addNode(copy.setTrusted(trusted), toVector(0)); @@ -336,6 +338,11 @@ DistributorTestUtil::getExternalOperationHandler() { return _distributor->_externalOperationHandler; } +storage::distributor::DistributorComponent& +DistributorTestUtil::distributor_component() { + return _distributor->_component; +} + bool DistributorTestUtil::tick() { framework::ThreadWaitInfo res( diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index a6bd9d5d84c..1bdb6e33512 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -2,14 +2,14 @@ #pragma once #include "distributor_message_sender_stub.h" -#include -#include #include +#include +#include #include #include #include -#include #include +#include namespace storage { @@ -21,6 +21,7 @@ class BucketDBUpdater; class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; +class DistributorComponent; class IdealStateManager; class ExternalOperationHandler; class Operation; @@ -111,6 +112,7 @@ public: BucketDBUpdater& getBucketDBUpdater(); IdealStateManager& getIdealStateManager(); ExternalOperationHandler& getExternalOperationHandler(); + storage::distributor::DistributorComponent& distributor_component(); Distributor& getDistributor() { return *_distributor; diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 40516622b65..b230781a90c 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -95,19 +95,19 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16"); EXPECT_EQ(document::BucketId(16, 0xffff), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0xffff)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x10000)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0xffff), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0xffff)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0x100), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x100)) ).stripUnused()); close(); @@ -116,11 +116,11 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20"); createLinks(); EXPECT_EQ(document::BucketId(20, 0x11111), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x111111)) ).stripUnused()); EXPECT_EQ(document::BucketId(20, 0x22222), - getExternalOperationHandler().getBucketId(document::DocumentId( + distributor_component().getBucketId(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x222222)) ).stripUnused()); } diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 410705bb1a0..fe87de5f18a 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -45,7 +45,7 @@ struct GetOperationTest : Test, DistributorTestUtil { createLinks(); docId = document::DocumentId("id:ns:text/html::uri"); - bucketId = getExternalOperationHandler().getBucketId(docId); + bucketId = distributor_component().getBucketId(docId); }; void TearDown() override { @@ -56,7 +56,7 @@ struct GetOperationTest : Test, DistributorTestUtil { void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { auto msg = std::make_shared(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME); op = std::make_unique( - getExternalOperationHandler(), getDistributorBucketSpace(), + distributor_component(), getDistributorBucketSpace(), getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), msg, getDistributor().getMetrics().gets, consistency); diff --git a/storage/src/tests/distributor/operationtargetresolvertest.cpp b/storage/src/tests/distributor/operationtargetresolvertest.cpp index da0206cf0a4..3098d8382c8 100644 --- a/storage/src/tests/distributor/operationtargetresolvertest.cpp +++ b/storage/src/tests/distributor/operationtargetresolvertest.cpp @@ -115,7 +115,7 @@ OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedundancy) { lib::IdealNodeCalculatorImpl idealNodeCalc; - auto &bucketSpaceRepo(getExternalOperationHandler().getBucketSpaceRepo()); + auto &bucketSpaceRepo(distributor_component().getBucketSpaceRepo()); auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace())); idealNodeCalc.setDistribution(distributorBucketSpace.getDistribution()); idealNodeCalc.setClusterState(distributorBucketSpace.getClusterState()); @@ -144,7 +144,7 @@ TEST_F(OperationTargetResolverTest, simple) { TEST_F(OperationTargetResolverTest, multiple_nodes) { setupDistributor(1, 2, "storage:2 distributor:1"); - auto &bucketSpaceRepo(getExternalOperationHandler().getBucketSpaceRepo()); + auto &bucketSpaceRepo(distributor_component().getBucketSpaceRepo()); auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace())); for (int i = 0; i < 100; ++i) { addNodesToBucketDB(BucketId(16, i), "0=0,1=0"); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index e86eded7392..2a220cb9ef8 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -73,8 +73,8 @@ public: } void sendPut(std::shared_ptr msg) { - op = std::make_unique(getExternalOperationHandler(), - getExternalOperationHandler(), + op = std::make_unique(distributor_component(), + distributor_component(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). @@ -103,7 +103,7 @@ document::BucketId PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) { auto doc = std::make_shared(doc_type(), DocumentId("id:test:testdoctype1::")); - document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId()); + document::BucketId id = distributor_component().getBucketId(doc->getId()); addIdealNodes(id); auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), doc, 0); @@ -149,7 +149,7 @@ TEST_F(PutOperationTest, bucket_database_gets_special_entry_when_CreateBucket_se // Database updated before CreateBucket is sent ASSERT_EQ("BucketId(0x4000000000008f09) : " "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)", - dumpBucket(getExternalOperationHandler().getBucketId(doc->getId()))); + dumpBucket(distributor_component().getBucketId(doc->getId()))); ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true)); } @@ -196,7 +196,7 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck "id:test:testdoctype1::, timestamp 100, size 45) => 1", _sender.getCommands(true, true)); - getExternalOperationHandler().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0); + distributor_component().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0); // If we get an ACK from the backend nodes, the operation has been persisted OK. // Even if the bucket has been removed from the DB in the meantime (usually would @@ -248,7 +248,7 @@ TEST_F(PutOperationTest, multiple_copies) { "node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), " "node(idx=2,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), " "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", - dumpBucket(getExternalOperationHandler().getBucketId(doc->getId()))); + dumpBucket(distributor_component().getBucketId(doc->getId()))); } TEST_F(PutOperationTest, multiple_copies_early_return_primary_required) { @@ -401,7 +401,7 @@ TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) { // Manually shove sent messages into pending message tracker, since // this isn't done automatically. for (size_t i = 0; i < _sender.commands().size(); ++i) { - getExternalOperationHandler().getDistributor().getPendingMessageTracker() + distributor_component().getDistributor().getPendingMessageTracker() .insert(_sender.command(i)); } @@ -476,7 +476,7 @@ parseBucketInfoString(const std::string& nodeList) { std::string PutOperationTest::getNodes(const std::string& infoString) { Document::SP doc(createDummyDocument("test", "uri")); - document::BucketId bid(getExternalOperationHandler().getBucketId(doc->getId())); + document::BucketId bid(distributor_component().getBucketId(doc->getId())); BucketInfo entry = parseBucketInfoString(infoString); @@ -518,7 +518,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ setupDistributor(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); Document::SP doc(createDummyDocument("test", "uri")); - document::BucketId bId = getExternalOperationHandler().getBucketId(doc->getId()); + document::BucketId bId = distributor_component().getBucketId(doc->getId()); addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); @@ -535,14 +535,14 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ ASSERT_EQ("BucketId(0x4000000000000593) : " "node(idx=0,crc=0x7,docs=8/8,bytes=9/9,trusted=true,active=false,ready=false)", - dumpBucket(getExternalOperationHandler().getBucketId(doc->getId()))); + dumpBucket(distributor_component().getBucketId(doc->getId()))); } TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) { setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "uri"); - auto bucket = getExternalOperationHandler().getBucketId(doc->getId()); + auto bucket = distributor_component().getBucketId(doc->getId()); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); sendPut(createPut(doc)); @@ -576,7 +576,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) { setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "test"); - auto bucket = getExternalOperationHandler().getBucketId(doc->getId()); + auto bucket = distributor_component().getBucketId(doc->getId()); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); getBucketDBUpdater().onSetSystemState( std::make_shared( @@ -596,7 +596,7 @@ TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) { "distributor:1 storage:2 .0.s:r .1.s:r"); Document::SP doc(createDummyDocument("test", "uri")); document::BucketId bucket( - getExternalOperationHandler().getBucketId(doc->getId())); + distributor_component().getBucketId(doc->getId())); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t"); sendPut(createPut(doc)); diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index daa2ca94bb3..e8401914abf 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -62,7 +62,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { cmd->addBucketToBeVisited(BucketId()); // Will be inferred to first sub-bucket in DB } return std::make_shared( - getExternalOperationHandler(), getExternalOperationHandler(), + distributor_component(), distributor_component(), getDistributorBucketSpace(), cmd, _default_config, getDistributor().getMetrics().visits); } diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index 47b6e436fb1..779d6f60be0 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -27,9 +27,9 @@ struct RemoveLocationOperationTest : Test, DistributorTestUtil { auto msg = std::make_shared(selection, makeDocumentBucket(document::BucketId(0))); op = std::make_unique( - getExternalOperationHandler(), - getExternalOperationHandler(), - getExternalOperationHandler(), + distributor_component(), + distributor_component(), + distributor_component(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index 49d60292326..cc5452e2bfe 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -23,7 +23,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil { createLinks(); docId = document::DocumentId("id:test:test::uri"); - bucketId = getExternalOperationHandler().getBucketId(docId); + bucketId = distributor_component().getBucketId(docId); enableDistributorClusterState("distributor:1 storage:4"); }; @@ -35,8 +35,8 @@ struct RemoveOperationTest : Test, DistributorTestUtil { auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), dId, 100); op = std::make_unique( - getExternalOperationHandler(), - getExternalOperationHandler(), + distributor_component(), + distributor_component(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index f66aab26dc9..00c1c7bb403 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -238,7 +238,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { tick(); // Trigger command processing and pending state setup. } NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), + StateChecker::Context c(distributor_component(), getBucketSpaceRepo().get(params._bucket_space), statsTracker, bucket); @@ -290,7 +290,7 @@ std::string StateCheckersTest::testSplit(uint32_t splitCount, SplitBucketStateChecker checker; NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); getConfig().setSplitSize(splitSize); getConfig().setSplitCount(splitCount); getConfig().setMinimalBucketSplit(minSplitBits); @@ -375,7 +375,7 @@ StateCheckersTest::testInconsistentSplit(const document::BucketId& bid, { SplitInconsistentStateChecker checker; NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, true, PendingMessage(), includePriority); } @@ -433,7 +433,7 @@ StateCheckersTest::testJoin(uint32_t joinCount, getConfig().setMinimalBucketSplit(minSplitBits); NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, true, blocker, includePriority); } @@ -586,7 +586,7 @@ StateCheckersTest::testSynchronizeAndMove(const std::string& bucketInfo, enableDistributorClusterState(clusterState); NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, false, blocker, includePriority); } @@ -820,7 +820,7 @@ StateCheckersTest::testDeleteExtraCopies( } DeleteExtraCopiesStateChecker checker; NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, false, blocker, includePriority); } @@ -937,7 +937,7 @@ std::string StateCheckersTest::testBucketState( BucketStateStateChecker checker; NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, false, PendingMessage(), includePriority); } @@ -1104,7 +1104,7 @@ std::string StateCheckersTest::testBucketStatePerGroup( BucketStateStateChecker checker; NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); return testStateChecker(checker, c, false, PendingMessage(), includePriority); } @@ -1231,7 +1231,7 @@ std::string StateCheckersTest::testGarbageCollection( getConfig().setGarbageCollection("music", std::chrono::seconds(checkInterval)); getConfig().setLastGarbageCollectionChangeTime(vespalib::steady_time(std::chrono::seconds(lastChangeTime))); NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(e.getBucketId())); getClock().setAbsoluteTimeInSeconds(nowTimestamp); return testStateChecker(checker, c, false, PendingMessage(), @@ -1304,7 +1304,7 @@ TEST_F(StateCheckersTest, gc_inhibited_when_ideal_node_in_maintenance) { getConfig().setGarbageCollection("music", 3600s); getConfig().setLastGarbageCollectionChangeTime(vespalib::steady_time(vespalib::duration::zero())); NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bucket)); getClock().setAbsoluteTimeInSeconds(4000); // Would normally (in a non-maintenance case) trigger GC due to having @@ -1448,7 +1448,7 @@ TEST_F(StateCheckersTest, context_populates_ideal_state_containers) { setupDistributor(2, 100, "distributor:1 storage:4"); NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket({17, 0})); + StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket({17, 0})); ASSERT_THAT(c.idealState, ElementsAre(1, 3)); // TODO replace with UnorderedElementsAre once we can build gmock without issues @@ -1491,7 +1491,7 @@ public: // NOTE: resets the bucket database! void runFor(const document::BucketId& bid) { Checker checker; - StateChecker::Context c(_fixture.getExternalOperationHandler(), _fixture.getDistributorBucketSpace(), _statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(_fixture.distributor_component(), _fixture.getDistributorBucketSpace(), _statsTracker, makeDocumentBucket(bid)); _result = _fixture.testStateChecker( checker, c, false, StateCheckersTest::PendingMessage(), false); } diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp index 981247c0753..a80eb9533bb 100644 --- a/storage/src/tests/distributor/statoperationtest.cpp +++ b/storage/src/tests/distributor/statoperationtest.cpp @@ -73,7 +73,7 @@ TEST_F(StatOperationTest, bucket_list) { StatBucketListOperation op( getDistributorBucketSpace().getBucketDatabase(), getIdealStateManager(), - getExternalOperationHandler().getIndex(), + distributor_component().getIndex(), msg); op.start(_sender, framework::MilliSecTime(0)); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 6e3837afa43..1518c8594aa 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -302,7 +302,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, } update->setCreateIfNonExistent(options._createIfNonExistent); - document::BucketId id = getExternalOperationHandler().getBucketId(update->getId()); + document::BucketId id = distributor_component().getBucketId(update->getId()); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { @@ -325,9 +325,9 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, msg->setCondition(options._condition); msg->setTransportContext(std::make_unique()); - ExternalOperationHandler& handler = getExternalOperationHandler(); + auto& comp = distributor_component(); return std::make_shared( - handler, handler, handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); + comp, comp, comp, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); } TEST_F(TwoPhaseUpdateOperationTest, simple) { diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 8c5fb6db378..00e1e832b3c 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -59,15 +59,15 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m document::DocumentId("id:ns:" + _html_type->getName() + "::1")); update->setCreateIfNonExistent(create_if_missing); - _bId = getExternalOperationHandler().getBucketId(update->getId()); + _bId = distributor_component().getBucketId(update->getId()); addNodesToBucketDB(_bId, bucketState); auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), update, 100); - ExternalOperationHandler& handler = getExternalOperationHandler(); + auto& comp = distributor_component(); return std::make_shared( - handler, handler, getDistributorBucketSpace(), msg, + comp, comp, getDistributorBucketSpace(), msg, getDistributor().getMetrics().updates); } diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index 0af4aff02d5..f37ded7aacb 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -102,8 +102,8 @@ struct VisitorOperationTest : Test, DistributorTestUtil { const VisitorOperation::Config& config) { return std::make_unique( - getExternalOperationHandler(), - getExternalOperationHandler(), + distributor_component(), + distributor_component(), getDistributorBucketSpace(), msg, config, diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 29130515d2a..efbd62a45a0 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -9,13 +9,14 @@ vespa_add_library(storage_common OBJECT global_bucket_space_distribution_converter.cpp messagebucket.cpp messagesender.cpp + node_identity.cpp reindexing_constants.cpp servicelayercomponent.cpp statusmessages.cpp statusmetricconsumer.cpp + storage_chain_builder.cpp storagecomponent.cpp storagelink.cpp storagelinkqueued.cpp - storage_chain_builder.cpp DEPENDS ) diff --git a/storage/src/vespa/storage/common/node_identity.cpp b/storage/src/vespa/storage/common/node_identity.cpp new file mode 100644 index 00000000000..2ad940b7b20 --- /dev/null +++ b/storage/src/vespa/storage/common/node_identity.cpp @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "node_identity.h" + +namespace storage { + +NodeIdentity::NodeIdentity(vespalib::stringref cluster_name_in, + const lib::NodeType& node_type_in, + uint16_t node_index_in) + : _cluster_name(cluster_name_in), + _node_type(node_type_in), + _node_index(node_index_in) +{ +} + +} diff --git a/storage/src/vespa/storage/common/node_identity.h b/storage/src/vespa/storage/common/node_identity.h new file mode 100644 index 00000000000..ea2edb98cdd --- /dev/null +++ b/storage/src/vespa/storage/common/node_identity.h @@ -0,0 +1,28 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include + +namespace storage { + +/** + * Class that represents the identity of a storage or distributor node. + */ +class NodeIdentity { +private: + vespalib::string _cluster_name; + const lib::NodeType& _node_type; + uint16_t _node_index; + +public: + NodeIdentity(vespalib::stringref cluster_name_in, + const lib::NodeType& node_type_in, + uint16_t node_index_in); + const vespalib::string& cluster_name() const { return _cluster_name; } + const lib::NodeType& node_type() const { return _node_type; } + uint16_t node_index() const { return _node_index; } +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 0ad8118d925..3f42e9801cc 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -1,18 +1,19 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // -#include "distributor.h" #include "blockingoperationstarter.h" -#include "throttlingoperationstarter.h" -#include "idealstatemetricsset.h" -#include "ownership_transfer_safe_time_point_calculator.h" +#include "distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" -#include -#include -#include +#include "idealstatemetricsset.h" +#include "ownership_transfer_safe_time_point_calculator.h" +#include "throttlingoperationstarter.h" +#include #include +#include +#include +#include +#include #include -#include #include #include @@ -62,6 +63,7 @@ public: }; Distributor::Distributor(DistributorComponentRegister& compReg, + const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, DoneInitializeHandler& doneInitHandler, bool manageActiveBucketCopies, @@ -71,10 +73,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg, DistributorInterface(), framework::StatusReporter("distributor", "Distributor"), _clusterStateBundle(lib::ClusterState()), - _compReg(compReg), - _component(compReg, "distributor"), - _bucketSpaceRepo(std::make_unique(_component.getIndex())), - _readOnlyBucketSpaceRepo(std::make_unique(_component.getIndex())), + _bucketSpaceRepo(std::make_unique(node_identity.node_index())), + _readOnlyBucketSpaceRepo(std::make_unique(node_identity.node_index())), + _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"), _metrics(std::make_shared()), _operationOwner(*this, _component.getClock()), _maintenanceOperationOwner(*this, _component.getClock()), @@ -83,13 +84,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), - _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, - _idealStateManager, _operationOwner, compReg), + _messageSender(messageSender), + _externalOperationHandler(*this, _component, _component, + getMetrics(), getMessageSender(), _component, + _idealStateManager, _operationOwner), _threadPool(threadPool), _initializingIsUp(true), _doneInitializeHandler(doneInitHandler), _doneInitializing(false), - _messageSender(messageSender), _bucketPriorityDb(std::make_unique()), _scanner(std::make_unique(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), _throttlingStarter(std::make_unique(_maintenanceOperationOwner)), diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 6bda3e12ac7..9a74f25fffd 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -12,10 +12,10 @@ #include "pendingmessagetracker.h" #include "statusreporterdelegate.h" #include -#include #include #include #include +#include #include #include #include @@ -26,16 +26,17 @@ namespace storage { struct DoneInitializeHandler; class HostInfo; + class NodeIdentity; } namespace storage::distributor { -class DistributorBucketSpaceRepo; -class SimpleMaintenanceScanner; class BlockingOperationStarter; -class ThrottlingOperationStarter; class BucketPriorityDatabase; +class DistributorBucketSpaceRepo; class OwnershipTransferSafeTimePointCalculator; +class SimpleMaintenanceScanner; +class ThrottlingOperationStarter; class Distributor : public StorageLink, public DistributorInterface, @@ -47,6 +48,7 @@ class Distributor : public StorageLink, { public: Distributor(DistributorComponentRegister&, + const NodeIdentity& node_identity, framework::TickingThreadPool&, DoneInitializeHandler&, bool manageActiveBucketCopies, @@ -260,13 +262,12 @@ private: lib::ClusterStateBundle _clusterStateBundle; - DistributorComponentRegister& _compReg; - storage::DistributorComponent _component; std::unique_ptr _bucketSpaceRepo; // Read-only bucket space repo with DBs that only contain buckets transiently // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo // and the DBs are empty during non-transition phases. std::unique_ptr _readOnlyBucketSpaceRepo; + storage::distributor::DistributorComponent _component; std::shared_ptr _metrics; OperationOwner _operationOwner; @@ -277,6 +278,7 @@ private: StatusReporterDelegate _distributorStatusDelegate; StatusReporterDelegate _bucketDBStatusDelegate; IdealStateManager _idealStateManager; + ChainedMessageSender* _messageSender; ExternalOperationHandler _externalOperationHandler; std::shared_ptr _distribution; @@ -307,7 +309,6 @@ private: DoneInitializeHandler& _doneInitializeHandler; bool _doneInitializing; - ChainedMessageSender* _messageSender; std::unique_ptr _bucketPriorityDb; std::unique_ptr _scanner; diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 9b2c46f0071..719779bdf0d 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -3,6 +3,7 @@ #pragma once #include "bucketownership.h" +#include "operation_routing_snapshot.h" #include #include #include @@ -15,6 +16,7 @@ namespace storage { class DistributorConfiguration; } namespace storage::distributor { class DistributorBucketSpaceRepo; +class PendingMessageTracker; /** * Interface with functionality that is used when handling distributor operations. @@ -31,11 +33,17 @@ public: uint32_t update_flags = 0) = 0; virtual void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) = 0; virtual const DistributorBucketSpaceRepo& bucket_space_repo() const = 0; + virtual DistributorBucketSpaceRepo& bucket_space_repo() = 0; + virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const = 0; + virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() = 0; + virtual document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const = 0; + virtual const DistributorConfiguration& distributor_config() const = 0; virtual void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space, const BucketDatabase::Entry& entry, uint8_t pri) = 0; - virtual const DistributorConfiguration& distributor_config() const = 0; + virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const = 0; + virtual PendingMessageTracker& pending_message_tracker() = 0; virtual bool has_pending_message(uint16_t node_index, const document::Bucket& bucket, uint32_t message_type) const = 0; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index d7fffaaf271..e092aeeee4d 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -169,13 +169,31 @@ public: const DistributorBucketSpaceRepo& bucket_space_repo() const override { return getBucketSpaceRepo(); } + DistributorBucketSpaceRepo& bucket_space_repo() override { + return getBucketSpaceRepo(); + } + const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const override { + return getReadOnlyBucketSpaceRepo(); + } + DistributorBucketSpaceRepo& read_only_bucket_space_repo() override { + return getReadOnlyBucketSpaceRepo(); + } + document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const override { + return getBucketId(docId); + } + const DistributorConfiguration& distributor_config() const override { + return getDistributor().getConfig(); + } void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space, const BucketDatabase::Entry& entry, uint8_t pri) override { getDistributor().checkBucketForSplit(bucket_space, entry, pri); } - const DistributorConfiguration& distributor_config() const override { - return getDistributor().getConfig(); + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const override { + return getDistributor().read_snapshot_for_bucket(bucket); + } + PendingMessageTracker& pending_message_tracker() override { + return getDistributor().getPendingMessageTracker(); } bool has_pending_message(uint16_t node_index, const document::Bucket& bucket, diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index b9eaebb0bd1..3d464579824 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -54,18 +54,24 @@ public: }; ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, - DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DistributorMetricSet& metrics, + ChainedMessageSender& msg_sender, + DocumentSelectionParser& parser, const MaintenanceOperationGenerator& gen, - OperationOwner& operation_owner, - DistributorComponentRegister& compReg) - : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), + OperationOwner& operation_owner) + : _node_ctx(node_ctx), + _op_ctx(op_ctx), + _metrics(metrics), + _msg_sender(msg_sender), + _parser(parser), _direct_dispatch_sender(std::make_unique(owner)), _operationGenerator(gen), _rejectFeedBeforeTimeReached(), // At epoch _distributor_operation_owner(operation_owner), _non_main_thread_ops_mutex(), - _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()), + _non_main_thread_ops_owner(*_direct_dispatch_sender, _node_ctx.clock()), _concurrent_gets_enabled(false), _use_weak_internal_read_consistency_for_gets(false) { @@ -103,11 +109,11 @@ ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime) bool ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) { - const auto now = TimePoint(std::chrono::seconds(getClock().getTimeInSeconds().getTime())); + const auto now = TimePoint(std::chrono::seconds(_node_ctx.clock().getTimeInSeconds().getTime())); if (now < _rejectFeedBeforeTimeReached) { api::StorageReply::UP reply(cmd.makeReply()); reply->setResult(makeSafeTimeRejectionResult(now)); - sendUp(std::shared_ptr(reply.release())); + _msg_sender.sendUp(std::shared_ptr(reply.release())); return false; } return true; @@ -116,7 +122,7 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) { api::StorageReply::UP reply(cmd.makeReply()); reply->setResult(result); - sendUp(std::shared_ptr(reply.release())); + _msg_sender.sendUp(std::shared_ptr(reply.release())); } void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd, @@ -141,7 +147,7 @@ void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageComman } void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { - const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + const auto& cluster_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState(); bounce_with_wrong_distribution(cmd, cluster_state); } @@ -157,7 +163,7 @@ void ExternalOperationHandler::bounce_with_busy_during_state_transition( api::StorageReply::UP reply(cmd.makeReply()); api::ReturnCode ret(api::ReturnCode::BUSY, status_str); reply->setResult(ret); - sendUp(std::shared_ptr(reply.release())); + _msg_sender.sendUp(std::shared_ptr(reply.release())); } bool @@ -165,7 +171,7 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageComman const document::BucketId &bucketId, PersistenceOperationMetricSet& persistenceMetrics) { - auto &bucket_space(_bucketSpaceRepo.get(cmd.getBucket().getBucketSpace())); + auto &bucket_space(_op_ctx.bucket_space_repo().get(cmd.getBucket().getBucketSpace())); if (!bucket_space.owns_bucket_in_current_state(bucketId)) { document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId); LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", @@ -179,7 +185,7 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageComman if (!pending.isOwned()) { // We return BUSY here instead of WrongDistributionReply to avoid clients potentially // ping-ponging between cluster state versions during a state transition. - auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& current_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState(); auto& pending_state = pending.getNonOwnedState(); bounce_with_busy_during_state_transition(cmd, current_state, pending_state); return false; @@ -207,7 +213,7 @@ ExternalOperationHandler::makeConcurrentMutationRejectionReply(api::StorageComma } bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) const { - const auto& config(getDistributor().getConfig()); + const auto& config(_op_ctx.distributor_config()); if (!config.getSequenceMutatingOperations()) { // Sequencing explicitly disabled, so always allow. return true; @@ -222,7 +228,7 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op( PersistenceOperationMetricSet& metrics, Func func) { - auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + auto &bucket_space(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); if (!bucket_space.owns_bucket_in_current_state(bucket.getBucketId())) { LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", cmd.toString().c_str(), bucket.toString().c_str()); @@ -233,12 +239,12 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op( auto pending = bucket_space.check_ownership_in_pending_state(bucket.getBucketId()); if (pending.isOwned()) { - func(_bucketSpaceRepo); + func(_op_ctx.bucket_space_repo()); } else { - if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) { - func(_readOnlyBucketSpaceRepo); + if (_op_ctx.distributor_config().allowStaleReadsDuringClusterStateTransitions()) { + func(_op_ctx.read_only_bucket_space_repo()); } else { - auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& current_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState(); auto& pending_state = pending.getNonOwnedState(); bounce_with_busy_during_state_transition(cmd, current_state, pending_state); } @@ -256,12 +262,12 @@ bool put_is_allowed_through_bucket_lock(const api::PutCommand& cmd) { bool ExternalOperationHandler::onPut(const std::shared_ptr& cmd) { auto& metrics = getMetrics().puts; - if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { + if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; } if (cmd->getTimestamp() == 0) { - cmd->setTimestamp(getUniqueTimestamp()); + cmd->setTimestamp(_op_ctx.generate_unique_timestamp()); } const auto bucket_space = cmd->getBucket().getBucketSpace(); @@ -279,11 +285,11 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr& cmd } } if (allow) { - _op = std::make_shared(*this, *this, - _bucketSpaceRepo.get(bucket_space), + _op = std::make_shared(_node_ctx, _op_ctx, + _op_ctx.bucket_space_repo().get(bucket_space), std::move(cmd), getMetrics().puts, std::move(handle)); } else { - sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); + _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } return true; @@ -292,21 +298,21 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr& cmd bool ExternalOperationHandler::onUpdate(const std::shared_ptr& cmd) { auto& metrics = getMetrics().updates; - if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { + if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; } if (cmd->getTimestamp() == 0) { - cmd->setTimestamp(getUniqueTimestamp()); + cmd->setTimestamp(_op_ctx.generate_unique_timestamp()); } const auto bucket_space = cmd->getBucket().getBucketSpace(); auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); if (allowMutation(handle)) { - _op = std::make_shared(*this, *this, *this, - _bucketSpaceRepo.get(bucket_space), + _op = std::make_shared(_node_ctx, _op_ctx, _parser, + _op_ctx.bucket_space_repo().get(bucket_space), std::move(cmd), getMetrics(), std::move(handle)); } else { - sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); + _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } return true; @@ -315,22 +321,22 @@ bool ExternalOperationHandler::onUpdate(const std::shared_ptr& cmd) { auto& metrics = getMetrics().removes; - if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) { + if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; } if (cmd->getTimestamp() == 0) { - cmd->setTimestamp(getUniqueTimestamp()); + cmd->setTimestamp(_op_ctx.generate_unique_timestamp()); } const auto bucket_space = cmd->getBucket().getBucketSpace(); auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId()); if (allowMutation(handle)) { - auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space)); + auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket_space)); - _op = std::make_shared(*this, *this, distributorBucketSpace, std::move(cmd), + _op = std::make_shared(_node_ctx, _op_ctx, distributorBucketSpace, std::move(cmd), getMetrics().removes, std::move(handle)); } else { - sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); + _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } return true; @@ -338,7 +344,7 @@ bool ExternalOperationHandler::onRemove(const std::shared_ptr& cmd) { document::BucketId bid; - RemoveLocationOperation::getBucketId(*this, *this, *cmd, bid); + RemoveLocationOperation::getBucketId(_node_ctx, _parser, *cmd, bid); document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid); auto& metrics = getMetrics().removelocations; @@ -346,8 +352,8 @@ bool ExternalOperationHandler::onRemoveLocation(const std::shared_ptr(*this, *this, *this, - _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + _op = std::make_shared(_node_ctx, _op_ctx, _parser, + _op_ctx.bucket_space_repo().get(cmd->getBucket().getBucketSpace()), std::move(cmd), getMetrics().removelocations); return true; } @@ -359,9 +365,9 @@ api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consiste } std::shared_ptr ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr& cmd) { - document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); + document::Bucket bucket(cmd->getBucket().getBucketSpace(), _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId())); auto& metrics = getMetrics().gets; - auto snapshot = getDistributor().read_snapshot_for_bucket(bucket); + auto snapshot = _op_ctx.read_snapshot_for_bucket(bucket); if (!snapshot.is_routable()) { const auto& ctx = snapshot.context(); if (ctx.has_pending_state_transition()) { @@ -376,7 +382,7 @@ std::shared_ptr ExternalOperationHandler::try_generate_get_operation( // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here. const auto* space_repo = snapshot.bucket_space_repo(); assert(space_repo != nullptr); - return std::make_shared(*this, space_repo->get(bucket.getBucketSpace()), + return std::make_shared(_node_ctx, space_repo->get(bucket.getBucketSpace()), snapshot.steal_read_guard(), cmd, metrics, desired_get_read_consistency()); } @@ -400,21 +406,21 @@ bool ExternalOperationHandler::onGetBucketList(const std::shared_ptrgetBucket(), metrics, [&](auto& bucket_space_repo) { auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); auto& bucket_database = bucket_space.getBucketDatabase(); - _op = std::make_shared(bucket_database, _operationGenerator, getIndex(), cmd); + _op = std::make_shared(bucket_database, _operationGenerator, _node_ctx.node_index(), cmd); }); return true; } bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptr& cmd) { // TODO same handling as Gets (VisitorOperation needs to change) - const DistributorConfiguration& config(getDistributor().getConfig()); + const auto& config(_op_ctx.distributor_config()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); - auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - auto visit_op = std::make_shared(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits); + auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(cmd->getBucket().getBucketSpace())); + auto visit_op = std::make_shared(_node_ctx, _op_ctx, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits); if (visit_op->is_read_for_write()) { _op = std::make_shared(std::move(visit_op), _operation_sequencer, _distributor_operation_owner, - getDistributor().getPendingMessageTracker()); + _op_ctx.pending_message_tracker()); } else { _op = std::move(visit_op); } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 00dc9a49d74..6e91f31db9c 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -25,8 +25,7 @@ class MaintenanceOperationGenerator; class DirectDispatchSender; class OperationOwner; -class ExternalOperationHandler : public DistributorComponent, - public api::MessageHandler +class ExternalOperationHandler : public api::MessageHandler { public: using Clock = std::chrono::system_clock; @@ -42,11 +41,13 @@ public: bool onGetBucketList(const std::shared_ptr&) override; ExternalOperationHandler(Distributor& owner, - DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DistributorMetricSet& metrics, + ChainedMessageSender& msg_sender, + DocumentSelectionParser& parser, const MaintenanceOperationGenerator& gen, - OperationOwner& operation_owner, - DistributorComponentRegister& compReg); + OperationOwner& operation_owner); ~ExternalOperationHandler() override; @@ -84,6 +85,11 @@ public: } private: + DistributorNodeContext& _node_ctx; + DistributorOperationContext& _op_ctx; + DistributorMetricSet& _metrics; + ChainedMessageSender& _msg_sender; + DocumentSelectionParser& _parser; std::unique_ptr _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; OperationSequencer _operation_sequencer; @@ -124,7 +130,7 @@ private: api::InternalReadConsistency desired_get_read_consistency() const noexcept; - DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); } + DistributorMetricSet& getMetrics() { return _metrics; } }; } diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index e6bee3248d4..37174903697 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -108,7 +108,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder) // manager, which is safe since the lifetime of said state manager // extends to the end of the process. builder.add(std::make_unique - (dcr, *_threadPool, getDoneInitializeHandler(), + (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(), _manageActiveBucketCopies, stateManager->getHostInfo())); diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 120965cc4fe..74052932853 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -1,21 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "config_logging.h" -#include "storagenode.h" #include "communicationmanager.h" +#include "config_logging.h" #include "statemanager.h" #include "statereporter.h" #include "storagemetricsset.h" +#include "storagenode.h" #include "storagenodecontext.h" -#include -#include +#include +#include #include #include +#include +#include #include #include #include -#include #include #include @@ -96,6 +97,7 @@ StorageNode::StorageNode( _newDoctypesConfig(), _newBucketSpacesConfig(), _component(), + _node_identity(), _configUri(configUri), _communicationManager(nullptr), _chain_builder(std::make_unique()) @@ -143,6 +145,7 @@ StorageNode::initialize() _context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory()); _context.getComponentRegister().setDistribution(make_shared(*_distributionConfig)); _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); + _node_identity = std::make_unique(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); _metrics = std::make_shared(); _component = std::make_unique(_context.getComponentRegister(), "storagenode"); diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 259f48f3492..44b87f4b61c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -30,21 +30,22 @@ namespace document { class DocumentTypeRepo; } namespace storage { -class StatusMetricConsumer; -class StateReporter; +class ApplicationGenerationFetcher; class CommunicationManager; class FileStorManager; class HostInfo; -class StateManager; +class IStorageChainBuilder; class MemoryStatusViewer; +class NodeIdentity; +class StateManager; +class StateReporter; +class StatusMetricConsumer; class StatusWebServer; +class StorageComponent; class StorageLink; struct DeadLockDetector; struct StorageMetricSet; struct StorageNodeContext; -class ApplicationGenerationFetcher; -class IStorageChainBuilder; -class StorageComponent; namespace lib { class NodeType; } @@ -156,6 +157,7 @@ protected: std::unique_ptr _newDoctypesConfig; std::unique_ptr _newBucketSpacesConfig; std::unique_ptr _component; + std::unique_ptr _node_identity; config::ConfigUri _configUri; CommunicationManager* _communicationManager; private: -- cgit v1.2.3