diff options
Diffstat (limited to 'searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp | 70 |
1 files changed, 52 insertions, 18 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp index e905b493cf4..e0822db6f0c 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "spi_bm_feed_handler.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include "bucket_info_queue.h" #include <vespa/document/fieldset/fieldsetrepo.h> @@ -31,18 +32,20 @@ void get_bucket_info_loop(PendingTracker &tracker) class MyOperationComplete : public storage::spi::OperationComplete { + PersistenceProvider* _provider; std::atomic<uint32_t> &_errors; Bucket _bucket; PendingTracker& _tracker; public: - MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker); + MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker); ~MyOperationComplete() override; void onComplete(std::unique_ptr<storage::spi::Result> result) override; void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; }; -MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker) - : _errors(errors), +MyOperationComplete::MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker) + : _provider(provider), + _errors(errors), _bucket(bucket), _tracker(tracker) { @@ -62,7 +65,7 @@ MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) } else { auto bucket_info_queue = _tracker.get_bucket_info_queue(); if (bucket_info_queue != nullptr) { - bucket_info_queue->put_bucket(_bucket); + bucket_info_queue->put_bucket(_bucket, _provider); } } } @@ -75,50 +78,81 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result } -SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info) +SpiBmFeedHandler::SpiBmFeedHandler(std::vector<PersistenceProvider* >providers, const document::FieldSetRepo &field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info) : IBmFeedHandler(), _name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"), - _provider(provider), + _providers(std::move(providers)), _field_set_repo(field_set_repo), _errors(0u), - _skip_get_spi_bucket_info(skip_get_spi_bucket_info) + _skip_get_spi_bucket_info(skip_get_spi_bucket_info), + _distribution(distribution) { } SpiBmFeedHandler::~SpiBmFeedHandler() = default; +PersistenceProvider* +SpiBmFeedHandler::get_provider(const document::Bucket& bucket) +{ + uint32_t node_idx = _distribution.get_service_layer_node_idx(bucket); + if (node_idx >= _providers.size()) { + return nullptr; + } + return _providers[node_idx]; +} + void SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - auto field_set = _field_set_repo.getFieldSet(field_set_string); - auto result = _provider.get(spi_bucket, *field_set, document_id, context); - if (result.hasError()) { + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + auto field_set = _field_set_repo.getFieldSet(field_set_string); + auto result = provider->get(spi_bucket, *field_set, document_id, context); + if (result.hasError()) { + ++_errors; + } + } else { ++_errors; } } @@ -127,7 +161,7 @@ void SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker) { if (!_skip_get_spi_bucket_info) { - tracker.attach_bucket_info_queue(_provider, _errors); + tracker.attach_bucket_info_queue(_errors); } } |