diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-28 13:47:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-28 13:47:32 +0200 |
commit | cef539adea8b1ad6ba2314722c699a0a412082b7 (patch) | |
tree | afbee578314c1df500874bd23ee36f36e5fe5e86 /searchcore | |
parent | 37d3b000d7a5c80274358ead957bd2d4045a124b (diff) | |
parent | 646120d32e342f8ce25ca72fafd216978ca7412d (diff) |
Merge pull request #14572 from vespa-engine/toregge/pass-storage-api-messages-directly-to-storage-chain
Add option to feed using storage api protocol directly on storage chain.
Diffstat (limited to 'searchcore')
4 files changed, 260 insertions, 12 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index 1d031e2ea30..4ced3fe173b 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_executable(searchcore_vespa_feed_bm_app vespa_feed_bm.cpp spi_bm_feed_handler.cpp storage_api_rpc_bm_feed_handler.cpp + storage_api_chain_bm_feed_handler.cpp OUTPUT_NAME vespa-feed-bm DEPENDS searchcore_server diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp new file mode 100644 index 00000000000..6f1acd10fe4 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -0,0 +1,185 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_chain_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/common/storage_chain_builder.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <cassert> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::StorageLink; + +namespace feedbm { + +namespace { + +std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle); + cmd->setPriority(storage::api::StorageMessage::VERYHIGH); + return cmd; +} + +} + +class BmStorageLink : public StorageLink +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + BmStorageLink(); + ~BmStorageLink() override; + bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + void retain(uint64_t msg_id, PendingTracker &tracker) { + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); + } + bool release(uint64_t msg_id) { + PendingTracker *tracker = nullptr; + { + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + if (itr == _pending.end()) { + return false; + } + tracker = itr->second; + _pending.erase(itr); + } + tracker->release(); + return true; + } +}; + +BmStorageLink::BmStorageLink() + : storage::StorageLink("vespa-bm-feed"), + _mutex(), + _pending() +{ +} + +BmStorageLink::~BmStorageLink() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +bool +BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + (void) msg; + return false; +} + +bool +BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + return release(msg->getMsgId()); +} + +struct StorageApiChainBmFeedHandler::Context { + BmStorageLink* bm_link; + Context() + : bm_link(nullptr) + { + } + ~Context() = default; +}; + +class MyStorageChainBuilder : public storage::StorageChainBuilder +{ + using Parent = storage::StorageChainBuilder; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context; +public: + MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context); + ~MyStorageChainBuilder() override; + void add(std::unique_ptr<StorageLink> link) override; +}; + +MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context) + : storage::StorageChainBuilder(), + _context(std::move(context)) +{ +} + +MyStorageChainBuilder::~MyStorageChainBuilder() = default; + +void +MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link) +{ + vespalib::string name = link->getName(); + Parent::add(std::move(link)); + if (name == "Communication manager") { + auto my_link = std::make_unique<BmStorageLink>(); + _context->bm_link = my_link.get(); + Parent::add(std::move(my_link)); + } +} + +StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context) + : IBmFeedHandler(), + _context(std::move(context)) +{ + auto cmd = make_set_cluster_state_cmd(); + PendingTracker tracker(1); + send_msg(std::move(cmd), tracker); + tracker.drain(); +} + +StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; + +void +StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + auto bm_link = _context->bm_link; + bm_link->retain(cmd->getMsgId(), pending_tracker); + bm_link->sendDown(std::move(cmd)); +} + +void +StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_msg(std::move(cmd), tracker); +} + +std::shared_ptr<StorageApiChainBmFeedHandler::Context> +StorageApiChainBmFeedHandler::get_context() +{ + return std::make_shared<Context>(); +} + +std::unique_ptr<storage::IStorageChainBuilder> +StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context) +{ + return std::make_unique<MyStorageChainBuilder>(std::move(context)); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h new file mode 100644 index 00000000000..521deddd19e --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace storage { class IStorageChainBuilder; } +namespace storage::api { class StorageCommand; } + +namespace feedbm { + +/* + * Benchmark feed handler for feed to service layer using storage api protocol + * directly on the storage chain. + */ +class StorageApiChainBmFeedHandler : public IBmFeedHandler +{ +public: + struct Context; +private: + std::shared_ptr<Context> _context; + void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiChainBmFeedHandler(std::shared_ptr<Context> context); + ~StorageApiChainBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + + static std::shared_ptr<Context> get_context(); + static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 4208e9448ff..73d741c3fee 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -3,6 +3,7 @@ #include "pending_tracker.h" #include "spi_bm_feed_handler.h" #include "storage_api_rpc_bm_feed_handler.h" +#include "storage_api_chain_bm_feed_handler.h" #include <vespa/vespalib/testkit/testapp.h> #include <tests/proton/common/dummydbowner.h> #include <vespa/config-imported-fields.h> @@ -56,6 +57,7 @@ #include <vespa/config-slobroks.h> #include <vespa/metrics/config-metricsmanager.h> #include <vespa/storageserver/app/servicelayerprocess.h> +#include <vespa/storage/common/i_storage_chain_builder.h> #include <vespa/storage/storageserver/storagenode.h> #include <vespa/messagebus/config-messagebus.h> #include <vespa/messagebus/testlib/slobrok.h> @@ -122,6 +124,7 @@ using vespalib::makeLambdaTask; using feedbm::IBmFeedHandler; using feedbm::SpiBmFeedHandler; using feedbm::StorageApiRpcBmFeedHandler; +using feedbm::StorageApiChainBmFeedHandler; using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>; @@ -209,6 +212,7 @@ class BMParams { uint32_t _remove_passes; uint32_t _rpc_network_threads; bool _enable_service_layer; + bool _use_storage_chain; uint32_t get_start(uint32_t thread_id) const { return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads); } @@ -220,7 +224,8 @@ public: _update_passes(1), _remove_passes(2), _rpc_network_threads(1), - _enable_service_layer(false) + _enable_service_layer(false), + _use_storage_chain(false) { } BMRange get_range(uint32_t thread_id) const { @@ -233,6 +238,7 @@ public: uint32_t get_remove_passes() const { return _remove_passes; } uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } bool get_enable_service_layer() const { return _enable_service_layer; } + bool get_use_storage_chain() const { return _use_storage_chain; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_threads(uint32_t threads_in) { _threads = threads_in; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } @@ -240,6 +246,7 @@ public: void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } + void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } bool check() const; }; @@ -269,13 +276,13 @@ BMParams::check() const return true; } - class MyServiceLayerProcess : public storage::ServiceLayerProcess { PersistenceProvider& _provider; public: MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider); + PersistenceProvider &provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder); ~MyServiceLayerProcess() override { shutdown(); } void shutdown() override; @@ -284,10 +291,14 @@ public: }; MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider) + PersistenceProvider &provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder) : ServiceLayerProcess(configUri), _provider(provider) { + if (chain_builder) { + set_storage_chain_builder(std::move(chain_builder)); + } } void @@ -476,7 +487,7 @@ struct PersistenceProviderFixture { std::unique_ptr<Document> make_document(uint32_t i) const; std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; void create_buckets(); - void start_service_layer(); + void start_service_layer(bool use_storage_chain); void shutdown_service_layer(); }; @@ -615,14 +626,21 @@ PersistenceProviderFixture::create_buckets() } void -PersistenceProviderFixture::start_service_layer() +PersistenceProviderFixture::start_service_layer(bool use_storage_chain) { LOG(info, "start slobrok"); _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); + std::unique_ptr<storage::IStorageChainBuilder> chain_builder; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> context; + if (use_storage_chain) { + context = StorageApiChainBmFeedHandler::get_context(); + chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context); + } _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, - *_persistence_engine); + *_persistence_engine, + std::move(chain_builder)); _service_layer->setupConfig(100ms); _service_layer->createNode(); _service_layer->getNode().waitUntilInitialized(); @@ -630,7 +648,11 @@ PersistenceProviderFixture::start_service_layer() config::ConfigUri client_config_uri("bm-rpc-client", _config_context); _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + if (use_storage_chain) { + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context)); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + } } void @@ -829,7 +851,7 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); if (bm_params.get_enable_service_layer()) { - f.start_service_layer(); + f.start_service_layer(bm_params.get_use_storage_chain()); } vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); @@ -881,7 +903,8 @@ App::usage() "[--update-passes update-passes]\n" "[--remove-passes remove-passes]\n" "[--rpc-network-threads threads]\n" - "[--enable-service-layer]" << std::endl; + "[--enable-service-layer]\n" + "[--use-storage-chain]" << std::endl; } bool @@ -897,7 +920,8 @@ App::get_options() { "update-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, { "rpc-network-threads", 1, nullptr, 0 }, - { "enable-service-layer", 0, nullptr, 0 } + { "enable-service-layer", 0, nullptr, 0 }, + { "use-storage-chain", 0, nullptr, 0 } }; enum longopts_enum { LONGOPT_THREADS, @@ -906,7 +930,8 @@ App::get_options() LONGOPT_UPDATE_PASSES, LONGOPT_REMOVE_PASSES, LONGOPT_RPC_NETWORK_THREADS, - LONGOPT_ENABLE_SERVICE_LAYER + LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_USE_STORAGE_CHAIN }; int opt_index = 1; resetOptIndex(opt_index); @@ -935,6 +960,9 @@ App::get_options() case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; + case LONGOPT_USE_STORAGE_CHAIN: + _bm_params.set_use_storage_chain(true); + break; default: return false; } |