summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-28 13:36:49 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-04-28 13:36:49 +0000
commit10f5198860792c503ad35bfe3374a60fe31b467d (patch)
treead5452fa71cada372dbcdbcf3cdaf0cce8cf8750 /storage
parent1cc5ceecb27d8f6df6e3dcc8cb3ac327bc2407a9 (diff)
Remove unused top-level code that is handled per stripe instead.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp124
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h62
2 files changed, 1 insertions, 185 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index f328b599604..90d3d24c240 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -40,12 +40,9 @@ BucketDBUpdater::BucketDBUpdater(DistributorStripeInterface& owner, // FIXME STR
_node_ctx(_distributor_component),
_op_ctx(_distributor_component),
_distributor_interface(_distributor_component.getDistributor()),
- _delayed_requests(),
- _sent_messages(),
_pending_cluster_state(),
_history(),
_sender(sender),
- _enqueued_rechecks(),
_outdated_nodes_map(),
_transition_timer(_node_ctx.clock()),
_stale_reads_enabled(false)
@@ -77,21 +74,13 @@ BucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distri
// ... need to take a guard if so, so can probably not be done at ctor time..?
}
-// TODO STRIPE what to do with merge guards...
// FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..?
// Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS
// Or do we have to touch these at all here? Just defer all this via stripe interface?
void
BucketDBUpdater::flush()
{
- for (auto & entry : _sent_messages) {
- // Cannot sendDown MergeBucketReplies during flushing, since
- // all lower links have been closed
- if (entry.second._mergeReplyGuard) {
- entry.second._mergeReplyGuard->resetReply();
- }
- }
- _sent_messages.clear();
+ // TODO STRIPE: Consider if this must flush_and_close() all stripes
}
void
@@ -114,33 +103,6 @@ BucketDBUpdater::has_pending_cluster_state() const
}
void
-BucketDBUpdater::send_request_bucket_info(
- uint16_t node,
- const document::Bucket& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
-{
- if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
- return;
- }
-
- std::vector<document::BucketId> buckets;
- buckets.push_back(bucket.getBucketId());
-
- auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets);
-
- LOG(debug, "Sending request bucket info command %" PRIu64 " for bucket %s to node %u",
- msg->getMsgId(), bucket.toString().c_str(), node);
-
- msg->setPriority(50);
- msg->setAddress(_node_ctx.node_address(node));
-
- _sent_messages[msg->getMsgId()] =
- BucketRequest(node, _op_ctx.generate_unique_timestamp(),
- bucket, mergeReplyGuard);
- _sender.sendCommand(msg);
-}
-
-void
BucketDBUpdater::remove_superfluous_buckets(
StripeAccessGuard& guard,
const lib::ClusterStateBundle& new_state,
@@ -339,51 +301,6 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa
return false;
}
-// TODO remove entirely from this abstraction level?
-BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
-{
- if (_reply) {
- _distributor_interface.handleCompletedMerge(_reply);
- }
-}
-
-bool
-BucketDBUpdater::onMergeBucketReply(
- const std::shared_ptr<api::MergeBucketReply>& reply)
-{
- auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
-
- // In case the merge was unsuccessful somehow, or some nodes weren't
- // actually merged (source-only nodes?) we request the bucket info of the
- // bucket again to make sure it's ok.
- for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
- send_request_bucket_info(reply->getNodes()[i].index,
- reply->getBucket(),
- replyGuard);
- }
-
- return true;
-}
-
-void
-BucketDBUpdater::send_all_queued_bucket_rechecks()
-{
- LOG(spam, "Sending %zu queued bucket rechecks previously received "
- "via NotifyBucketChange commands",
- _enqueued_rechecks.size());
-
- for (const auto & entry :_enqueued_rechecks) {
- send_request_bucket_info(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
- }
- _enqueued_rechecks.clear();
-}
-
-bool sort_pred(const BucketListMerger::BucketEntry& left,
- const BucketListMerger::BucketEntry& right)
-{
- return left.first < right.first;
-}
-
bool
BucketDBUpdater::onRequestBucketInfoReply(
const std::shared_ptr<api::RequestBucketInfoReply>& repl)
@@ -418,17 +335,6 @@ BucketDBUpdater::resend_delayed_messages()
if (_pending_cluster_state) {
_pending_cluster_state->resendDelayedMessages();
}
- if (_delayed_requests.empty()) {
- return; // Don't fetch time if not needed
- }
- framework::MilliSecTime currentTime(_node_ctx.clock());
- while (!_delayed_requests.empty()
- && currentTime >= _delayed_requests.front().first)
- {
- BucketRequest& req(_delayed_requests.front().second);
- send_request_bucket_info(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
- _delayed_requests.pop_front();
- }
}
bool
@@ -488,7 +394,6 @@ BucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard)
_pending_cluster_state.reset();
_outdated_nodes_map.clear();
guard.clear_pending_cluster_state_bundle();
- send_all_queued_bucket_rechecks();
complete_transition_timer();
guard.clear_read_only_bucket_repo_databases();
@@ -547,21 +452,6 @@ const vespalib::string BUCKETDB_UPDATER = "Bucket Database Updater";
}
-void
-BucketDBUpdater::BucketRequest::print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const
-{
- using namespace vespalib::xml;
- xos << XmlTag("storagenode")
- << XmlAttribute("index", targetNode);
- xos << XmlAttribute("bucketspace", bucket.getBucketSpace().getId(), XmlAttribute::HEX);
- if (bucket.getBucketId().getRawId() == 0) {
- xos << XmlAttribute("bucket", ALL);
- } else {
- xos << XmlAttribute("bucket", bucket.getBucketId().getId(), XmlAttribute::HEX);
- }
- xos << timestampAttribute << XmlEndTag();
-}
-
bool
BucketDBUpdater::reportStatus(std::ostream& out,
const framework::HttpUrlPath& path) const
@@ -599,18 +489,6 @@ BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos,
<< XmlAttribute("processingtime", i->_processingTime)
<< XmlEndTag();
}
- xos << XmlEndTag()
- << XmlTag("single_bucket_requests");
- for (const auto & entry : _sent_messages)
- {
- entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp));
- }
- xos << XmlEndTag()
- << XmlTag("delayed_single_bucket_requests");
- for (const auto & entry : _delayed_requests)
- {
- entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime()));
- }
xos << XmlEndTag() << XmlEndTag();
return "";
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 09621c64bf8..04962e3af9b 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -48,7 +48,6 @@ public:
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
- bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
@@ -71,61 +70,6 @@ public:
}
private:
- class MergeReplyGuard {
- public:
- MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
- : _distributor_interface(distributor_interface), _reply(reply) {}
-
- ~MergeReplyGuard();
-
- // Used when we're flushing and simply want to drop the reply rather
- // than send it down
- void resetReply() { _reply.reset(); }
- private:
- DistributorStripeInterface& _distributor_interface;
- std::shared_ptr<api::MergeBucketReply> _reply;
- };
-
- struct BucketRequest {
- BucketRequest()
- : targetNode(0), bucket(), timestamp(0) {};
-
- BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
- const std::shared_ptr<MergeReplyGuard>& guard)
- : targetNode(t),
- bucket(b),
- timestamp(currentTime),
- _mergeReplyGuard(guard) {};
-
- void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const;
- uint16_t targetNode;
- document::Bucket bucket;
- uint64_t timestamp;
-
- std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
- };
-
- struct EnqueuedBucketRecheck {
- uint16_t node;
- document::Bucket bucket;
-
- EnqueuedBucketRecheck() : node(0), bucket() {}
-
- EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket)
- : node(_node),
- bucket(_bucket)
- {}
-
- bool operator<(const EnqueuedBucketRecheck& o) const {
- if (node != o.node) {
- return node < o.node;
- }
- return bucket < o.bucket;
- }
- bool operator==(const EnqueuedBucketRecheck& o) const {
- return node == o.node && bucket == o.bucket;
- }
- };
friend class DistributorTestUtil;
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
@@ -139,8 +83,6 @@ private:
bool is_pending_cluster_state_completed() const;
void process_completed_pending_cluster_state(StripeAccessGuard& guard);
void activate_pending_cluster_state(StripeAccessGuard& guard);
- void send_request_bucket_info(uint16_t node, const document::Bucket& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
void ensure_transition_timer_started();
void complete_transition_timer();
@@ -155,7 +97,6 @@ private:
void enable_current_cluster_state_bundle_in_distributor_and_stripes(StripeAccessGuard& guard);
void add_current_state_to_cluster_state_history();
- void send_all_queued_bucket_rechecks();
void propagate_active_state_bundle_internally();
@@ -172,12 +113,9 @@ private:
const DistributorNodeContext& _node_ctx;
DistributorStripeOperationContext& _op_ctx;
DistributorStripeInterface& _distributor_interface;
- std::deque<std::pair<framework::MilliSecTime, BucketRequest>> _delayed_requests;
- std::map<uint64_t, BucketRequest> _sent_messages;
std::unique_ptr<PendingClusterState> _pending_cluster_state;
std::list<PendingClusterState::Summary> _history;
DistributorMessageSender& _sender;
- std::set<EnqueuedBucketRecheck> _enqueued_rechecks;
OutdatedNodesMap _outdated_nodes_map;
framework::MilliSecTimer _transition_timer;
std::atomic<bool> _stale_reads_enabled;