aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp')
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp70
1 files changed, 52 insertions, 18 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index e905b493cf4..e0822db6f0c 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "spi_bm_feed_handler.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include "bucket_info_queue.h"
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -31,18 +32,20 @@ void get_bucket_info_loop(PendingTracker &tracker)
class MyOperationComplete : public storage::spi::OperationComplete
{
+ PersistenceProvider* _provider;
std::atomic<uint32_t> &_errors;
Bucket _bucket;
PendingTracker& _tracker;
public:
- MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
+ MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
~MyOperationComplete() override;
void onComplete(std::unique_ptr<storage::spi::Result> result) override;
void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
};
-MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker)
- : _errors(errors),
+MyOperationComplete::MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker)
+ : _provider(provider),
+ _errors(errors),
_bucket(bucket),
_tracker(tracker)
{
@@ -62,7 +65,7 @@ MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
} else {
auto bucket_info_queue = _tracker.get_bucket_info_queue();
if (bucket_info_queue != nullptr) {
- bucket_info_queue->put_bucket(_bucket);
+ bucket_info_queue->put_bucket(_bucket, _provider);
}
}
}
@@ -75,50 +78,81 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result
}
-SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info)
+SpiBmFeedHandler::SpiBmFeedHandler(std::vector<PersistenceProvider* >providers, const document::FieldSetRepo &field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info)
: IBmFeedHandler(),
_name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"),
- _provider(provider),
+ _providers(std::move(providers)),
_field_set_repo(field_set_repo),
_errors(0u),
- _skip_get_spi_bucket_info(skip_get_spi_bucket_info)
+ _skip_get_spi_bucket_info(skip_get_spi_bucket_info),
+ _distribution(distribution)
{
}
SpiBmFeedHandler::~SpiBmFeedHandler() = default;
+PersistenceProvider*
+SpiBmFeedHandler::get_provider(const document::Bucket& bucket)
+{
+ uint32_t node_idx = _distribution.get_service_layer_node_idx(bucket);
+ if (node_idx >= _providers.size()) {
+ return nullptr;
+ }
+ return _providers[node_idx];
+}
+
void
SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- auto field_set = _field_set_repo.getFieldSet(field_set_string);
- auto result = _provider.get(spi_bucket, *field_set, document_id, context);
- if (result.hasError()) {
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ auto field_set = _field_set_repo.getFieldSet(field_set_string);
+ auto result = provider->get(spi_bucket, *field_set, document_id, context);
+ if (result.hasError()) {
+ ++_errors;
+ }
+ } else {
++_errors;
}
}
@@ -127,7 +161,7 @@ void
SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker)
{
if (!_skip_get_spi_bucket_info) {
- tracker.attach_bucket_info_queue(_provider, _errors);
+ tracker.attach_bucket_info_queue(_errors);
}
}