diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-03-22 14:15:16 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-03-22 15:05:38 +0000 |
commit | be8d412fedaa421b4433e4262bd5c6b5cf36233d (patch) | |
tree | 410983ce311a14b62080b2ef8ba140e045ce9ec0 /storage/src | |
parent | ad0c199d5907794aae0e5b11068efcf338ee0c6d (diff) |
Remove functions from DistributorStripeComponent that are part of DistributorOperationContext interface.
Diffstat (limited to 'storage/src')
17 files changed, 116 insertions, 134 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 61c74a263cf..6abe594388f 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -127,9 +127,9 @@ struct DistributorTest : Test, DistributorTestUtil { uint32_t flags(DatabaseUpdate::CREATE_IF_NONEXISTING | (resetTrusted ? DatabaseUpdate::RESET_TRUSTED : 0)); - distributor_component().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)), - changedNodes, - flags); + operation_context().update_bucket_database(makeDocumentBucket(document::BucketId(16, 1)), + changedNodes, + flags); } std::string retVal = dumpBucket(document::BucketId(16, 1)); @@ -572,8 +572,8 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state std::vector<BucketCopy> copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - distributor_component().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); + operation_context().update_bucket_database(makeDocumentBucket(nonOwnedBucket), copies, + DatabaseUpdate::CREATE_IF_NONEXISTING); EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket)); } @@ -585,8 +585,8 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur std::vector<BucketCopy> copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - distributor_component().updateBucketDatabase(makeDocumentBucket(bucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); + operation_context().update_bucket_database(makeDocumentBucket(bucket), copies, + DatabaseUpdate::CREATE_IF_NONEXISTING); BucketDatabase::Entry e(getBucket(bucket)); EXPECT_EQ(101234, e->getLastGarbageCollectionTime()); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 5d7a61692f6..7929cc1c906 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -259,7 +259,7 @@ void DistributorTestUtil::addIdealNodes(const document::BucketId& id) { // TODO STRIPE roundabout way of getting state bundle..! - addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id); + addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id); } void @@ -357,6 +357,11 @@ DistributorTestUtil::distributor_component() { return _distributor->distributor_component(); } +storage::distributor::DistributorOperationContext& +DistributorTestUtil::operation_context() { + return _distributor->distributor_component(); +} + bool DistributorTestUtil::tick() { framework::ThreadWaitInfo res( diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 809a87d3bb7..d3c0445d5b5 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -21,10 +21,11 @@ class BucketDBUpdater; class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; -class DistributorStripeComponent; +class DistributorOperationContext; class DistributorStripe; -class IdealStateManager; +class DistributorStripeComponent; class ExternalOperationHandler; +class IdealStateManager; class Operation; // TODO STRIPE rename to DistributorStripeTestUtil? @@ -115,6 +116,7 @@ public: IdealStateManager& getIdealStateManager(); ExternalOperationHandler& getExternalOperationHandler(); storage::distributor::DistributorStripeComponent& distributor_component(); + storage::distributor::DistributorOperationContext& operation_context(); Distributor& getDistributor() { return *_distributor; diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 1829808990a..6d8086696b8 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -99,19 +99,19 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16"); EXPECT_EQ(document::BucketId(16, 0xffff), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0xffff)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x10000)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0xffff), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0xffff)) ).stripUnused()); EXPECT_EQ(document::BucketId(16, 0x100), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x100)) ).stripUnused()); close(); @@ -120,11 +120,11 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20"); createLinks(); EXPECT_EQ(document::BucketId(20, 0x11111), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x111111)) ).stripUnused()); EXPECT_EQ(document::BucketId(20, 0x22222), - distributor_component().getBucketId(document::DocumentId( + operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( vespalib::make_string("id:ns:test:n=%d::", 0x222222)) ).stripUnused()); } diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 1123c354ef4..cb671bb07f5 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -46,7 +46,7 @@ struct GetOperationTest : Test, DistributorTestUtil { createLinks(); docId = document::DocumentId("id:ns:text/html::uri"); - bucketId = distributor_component().getBucketId(docId); + bucketId = operation_context().make_split_bit_constrained_bucket_id(docId); }; void TearDown() override { diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index c510e08ab2a..ffd07ad9d60 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -104,7 +104,7 @@ document::BucketId PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) { auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::")); - document::BucketId id = distributor_component().getBucketId(doc->getId()); + document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); addIdealNodes(id); auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), doc, 0); @@ -150,7 +150,7 @@ TEST_F(PutOperationTest, bucket_database_gets_special_entry_when_CreateBucket_se // Database updated before CreateBucket is sent ASSERT_EQ("BucketId(0x4000000000008f09) : " "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)", - dumpBucket(distributor_component().getBucketId(doc->getId()))); + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true)); } @@ -197,7 +197,7 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck "id:test:testdoctype1::, timestamp 100, size 45) => 1", _sender.getCommands(true, true)); - distributor_component().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0); + operation_context().remove_node_from_bucket_database(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0); // If we get an ACK from the backend nodes, the operation has been persisted OK. // Even if the bucket has been removed from the DB in the meantime (usually would @@ -249,7 +249,7 @@ TEST_F(PutOperationTest, multiple_copies) { "node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), " "node(idx=2,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), " "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", - dumpBucket(distributor_component().getBucketId(doc->getId()))); + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); } TEST_F(PutOperationTest, multiple_copies_early_return_primary_required) { @@ -477,7 +477,7 @@ parseBucketInfoString(const std::string& nodeList) { std::string PutOperationTest::getNodes(const std::string& infoString) { Document::SP doc(createDummyDocument("test", "uri")); - document::BucketId bid(distributor_component().getBucketId(doc->getId())); + document::BucketId bid(operation_context().make_split_bit_constrained_bucket_id(doc->getId())); BucketInfo entry = parseBucketInfoString(infoString); @@ -519,7 +519,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ setupDistributor(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); Document::SP doc(createDummyDocument("test", "uri")); - document::BucketId bId = distributor_component().getBucketId(doc->getId()); + document::BucketId bId = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); @@ -536,14 +536,14 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ ASSERT_EQ("BucketId(0x4000000000000593) : " "node(idx=0,crc=0x7,docs=8/8,bytes=9/9,trusted=true,active=false,ready=false)", - dumpBucket(distributor_component().getBucketId(doc->getId()))); + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); } TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) { setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "uri"); - auto bucket = distributor_component().getBucketId(doc->getId()); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); sendPut(createPut(doc)); @@ -577,7 +577,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) { setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "test"); - auto bucket = distributor_component().getBucketId(doc->getId()); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); getBucketDBUpdater().onSetSystemState( std::make_shared<api::SetSystemStateCommand>( @@ -597,7 +597,7 @@ TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) { "distributor:1 storage:2 .0.s:r .1.s:r"); Document::SP doc(createDummyDocument("test", "uri")); document::BucketId bucket( - distributor_component().getBucketId(doc->getId())); + operation_context().make_split_bit_constrained_bucket_id(doc->getId())); addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t"); sendPut(createPut(doc)); diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index de76379e854..c4892f342e7 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -24,7 +24,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil { createLinks(); docId = document::DocumentId("id:test:test::uri"); - bucketId = distributor_component().getBucketId(docId); + bucketId = operation_context().make_split_bit_constrained_bucket_id(docId); enableDistributorClusterState("distributor:1 storage:4"); }; diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 58556832f2d..dae94e41b46 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -309,7 +309,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, } update->setCreateIfNonExistent(options._createIfNonExistent); - document::BucketId id = distributor_component().getBucketId(update->getId()); + document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId()); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index ea9f0d86ac4..6620cf58571 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -60,7 +60,7 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m document::DocumentId("id:ns:" + _html_type->getName() + "::1")); update->setCreateIfNonExistent(create_if_missing); - _bId = distributor_component().getBucketId(update->getId()); + _bId = operation_context().make_split_bit_constrained_bucket_id(update->getId()); addNodesToBucketDB(_bId, bucketState); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp index 5e9f717fc7f..4aab82ee6f8 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp @@ -41,12 +41,6 @@ DistributorStripeComponent::sendUp(const api::StorageMessage::SP& msg) _distributor.getMessageSender().sendUp(msg); } -const lib::ClusterStateBundle& -DistributorStripeComponent::getClusterStateBundle() const -{ - return _distributor.getClusterStateBundle(); -} - bool DistributorStripeComponent::checkDistribution(api::StorageCommand &cmd, const document::Bucket &bucket) { @@ -178,10 +172,10 @@ UpdateBucketDatabaseProcessor::process_entry(BucketDatabase::Entry &entry) const } void -DistributorStripeComponent::updateBucketDatabase( - const document::Bucket &bucket, - const std::vector<BucketCopy>& changedNodes, - uint32_t updateFlags) +DistributorStripeComponent::update_bucket_database( + const document::Bucket& bucket, + const std::vector<BucketCopy>& changed_nodes, + uint32_t update_flags) { auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); assert(!(bucket.getBucketId() == document::BucketId())); @@ -200,7 +194,7 @@ DistributorStripeComponent::updateBucketDatabase( // bucket database (i.e. copies on nodes that are actually unavailable). const auto& available_nodes = bucketSpace.get_available_nodes(); bool found_down_node = false; - for (const auto& copy : changedNodes) { + for (const auto& copy : changed_nodes) { if (copy.getNode() >= available_nodes.size() || !available_nodes[copy.getNode()]) { found_down_node = true; break; @@ -210,18 +204,18 @@ DistributorStripeComponent::updateBucketDatabase( // bucket copy vector std::vector<BucketCopy> up_nodes; if (found_down_node) { - up_nodes.reserve(changedNodes.size()); - for (uint32_t i = 0; i < changedNodes.size(); ++i) { - const BucketCopy& copy(changedNodes[i]); + up_nodes.reserve(changed_nodes.size()); + for (uint32_t i = 0; i < changed_nodes.size(); ++i) { + const BucketCopy& copy(changed_nodes[i]); if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) { up_nodes.emplace_back(copy); } } } - UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changedNodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (updateFlags & DatabaseUpdate::RESET_TRUSTED) != 0); + UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changed_nodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0); - bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0); + bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (update_flags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0); } void @@ -231,24 +225,6 @@ DistributorStripeComponent::recheckBucketInfo(uint16_t nodeIdx, const document:: } document::BucketId -DistributorStripeComponent::getBucketId(const document::DocumentId& docId) const -{ - document::BucketId id(getBucketIdFactory().getBucketId(docId)); - - id.setUsedBits(_distributor.getConfig().getMinimalBucketSplit()); - return id.stripUnused(); -} - -bool -DistributorStripeComponent::storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const -{ - const lib::NodeState& ns = getClusterStateBundle().getDerivedClusterState(bucketSpace)->getNodeState( - lib::Node(lib::NodeType::STORAGE, nodeIndex)); - - return ns.getState().oneOf(_distributor.getStorageNodeUpStates()); -} - -document::BucketId DistributorStripeComponent::getSibling(const document::BucketId& bid) const { document::BucketId zeroBucket; document::BucketId oneBucket; @@ -293,6 +269,15 @@ DistributorStripeComponent::node_address(uint16_t node_index) const noexcept // Implements DistributorOperationContext +document::BucketId +DistributorStripeComponent::make_split_bit_constrained_bucket_id(const document::DocumentId& doc_id) const +{ + document::BucketId id(getBucketIdFactory().getBucketId(doc_id)); + + id.setUsedBits(_distributor.getConfig().getMinimalBucketSplit()); + return id.stripUnused(); +} + bool DistributorStripeComponent::has_pending_message(uint16_t node_index, const document::Bucket& bucket, @@ -302,6 +287,21 @@ DistributorStripeComponent::has_pending_message(uint16_t node_index, return sender.getPendingMessageTracker().hasPendingMessage(node_index, bucket, message_type); } +const lib::ClusterStateBundle& +DistributorStripeComponent::cluster_state_bundle() const +{ + return _distributor.getClusterStateBundle(); +} + +bool +DistributorStripeComponent::storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const +{ + const lib::NodeState& ns = cluster_state_bundle().getDerivedClusterState(bucket_space)->getNodeState( + lib::Node(lib::NodeType::STORAGE, node_index)); + + return ns.getState().oneOf(_distributor.getStorageNodeUpStates()); +} + std::unique_ptr<document::select::Node> DistributorStripeComponent::parse_selection(const vespalib::string& selection) const { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h index 892d3c8cb62..ae54eacf9fd 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h @@ -43,17 +43,6 @@ public: ~DistributorStripeComponent() override; /** - * Returns a reference to the current cluster state bundle. Valid until the - * next time the distributor main thread processes its message queue. - */ - const lib::ClusterStateBundle& getClusterStateBundle() const; - - /** - * Returns true if the given storage node is in an "up state". - */ - bool storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const; - - /** * Verifies that the given command has been received at the * correct distributor based on the current system state. */ @@ -68,46 +57,11 @@ public: const std::vector<uint16_t>& nodes); /** - * Removes a copy from the given bucket from the bucket database. - * If the resulting bucket is empty afterwards, removes the entire - * bucket entry from the bucket database. - */ - void removeNodeFromDB(const document::Bucket &bucket, uint16_t node) { - removeNodesFromDB(bucket, toVector<uint16_t>(node)); - } - - /** - * Adds the given copies to the bucket database. - */ - void updateBucketDatabase( - const document::Bucket &bucket, - const std::vector<BucketCopy>& changedNodes, - uint32_t updateFlags = 0); - - /** - * Simple API for the common case of modifying a single node. - */ - void updateBucketDatabase( - const document::Bucket &bucket, - const BucketCopy& changedNode, - uint32_t updateFlags = 0) - { - updateBucketDatabase(bucket, - toVector<BucketCopy>(changedNode), - updateFlags); - } - - /** * Fetch bucket info about the given bucket from the given node. * Used when we get BUCKET_NOT_FOUND. */ void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket); - /** - * Returns the bucket id corresponding to the given document id. - */ - document::BucketId getBucketId(const document::DocumentId& docId) const; - void sendDown(const api::StorageMessage::SP&); void sendUp(const api::StorageMessage::SP&); @@ -148,34 +102,46 @@ public: // Implements DistributorOperationContext api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); } + + /** + * Simple API for the common case of modifying a single node. + */ void update_bucket_database(const document::Bucket& bucket, const BucketCopy& changed_node, uint32_t update_flags = 0) override { - updateBucketDatabase(bucket, changed_node, update_flags); + update_bucket_database(bucket, + toVector<BucketCopy>(changed_node), + update_flags); } + + /** + * Adds the given copies to the bucket database. + */ virtual void update_bucket_database(const document::Bucket& bucket, const std::vector<BucketCopy>& changed_nodes, - uint32_t update_flags = 0) override { - updateBucketDatabase(bucket, changed_nodes, update_flags); - } + uint32_t update_flags = 0) override; + + /** + * Removes a copy from the given bucket from the bucket database. + * If the resulting bucket is empty afterwards, removes the entire + * bucket entry from the bucket database. + */ void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) override { - removeNodeFromDB(bucket, node_index); + removeNodesFromDB(bucket, toVector<uint16_t>(node_index)); } const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override { - return getBucketSpaceRepo(); + return _bucketSpaceRepo; } DistributorBucketSpaceRepo& bucket_space_repo() noexcept override { - return getBucketSpaceRepo(); + return _bucketSpaceRepo; } const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override { - return getReadOnlyBucketSpaceRepo(); + return _readOnlyBucketSpaceRepo; } DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override { - return getReadOnlyBucketSpaceRepo(); - } - document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const override { - return getBucketId(docId); + return _readOnlyBucketSpaceRepo; } + document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& doc_id) const override; const DistributorConfiguration& distributor_config() const noexcept override { return getDistributor().getConfig(); } @@ -196,12 +162,18 @@ public: const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override { return getDistributor().pendingClusterStateOrNull(bucket_space); } - const lib::ClusterStateBundle& cluster_state_bundle() const override { - return getClusterStateBundle(); - } - bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override { - return storageNodeIsUp(bucket_space, node_index); - } + + /** + * Returns a reference to the current cluster state bundle. Valid until the + * next time the distributor main thread processes its message queue. + */ + const lib::ClusterStateBundle& cluster_state_bundle() const override; + + /** + * Returns true if the given storage node is in an "up state". + */ + bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override; + const char* storage_node_up_states() const override { return getDistributor().getStorageNodeUpStates(); } diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index a7ea1f9400c..84fef955feb 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -69,7 +69,7 @@ IdealStateManager::iAmUp() const { Node node(NodeType::DISTRIBUTOR, _distributorComponent.getIndex()); // Assume that derived cluster states agree on distributor node being up - const auto &state = *_distributorComponent.getClusterStateBundle().getBaselineClusterState(); + const auto &state = *operation_context().cluster_state_bundle().getBaselineClusterState(); const lib::State &nodeState = state.getNodeState(node).getState(); const lib::State &clusterState = state.getClusterState(); diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index 2665d7938ed..e5a38ab5161 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -77,7 +77,10 @@ public: getBucketStatus(out); } + // TODO STRIPE stop exposing this DistributorStripeComponent& getDistributorComponent() { return _distributorComponent; } + DistributorOperationContext& operation_context() { return _distributorComponent; } + const DistributorOperationContext& operation_context() const { return _distributorComponent; } DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _bucketSpaceRepo; } const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _bucketSpaceRepo; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 71f5693329a..c2f48a9c3d9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -69,7 +69,7 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&, void GarbageCollectionOperation::merge_received_bucket_info_into_db() { // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. - _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info); + _manager->operation_context().update_bucket_database(getBucket(), _replica_info); BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); if (dbentry.valid()) { dbentry->setLastGarbageCollectionTime( diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 44f7b5d3b49..4509e61746a 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -110,7 +110,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& rep.getSourceBuckets()); for (uint32_t i = 0; i < sourceBuckets.size(); i++) { document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), sourceBuckets[i]); - _manager->getDistributorComponent().removeNodeFromDB(sourceBucket, node); + _manager->operation_context().remove_node_from_bucket_database(sourceBucket, node); } // Add new buckets. @@ -118,7 +118,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& LOG(debug, "Invalid bucketinfo for bucket %s returned in join", getBucketId().toString().c_str()); } else { - _manager->getDistributorComponent().updateBucketDatabase( + _manager->operation_context().update_bucket_database( getBucket(), BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(), node, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 3bd2406cd85..a8eee5caf49 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -81,7 +81,7 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply vespalib::string(rep->getResult().getMessage()).c_str(), rep->getBucketInfo().toString().c_str()); - _manager->getDistributorComponent().updateBucketDatabase( + _manager->operation_context().update_bucket_database( getBucket(), BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(), node, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 2b3c80f9401..1e66cc174e0 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -101,7 +101,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP // Must reset trusted since otherwise trustedness of inconsistent // copies would be arbitrarily determined by which copy managed // to finish its split first. - _manager->getDistributorComponent().updateBucketDatabase( + _manager->operation_context().update_bucket_database( document::Bucket(msg->getBucket().getBucketSpace(), sinfo.first), copy, (DatabaseUpdate::CREATE_IF_NONEXISTING | DatabaseUpdate::RESET_TRUSTED)); |