diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-30 15:25:46 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-30 15:25:46 +0200 |
commit | 6337d6d35aaecf788b82ff6fa9ecd190d1b55f37 (patch) | |
tree | cc3d70dc83bcb04dde5ad6723873006bcd2486af /searchcore | |
parent | ff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff) |
Move out classes to separate files. Enable feeding via distributor.
Diffstat (limited to 'searchcore')
16 files changed, 417 insertions, 179 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index 897073397ef..b5465b1f0dd 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -2,6 +2,9 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp + bm_cluster_controller.cpp + bm_storage_chain_builder.cpp + bm_storage_link.cpp spi_bm_feed_handler.cpp storage_api_rpc_bm_feed_handler.cpp storage_api_chain_bm_feed_handler.cpp diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp new file mode 100644 index 00000000000..324f98625f3 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp @@ -0,0 +1,54 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_cluster_controller.h" +#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/slobrok/sbmirror.h> + +using storage::api::StorageMessageAddress; +using storage::rpc::SharedRpcResources; +using storage::lib::NodeType; + +namespace feedbm { + +namespace { + +FRT_RPCRequest * +make_set_cluster_state_request() +{ + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + storage::rpc::SlimeClusterStateBundleCodec codec; + auto encoded_bundle = codec.encode(bundle); + auto *req = new FRT_RPCRequest(); + auto* params = req->GetParams(); + params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); + params->AddInt32(encoded_bundle._uncompressed_length); + params->AddData(std::move(*encoded_bundle._buffer)); + req->SetMethodName("setdistributionstates"); + return req; +} + +} + +BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in) + : _shared_rpc_resources(shared_rpc_resources_in) +{ +} + +void +BmClusterController::set_cluster_up(bool distributor) +{ + StorageMessageAddress storage_address("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0); + auto req = make_set_cluster_state_request(); + auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(), _shared_rpc_resources.target_factory()); + auto target = target_resolver->resolve_rpc_target(storage_address); + target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout + assert(!req->IsError()); + req->SubRef(); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h new file mode 100644 index 00000000000..699036be5c9 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace storage::api { class StorageMessageAddress; } +namespace storage::rpc { class SharedRpcResources; } + +namespace feedbm { + +/* + * Fake cluster controller that sets cluster state to be up. + */ +class BmClusterController +{ + storage::rpc::SharedRpcResources& _shared_rpc_resources; +public: + BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in); + void set_cluster_up(bool distributor); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp new file mode 100644 index 00000000000..bbe0de70ce2 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_storage_chain_builder.h" +#include "bm_storage_link_context.h" +#include "bm_storage_link.h" + +#include <vespa/log/log.h> +LOG_SETUP(".bm_storage_chain_builder"); + +namespace feedbm { + +BmStorageChainBuilder::BmStorageChainBuilder() + : storage::StorageChainBuilder(), + _context(std::make_shared<BmStorageLinkContext>()) +{ +} + +BmStorageChainBuilder::~BmStorageChainBuilder() = default; + +void +BmStorageChainBuilder::add(std::unique_ptr<storage::StorageLink> link) +{ + vespalib::string name = link->getName(); + storage::StorageChainBuilder::add(std::move(link)); + LOG(info, "Added storage link '%s'", name.c_str()); + if (name == "Communication manager") { + auto my_link = std::make_unique<BmStorageLink>(); + LOG(info, "Adding extra storage link '%s'", my_link->getName().c_str()); + _context->bm_link = my_link.get(); + storage::StorageChainBuilder::add(std::move(my_link)); + } +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h new file mode 100644 index 00000000000..b1347b5e953 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/storage/common/storage_chain_builder.h> + +namespace feedbm { + +class BmStorageLinkContext; + +/* + * Storage chain builder that inserts a BmStorageLink right below the + * communication manager. This allows sending benchmark feed to chain. + */ +class BmStorageChainBuilder : public storage::StorageChainBuilder +{ + std::shared_ptr<BmStorageLinkContext> _context; +public: + BmStorageChainBuilder(); + ~BmStorageChainBuilder() override; + const std::shared_ptr<BmStorageLinkContext>& get_context() { return _context; } + void add(std::unique_ptr<storage::StorageLink> link) override; +}; + +} + diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp new file mode 100644 index 00000000000..79517e98094 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp @@ -0,0 +1,64 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_storage_link.h" +#include "pending_tracker.h" +#include <vespa/vespalib/stllike/hash_map.hpp> + +namespace feedbm { + + +void +BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker) +{ + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); +} + +PendingTracker * +BmStorageLink::release(uint64_t msg_id) +{ + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + if (itr == _pending.end()) { + return nullptr; + } + auto tracker = itr->second; + _pending.erase(itr); + return tracker; +} + +BmStorageLink::BmStorageLink() + : storage::StorageLink("vespa-bm-feed"), + StorageReplyErrorChecker(), + _mutex(), + _pending() +{ +} + +BmStorageLink::~BmStorageLink() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +bool +BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + (void) msg; + return false; +} + +bool +BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + auto tracker = release(msg->getMsgId()); + if (tracker != nullptr) { + check_error(*msg); + tracker->release(); + return true; + } + return false; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h new file mode 100644 index 00000000000..63ece355c02 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h @@ -0,0 +1,31 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "storage_reply_error_checker.h" +#include <vespa/storage/common/storagelink.h> +#include <vespa/vespalib/stllike/hash_map.h> + +namespace feedbm { + +class PendingTracker; + +/* + * Storage link used to feed storage api messages to a distributor or + * service layer node. A count of error replies is maintained. + */ +class BmStorageLink : public storage::StorageLink, + public StorageReplyErrorChecker +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; + PendingTracker *release(uint64_t msg_id); +public: + BmStorageLink(); + ~BmStorageLink() override; + bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + void retain(uint64_t msg_id, PendingTracker &tracker); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h new file mode 100644 index 00000000000..adb2a13ec10 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +namespace feedbm { + +class BmStorageLink; + +/* + * This context is initialized by BmStorageChainBuilder. + */ +struct BmStorageLinkContext +{ + BmStorageLink* bm_link; + BmStorageLinkContext() + : bm_link(nullptr) + { + } + ~BmStorageLinkContext() = default; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h index 81a08552c0c..63d0913977e 100644 --- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include <memory> +#include <vespa/vespalib/stllike/string.h> namespace document { class Bucket; @@ -26,6 +27,9 @@ public: virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0; virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0; virtual uint32_t get_error_count() const = 0; + virtual const vespalib::string &get_name() const = 0; + virtual bool manages_buckets() const = 0; + virtual bool manages_timestamp() const = 0; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp index 2df97841bb9..7020e52eaef 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp @@ -63,6 +63,7 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider) : IBmFeedHandler(), + _name("SpiBmFeedHandler"), _provider(provider), _errors(0u) { @@ -100,4 +101,23 @@ SpiBmFeedHandler::get_error_count() const { return _errors; } + +const vespalib::string& +SpiBmFeedHandler::get_name() const +{ + return _name; +} + +bool +SpiBmFeedHandler::manages_buckets() const +{ + return false; +} + +bool +SpiBmFeedHandler::manages_timestamp() const +{ + return false; +} + } diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h index bda41898510..49f526ab5b7 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h @@ -14,6 +14,7 @@ namespace feedbm { */ class SpiBmFeedHandler : public IBmFeedHandler { + vespalib::string _name; storage::spi::PersistenceProvider& _provider; std::atomic<uint32_t> _errors; public: @@ -24,6 +25,9 @@ public: void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; void create_bucket(const document::Bucket& bucket); uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp index ba16f226ebb..2d45caff152 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -3,15 +3,13 @@ #include "storage_api_chain_bm_feed_handler.h" #include "pending_tracker.h" #include "storage_reply_error_checker.h" +#include "bm_storage_link_context.h" +#include "bm_storage_link.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/storage/common/storagelink.h> -#include <vespa/storage/common/storage_chain_builder.h> -#include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vdslib/state/cluster_state_bundle.h> #include <cassert> @@ -19,7 +17,6 @@ using document::Document; using document::DocumentId; using document::DocumentUpdate; -using storage::StorageLink; namespace feedbm { @@ -34,106 +31,10 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { } -class BmStorageLink : public StorageLink, - public StorageReplyErrorChecker -{ - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; -public: - BmStorageLink(); - ~BmStorageLink() override; - bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - void retain(uint64_t msg_id, PendingTracker &tracker) { - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); - } - bool release(uint64_t msg_id) { - PendingTracker *tracker = nullptr; - { - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - if (itr == _pending.end()) { - return false; - } - tracker = itr->second; - _pending.erase(itr); - } - tracker->release(); - return true; - } -}; - -BmStorageLink::BmStorageLink() - : storage::StorageLink("vespa-bm-feed"), - StorageReplyErrorChecker(), - _mutex(), - _pending() -{ -} - -BmStorageLink::~BmStorageLink() -{ - std::lock_guard lock(_mutex); - assert(_pending.empty()); -} - -bool -BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) -{ - (void) msg; - return false; -} - -bool -BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) -{ - check_error(*msg); - return release(msg->getMsgId()); -} - -struct StorageApiChainBmFeedHandler::Context { - BmStorageLink* bm_link; - Context() - : bm_link(nullptr) - { - } - ~Context() = default; -}; - -class MyStorageChainBuilder : public storage::StorageChainBuilder -{ - using Parent = storage::StorageChainBuilder; - std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context; -public: - MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context); - ~MyStorageChainBuilder() override; - void add(std::unique_ptr<StorageLink> link) override; -}; - -MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context) - : storage::StorageChainBuilder(), - _context(std::move(context)) -{ -} - -MyStorageChainBuilder::~MyStorageChainBuilder() = default; - -void -MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link) -{ - vespalib::string name = link->getName(); - Parent::add(std::move(link)); - if (name == "Communication manager") { - auto my_link = std::make_unique<BmStorageLink>(); - _context->bm_link = my_link.get(); - Parent::add(std::move(my_link)); - } -} - -StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context) +StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor) : IBmFeedHandler(), + _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _distributor(distributor), _context(std::move(context)) { auto cmd = make_set_cluster_state_cmd(); @@ -174,22 +75,28 @@ StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const Docum send_msg(std::move(cmd), tracker); } -std::shared_ptr<StorageApiChainBmFeedHandler::Context> -StorageApiChainBmFeedHandler::get_context() +uint32_t +StorageApiChainBmFeedHandler::get_error_count() const +{ + return _context->bm_link->get_error_count(); +} + +const vespalib::string& +StorageApiChainBmFeedHandler::get_name() const { - return std::make_shared<Context>(); + return _name; } -std::unique_ptr<storage::IStorageChainBuilder> -StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context) +bool +StorageApiChainBmFeedHandler::manages_buckets() const { - return std::make_unique<MyStorageChainBuilder>(std::move(context)); + return _distributor; } -uint32_t -StorageApiChainBmFeedHandler::get_error_count() const +bool +StorageApiChainBmFeedHandler::manages_timestamp() const { - return _context->bm_link->get_error_count(); + return _distributor; } } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h index c3fdab6cd76..089a7fd89e5 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -4,32 +4,34 @@ #include "i_bm_feed_handler.h" -namespace storage { class IStorageChainBuilder; } namespace storage::api { class StorageCommand; } namespace feedbm { +struct BmStorageLinkContext; + /* * Benchmark feed handler for feed to service layer using storage api protocol * directly on the storage chain. */ class StorageApiChainBmFeedHandler : public IBmFeedHandler { -public: - struct Context; private: - std::shared_ptr<Context> _context; + vespalib::string _name; + bool _distributor; + std::shared_ptr<BmStorageLinkContext> _context; void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); public: - StorageApiChainBmFeedHandler(std::shared_ptr<Context> context); + StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor); ~StorageApiChainBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - static std::shared_ptr<Context> get_context(); - static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context); uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp index 3a974bb7d9a..e15efdf7ce6 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -9,17 +9,15 @@ #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/storageserver/message_dispatcher.h> -#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> +//#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> #include <vespa/storage/storageserver/rpc/message_codec_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> +//#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> #include <vespa/vespalib/stllike/hash_map.h> #include <vespa/vespalib/stllike/hash_map.hpp> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> -#include <vespa/fnet/frt/target.h> -#include <vespa/slobrok/sbmirror.h> +//#include <vespa/fnet/frt/target.h> +//#include <vespa/slobrok/sbmirror.h> #include <cassert> using document::Document; @@ -28,37 +26,10 @@ using document::DocumentUpdate; using document::DocumentTypeRepo; using storage::api::StorageMessageAddress; using storage::rpc::SharedRpcResources; +using storage::lib::NodeType; namespace feedbm { -namespace { - -FRT_RPCRequest * -make_set_cluster_state_request() { - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); - storage::rpc::SlimeClusterStateBundleCodec codec; - auto encoded_bundle = codec.encode(bundle); - auto *req = new FRT_RPCRequest(); - auto* params = req->GetParams(); - params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); - params->AddInt32(encoded_bundle._uncompressed_length); - params->AddData(std::move(*encoded_bundle._buffer)); - req->SetMethodName("setdistributionstates"); - return req; -} - -void -set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) { - auto req = make_set_cluster_state_request(); - auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory()); - auto target = target_resolver->resolve_rpc_target(storage_address); - target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout - assert(!req->IsError()); - req->SubRef(); -} - -} - class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, public StorageReplyErrorChecker { @@ -105,15 +76,16 @@ StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() assert(_pending.empty()); } -StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo) +StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo, bool distributor) : IBmFeedHandler(), - _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)), + _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _distributor(distributor), + _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), _shared_rpc_resources(shared_rpc_resources_in), _message_dispatcher(std::make_unique<MyMessageDispatcher>()), _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())), _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params())) { - set_cluster_up(_shared_rpc_resources, *_storage_address); } StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; @@ -154,4 +126,22 @@ StorageApiRpcBmFeedHandler::get_error_count() const return _message_dispatcher->get_error_count(); } +const vespalib::string& +StorageApiRpcBmFeedHandler::get_name() const +{ + return _name; +} + +bool +StorageApiRpcBmFeedHandler::manages_buckets() const +{ + return _distributor; +} + +bool +StorageApiRpcBmFeedHandler::manages_timestamp() const +{ + return _distributor; +} + } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h index bc2f62e038f..762a3c5bd63 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -25,6 +25,8 @@ namespace feedbm { class StorageApiRpcBmFeedHandler : public IBmFeedHandler { class MyMessageDispatcher; + vespalib::string _name; + bool _distributor; std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; storage::rpc::SharedRpcResources& _shared_rpc_resources; std::unique_ptr<MyMessageDispatcher> _message_dispatcher; @@ -33,12 +35,15 @@ class StorageApiRpcBmFeedHandler : public IBmFeedHandler void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); public: - StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo); + StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo, bool distributor); ~StorageApiRpcBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 5b9f7a58293..cca421657ea 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -4,6 +4,9 @@ #include "spi_bm_feed_handler.h" #include "storage_api_rpc_bm_feed_handler.h" #include "storage_api_chain_bm_feed_handler.h" +#include "bm_storage_chain_builder.h" +#include "bm_cluster_controller.h" +#include "bm_storage_link_context.h" #include <vespa/vespalib/testkit/testapp.h> #include <tests/proton/common/dummydbowner.h> #include <vespa/config-imported-fields.h> @@ -65,6 +68,7 @@ #include <vespa/storageserver/app/distributorprocess.h> #include <vespa/storage/config/config-stor-distributormanager.h> #include <vespa/storage/config/config-stor-visitordispatcher.h> +#include <vespa/slobrok/sbmirror.h> #include <getopt.h> #include <iostream> @@ -126,6 +130,9 @@ using search::transactionlog::TransLogServer; using storage::rpc::SharedRpcResources; using storage::spi::PersistenceProvider; using vespalib::makeLambdaTask; +using feedbm::BmClusterController; +using feedbm::BmStorageChainBuilder; +using feedbm::BmStorageLinkContext; using feedbm::IBmFeedHandler; using feedbm::SpiBmFeedHandler; using feedbm::StorageApiRpcBmFeedHandler; @@ -262,7 +269,7 @@ public: uint32_t get_remove_passes() const { return _remove_passes; } uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } bool get_enable_distributor() const { return _enable_distributor; } - bool get_enable_service_layer() const { return _enable_service_layer; } + bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; } bool get_use_storage_chain() const { return _use_storage_chain; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_threads(uint32_t threads_in) { _threads = threads_in; } @@ -299,10 +306,6 @@ BMParams::check() const std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; return false; } - if (_enable_distributor && !_enable_service_layer) { - std::cerr << "Service layer must be enabled if distributor layer is enabled" << std::endl; - return false; - } return true; } @@ -558,9 +561,10 @@ struct PersistenceProviderFixture { std::shared_ptr<IConfigContext> _config_context; std::unique_ptr<IBmFeedHandler> _feed_handler; std::unique_ptr<mbus::Slobrok> _slobrok; - std::shared_ptr<StorageApiChainBmFeedHandler::Context> _service_layer_chain_context; + std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context; std::unique_ptr<MyServiceLayerProcess> _service_layer; std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; + std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; std::unique_ptr<storage::DistributorProcess> _distributor; PersistenceProviderFixture(const BMParams& params); @@ -573,6 +577,7 @@ struct PersistenceProviderFixture { std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const; std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; void create_buckets(); + void wait_slobrok(const vespalib::string &name); void start_service_layer(const BMParams& params); void start_distributor(const BMParams& params); void create_feed_handler(const BMParams& params); @@ -623,6 +628,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer_chain_context(), _service_layer(), _rpc_client_shared_rpc_resources(), + _distributor_chain_context(), _distributor() { create_document_db(); @@ -725,16 +731,31 @@ PersistenceProviderFixture::create_buckets() } void +PersistenceProviderFixture::wait_slobrok(const vespalib::string &name) +{ + auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror(); + LOG(info, "Waiting for %s in slobrok", name.c_str()); + for (;;) { + auto specs = mirror.lookup(name); + if (!specs.empty()) { + LOG(info, "Found %s in slobrok", name.c_str()); + return; + } + std::this_thread::sleep_for(100ms); + } +} + +void PersistenceProviderFixture::start_service_layer(const BMParams& params) { LOG(info, "start slobrok"); _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); - std::unique_ptr<storage::IStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain()) { - _service_layer_chain_context = StorageApiChainBmFeedHandler::get_context(); - chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(_service_layer_chain_context); + std::unique_ptr<BmStorageChainBuilder> chain_builder; + if (params.get_use_storage_chain() && !params.get_enable_distributor()) { + chain_builder = std::make_unique<BmStorageChainBuilder>(); + _service_layer_chain_context = chain_builder->get_context(); } _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, *_persistence_engine, @@ -746,27 +767,53 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params) config::ConfigUri client_config_uri("bm-rpc-client", _config_context); _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); + wait_slobrok("storage/cluster.storage/storage/0/default"); + wait_slobrok("storage/cluster.storage/storage/0"); + BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + fake_controller.set_cluster_up(false); } void PersistenceProviderFixture::start_distributor(const BMParams& params) { - if (params.get_enable_distributor()) { - config::ConfigUri config_uri("bm-distributor", _config_context); - _distributor = std::make_unique<storage::DistributorProcess>(config_uri); - _distributor->setupConfig(100ms); - _distributor->createNode(); + config::ConfigUri config_uri("bm-distributor", _config_context); + std::unique_ptr<BmStorageChainBuilder> chain_builder; + if (params.get_use_storage_chain()) { + chain_builder = std::make_unique<BmStorageChainBuilder>(); + _distributor_chain_context = chain_builder->get_context(); + } + _distributor = std::make_unique<storage::DistributorProcess>(config_uri); + if (chain_builder) { + _distributor->set_storage_chain_builder(std::move(chain_builder)); } + _distributor->setupConfig(100ms); + _distributor->createNode(); + wait_slobrok("storage/cluster.storage/distributor/0/default"); + wait_slobrok("storage/cluster.storage/distributor/0"); + BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + fake_controller.set_cluster_up(true); + // Wait for bucket ownership transfer safe time + std::this_thread::sleep_for(2s); } void PersistenceProviderFixture::create_feed_handler(const BMParams& params) { + if (params.get_enable_distributor()) { + if (params.get_use_storage_chain()) { + assert(_distributor_chain_context); + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, true); + } + return; + } if (params.get_enable_service_layer()) { if (params.get_use_storage_chain()) { - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context); + assert(_service_layer_chain_context); + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, false); } } } @@ -849,11 +896,12 @@ put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbo vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; + bool use_timestamp = !f._feed_handler->manages_timestamp(); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; document::Bucket bucket(bucket_space, bucket_id); auto document = std::make_unique<Document>(repo, is); - f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker); + f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -901,11 +949,12 @@ update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib:: vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; + bool use_timestamp = !f._feed_handler->manages_timestamp(); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; document::Bucket bucket(bucket_space, bucket_id); auto document_update = DocumentUpdate::createHEAD(repo, is); - f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker); + f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -953,11 +1002,12 @@ remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib:: vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; + bool use_timestamp = !f._feed_handler->manages_timestamp(); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; document::Bucket bucket(bucket_space, bucket_id); DocumentId document_id(is); - f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker); + f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -990,7 +1040,9 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "start initialize"); provider.initialize(); LOG(info, "create %u buckets", f.num_buckets()); - f.create_buckets(); + if (!f._feed_handler->manages_buckets()) { + f.create_buckets(); + } if (bm_params.get_enable_service_layer()) { f.start_service_layer(bm_params); } @@ -1003,6 +1055,7 @@ void benchmark_async_spi(const BMParams &bm_params) auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update"); auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove"); int64_t time_bias = 1; + LOG(info, "Feed handler is %s", f._feed_handler->get_name().c_str()); for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) { run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params); } |