diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-04-28 13:36:49 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-04-28 13:36:49 +0000 |
commit | 10f5198860792c503ad35bfe3374a60fe31b467d (patch) | |
tree | ad5452fa71cada372dbcdbcf3cdaf0cce8cf8750 /storage | |
parent | 1cc5ceecb27d8f6df6e3dcc8cb3ac327bc2407a9 (diff) |
Remove unused top-level code that is handled per stripe instead.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.cpp | 124 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.h | 62 |
2 files changed, 1 insertions, 185 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index f328b599604..90d3d24c240 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -40,12 +40,9 @@ BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STR _node_ctx(_distributor_component), _op_ctx(_distributor_component), _distributor_interface(_distributor_component.getDistributor()), - _delayed_requests(), - _sent_messages(), _pending_cluster_state(), _history(), _sender(sender), - _enqueued_rechecks(), _outdated_nodes_map(), _transition_timer(_node_ctx.clock()), _stale_reads_enabled(false) @@ -77,21 +74,13 @@ BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distri // ... need to take a guard if so, so can probably not be done at ctor time..? } -// TODO STRIPE what to do with merge guards... // FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..? // Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS // Or do we have to touch these at all here? Just defer all this via stripe interface? void BucketDBUpdater::flush() { - for (auto & entry : _sent_messages) { - // Cannot sendDown MergeBucketReplies during flushing, since - // all lower links have been closed - if (entry.second._mergeReplyGuard) { - entry.second._mergeReplyGuard->resetReply(); - } - } - _sent_messages.clear(); + // TODO STRIPE: Consider if this must flush_and_close() all stripes } void @@ -114,33 +103,6 @@ BucketDBUpdater::has_pending_cluster_state() const } void -BucketDBUpdater::send_request_bucket_info( - uint16_t node, - const document::Bucket& bucket, - const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) -{ - if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) { - return; - } - - std::vector<document::BucketId> buckets; - buckets.push_back(bucket.getBucketId()); - - auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets); - - LOG(debug, "Sending request bucket info command %" PRIu64 " for bucket %s to node %u", - msg->getMsgId(), bucket.toString().c_str(), node); - - msg->setPriority(50); - msg->setAddress(_node_ctx.node_address(node)); - - _sent_messages[msg->getMsgId()] = - BucketRequest(node, _op_ctx.generate_unique_timestamp(), - bucket, mergeReplyGuard); - _sender.sendCommand(msg); -} - -void BucketDBUpdater::remove_superfluous_buckets( StripeAccessGuard& guard, const lib::ClusterStateBundle& new_state, @@ -339,51 +301,6 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa return false; } -// TODO remove entirely from this abstraction level? -BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() -{ - if (_reply) { - _distributor_interface.handleCompletedMerge(_reply); - } -} - -bool -BucketDBUpdater::onMergeBucketReply( - const std::shared_ptr<api::MergeBucketReply>& reply) -{ - auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply); - - // In case the merge was unsuccessful somehow, or some nodes weren't - // actually merged (source-only nodes?) we request the bucket info of the - // bucket again to make sure it's ok. - for (uint32_t i = 0; i < reply->getNodes().size(); i++) { - send_request_bucket_info(reply->getNodes()[i].index, - reply->getBucket(), - replyGuard); - } - - return true; -} - -void -BucketDBUpdater::send_all_queued_bucket_rechecks() -{ - LOG(spam, "Sending %zu queued bucket rechecks previously received " - "via NotifyBucketChange commands", - _enqueued_rechecks.size()); - - for (const auto & entry :_enqueued_rechecks) { - send_request_bucket_info(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>()); - } - _enqueued_rechecks.clear(); -} - -bool sort_pred(const BucketListMerger::BucketEntry& left, - const BucketListMerger::BucketEntry& right) -{ - return left.first < right.first; -} - bool BucketDBUpdater::onRequestBucketInfoReply( const std::shared_ptr<api::RequestBucketInfoReply>& repl) @@ -418,17 +335,6 @@ BucketDBUpdater::resend_delayed_messages() if (_pending_cluster_state) { _pending_cluster_state->resendDelayedMessages(); } - if (_delayed_requests.empty()) { - return; // Don't fetch time if not needed - } - framework::MilliSecTime currentTime(_node_ctx.clock()); - while (!_delayed_requests.empty() - && currentTime >= _delayed_requests.front().first) - { - BucketRequest& req(_delayed_requests.front().second); - send_request_bucket_info(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>()); - _delayed_requests.pop_front(); - } } bool @@ -488,7 +394,6 @@ BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard) _pending_cluster_state.reset(); _outdated_nodes_map.clear(); guard.clear_pending_cluster_state_bundle(); - send_all_queued_bucket_rechecks(); complete_transition_timer(); guard.clear_read_only_bucket_repo_databases(); @@ -547,21 +452,6 @@ const vespalib::string BUCKETDB_UPDATER = "Bucket Database Updater"; } -void -BucketDBUpdater::BucketRequest::print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute ×tampAttribute) const -{ - using namespace vespalib::xml; - xos << XmlTag("storagenode") - << XmlAttribute("index", targetNode); - xos << XmlAttribute("bucketspace", bucket.getBucketSpace().getId(), XmlAttribute::HEX); - if (bucket.getBucketId().getRawId() == 0) { - xos << XmlAttribute("bucket", ALL); - } else { - xos << XmlAttribute("bucket", bucket.getBucketId().getId(), XmlAttribute::HEX); - } - xos << timestampAttribute << XmlEndTag(); -} - bool BucketDBUpdater::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const @@ -599,18 +489,6 @@ BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos, << XmlAttribute("processingtime", i->_processingTime) << XmlEndTag(); } - xos << XmlEndTag() - << XmlTag("single_bucket_requests"); - for (const auto & entry : _sent_messages) - { - entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp)); - } - xos << XmlEndTag() - << XmlTag("delayed_single_bucket_requests"); - for (const auto & entry : _delayed_requests) - { - entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); - } xos << XmlEndTag() << XmlEndTag(); return ""; } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 09621c64bf8..04962e3af9b 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -48,7 +48,6 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override; - bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override; vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; @@ -71,61 +70,6 @@ public: } private: - class MergeReplyGuard { - public: - MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept - : _distributor_interface(distributor_interface), _reply(reply) {} - - ~MergeReplyGuard(); - - // Used when we're flushing and simply want to drop the reply rather - // than send it down - void resetReply() { _reply.reset(); } - private: - DistributorStripeInterface& _distributor_interface; - std::shared_ptr<api::MergeBucketReply> _reply; - }; - - struct BucketRequest { - BucketRequest() - : targetNode(0), bucket(), timestamp(0) {}; - - BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b, - const std::shared_ptr<MergeReplyGuard>& guard) - : targetNode(t), - bucket(b), - timestamp(currentTime), - _mergeReplyGuard(guard) {}; - - void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute ×tampAttribute) const; - uint16_t targetNode; - document::Bucket bucket; - uint64_t timestamp; - - std::shared_ptr<MergeReplyGuard> _mergeReplyGuard; - }; - - struct EnqueuedBucketRecheck { - uint16_t node; - document::Bucket bucket; - - EnqueuedBucketRecheck() : node(0), bucket() {} - - EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket) - : node(_node), - bucket(_bucket) - {} - - bool operator<(const EnqueuedBucketRecheck& o) const { - if (node != o.node) { - return node < o.node; - } - return bucket < o.bucket; - } - bool operator==(const EnqueuedBucketRecheck& o) const { - return node == o.node && bucket == o.bucket; - } - }; friend class DistributorTestUtil; // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor @@ -139,8 +83,6 @@ private: bool is_pending_cluster_state_completed() const; void process_completed_pending_cluster_state(StripeAccessGuard& guard); void activate_pending_cluster_state(StripeAccessGuard& guard); - void send_request_bucket_info(uint16_t node, const document::Bucket& bucket, - const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard); void ensure_transition_timer_started(); void complete_transition_timer(); @@ -155,7 +97,6 @@ private: void enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard); void add_current_state_to_cluster_state_history(); - void send_all_queued_bucket_rechecks(); void propagate_active_state_bundle_internally(); @@ -172,12 +113,9 @@ private: const DistributorNodeContext& _node_ctx; DistributorStripeOperationContext& _op_ctx; DistributorStripeInterface& _distributor_interface; - std::deque<std::pair<framework::MilliSecTime, BucketRequest>> _delayed_requests; - std::map<uint64_t, BucketRequest> _sent_messages; std::unique_ptr<PendingClusterState> _pending_cluster_state; std::list<PendingClusterState::Summary> _history; DistributorMessageSender& _sender; - std::set<EnqueuedBucketRecheck> _enqueued_rechecks; OutdatedNodesMap _outdated_nodes_map; framework::MilliSecTimer _transition_timer; std::atomic<bool> _stale_reads_enabled; |