diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-09-10 11:30:08 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-09-10 11:30:08 +0200 |
commit | c8beeb61f595019c22ee6eca137ab47625c5712a (patch) | |
tree | ba034cbf4fcee3b077dfad266e24460c9c451938 /searchcore/src/apps | |
parent | bb4ee4e9c053ca4f341eaa5490a850e13ea37f5c (diff) |
Start moving portions of vespa-feed-bm app to searchcore_bmcluster library.
Diffstat (limited to 'searchcore/src/apps')
30 files changed, 100 insertions, 2588 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index fe83c89d83a..daefef5d413 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -2,39 +2,7 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp - bm_cluster_controller.cpp - bm_message_bus.cpp - bm_storage_chain_builder.cpp - bm_storage_link.cpp - bucket_info_queue.cpp - document_api_message_bus_bm_feed_handler.cpp - pending_tracker.cpp - pending_tracker_hash.cpp - spi_bm_feed_handler.cpp - storage_api_chain_bm_feed_handler.cpp - storage_api_message_bus_bm_feed_handler.cpp - storage_api_rpc_bm_feed_handler.cpp - storage_reply_error_checker.cpp OUTPUT_NAME vespa-feed-bm DEPENDS - searchcore_server - searchcore_initializer - searchcore_reprocessing - searchcore_index - searchcore_persistenceengine - searchcore_docsummary - searchcore_feedoperation - searchcore_matching - searchcore_attribute - searchcore_documentmetastore - searchcore_bucketdb - searchcore_flushengine - searchcore_pcommon - searchcore_grouping - searchcore_proton_metrics - searchcore_fconfig - storageserver_storageapp - messagebus_messagebus-test - messagebus - searchlib_searchlib_uca + searchcore_bmcluster ) diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp deleted file mode 100644 index a1b40c56e11..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bm_cluster_controller.h" -#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> -#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> -#include <vespa/fnet/frt/target.h> -#include <vespa/slobrok/sbmirror.h> - -using storage::api::StorageMessageAddress; -using storage::rpc::SharedRpcResources; -using storage::lib::NodeType; - -namespace feedbm { - -namespace { - -FRT_RPCRequest * -make_set_cluster_state_request() -{ - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); - storage::rpc::SlimeClusterStateBundleCodec codec; - auto encoded_bundle = codec.encode(bundle); - auto *req = new FRT_RPCRequest(); - auto* params = req->GetParams(); - params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); - params->AddInt32(encoded_bundle._uncompressed_length); - params->AddData(std::move(*encoded_bundle._buffer)); - req->SetMethodName("setdistributionstates"); - return req; -} - -} - -BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in) - : _shared_rpc_resources(shared_rpc_resources_in) -{ -} - -void -BmClusterController::set_cluster_up(bool distributor) -{ - static vespalib::string _storage("storage"); - StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0); - auto req = make_set_cluster_state_request(); - auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(), - _shared_rpc_resources.target_factory(), 1); - uint64_t fake_bucket_id = 0; - auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id); - target->get()->InvokeSync(req, 10.0); // 10 seconds timeout - assert(!req->IsError()); - req->SubRef(); -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h deleted file mode 100644 index 699036be5c9..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -namespace storage::api { class StorageMessageAddress; } -namespace storage::rpc { class SharedRpcResources; } - -namespace feedbm { - -/* - * Fake cluster controller that sets cluster state to be up. - */ -class BmClusterController -{ - storage::rpc::SharedRpcResources& _shared_rpc_resources; -public: - BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in); - void set_cluster_up(bool distributor); -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp deleted file mode 100644 index b608593dada..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bm_message_bus.h" -#include "pending_tracker_hash.h" -#include "pending_tracker.h" -#include "storage_reply_error_checker.h" -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/network/rpcnetworkparams.h> -#include <vespa/messagebus/rpcmessagebus.h> -#include <vespa/messagebus/ireplyhandler.h> -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/documentmessage.h> -#include <vespa/storageapi/mbusprot/storageprotocol.h> -#include <vespa/storageapi/mbusprot/storagereply.h> -#include <vespa/vespalib/stllike/asciistream.h> - -#include <vespa/log/log.h> -LOG_SETUP(".bm_message_bus"); - -using documentapi::DocumentProtocol; -using mbus::RPCMessageBus; -using mbus::Reply; -using mbus::SourceSession; -using storage::mbusprot::StorageProtocol; -using storage::mbusprot::StorageReply; - -namespace feedbm { - -namespace { - -std::atomic<uint64_t> bm_message_bus_msg_id(0u); - -vespalib::string reply_as_string(Reply &reply) { - vespalib::asciistream os; - if (reply.getType() == 0) { - os << "empty reply"; - } else { - os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol(); - } - os << ", "; - auto message = reply.getMessage(); - if (message) { - os << "message=" << message->toString(); - os << ", protocol=" << message->getProtocol(); - } else { - os << "no message"; - } - reply.setMessage(std::move(message)); - os << ", "; - if (reply.hasErrors()) { - os << "errors=["; - for (uint32_t i = 0; i < reply.getNumErrors(); ++i) { - auto &error = reply.getError(i); - if (i > 0) { - os << ", "; - } - os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")"; - } - os << "]"; - } else { - os << "no errors"; - } - return os.str(); -} - -} - -class BmMessageBus::ReplyHandler : public mbus::IReplyHandler, - public StorageReplyErrorChecker -{ - PendingTrackerHash _pending_hash; -public: - ReplyHandler(); - ~ReplyHandler() override; - void handleReply(std::unique_ptr<Reply> reply) override; - void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } - void message_aborted(uint64_t msg_id); -}; - -BmMessageBus::ReplyHandler::ReplyHandler() - : mbus::IReplyHandler(), - StorageReplyErrorChecker(), - _pending_hash() -{ -} - -BmMessageBus::ReplyHandler::~ReplyHandler() = default; - -void -BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply) -{ - auto msg_id = reply->getContext().value.UINT64; - auto tracker = _pending_hash.release(msg_id); - if (tracker != nullptr) { - bool failed = false; - if (reply->getType() == 0 || reply->hasErrors()) { - failed = true; // empty reply or error - } else { - auto protocol = reply->getProtocol(); - if (protocol == DocumentProtocol::NAME) { - } else if (protocol == StorageProtocol::NAME) { - auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get()); - if (sreply != nullptr) { - check_error(*sreply->getReply()); - } else { - failed = true; // unexpected message type - } - } else { - failed = true; // unexpected protocol - } - } - if (failed) { - ++_errors; - LOG(error, "Unexpected %s", reply_as_string(*reply).c_str()); - } - tracker->release(); - } else { - ++_errors; - LOG(error, "Untracked %s", reply_as_string(*reply).c_str()); - } -} - -void -BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id) -{ - ++_errors; - auto tracker = _pending_hash.release(msg_id); - tracker->release(); -} - -BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri, - std::shared_ptr<const document::DocumentTypeRepo> document_type_repo) - : _reply_handler(std::make_unique<ReplyHandler>()), - _message_bus(), - _session() -{ - mbus::RPCNetworkParams params(config_uri); - mbus::ProtocolSet protocol_set; - protocol_set.add(std::make_shared<DocumentProtocol>(document_type_repo)); - protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo)); - params.setIdentity(mbus::Identity("vespa-bm-client")); - _message_bus = std::make_unique<mbus::RPCMessageBus>( - protocol_set, - params, - config_uri); - mbus::SourceSessionParams srcParams; - srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); - srcParams.setReplyHandler(*_reply_handler); - _session = _message_bus->getMessageBus().createSourceSession(srcParams); -} - -BmMessageBus::~BmMessageBus() -{ - _session.reset(); - _message_bus.reset(); - _reply_handler.reset(); -} - -uint32_t -BmMessageBus::get_error_count() const -{ - return _reply_handler->get_error_count(); -} - -void -BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker) -{ - auto msg_id = ++bm_message_bus_msg_id; - _reply_handler->retain(msg_id, tracker); - msg->setContext(mbus::Context(msg_id)); - msg->setRetryEnabled(false); - auto result = _session->send(std::move(msg), route); - if (!result.isAccepted()) { - LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str()); - _reply_handler->message_aborted(msg_id); - } -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h deleted file mode 100644 index a9cff1fb826..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <memory> - -namespace config { class ConfigUri; } -namespace document { class DocumentTypeRepo; } - -namespace mbus { - -class Message; -class RPCMessageBus; -class Route; -class SourceSession; - -} - -namespace feedbm { - -class PendingTracker; - -/* - * Message bus for feed benchmark program. - */ -class BmMessageBus -{ - class ReplyHandler; - std::unique_ptr<ReplyHandler> _reply_handler; - std::unique_ptr<mbus::RPCMessageBus> _message_bus; - std::unique_ptr<mbus::SourceSession> _session; -public: - BmMessageBus(const config::ConfigUri& config_uri, - std::shared_ptr<const document::DocumentTypeRepo> document_type_repo); - ~BmMessageBus(); - uint32_t get_error_count() const; - void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker); -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp deleted file mode 100644 index bbe0de70ce2..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bm_storage_chain_builder.h" -#include "bm_storage_link_context.h" -#include "bm_storage_link.h" - -#include <vespa/log/log.h> -LOG_SETUP(".bm_storage_chain_builder"); - -namespace feedbm { - -BmStorageChainBuilder::BmStorageChainBuilder() - : storage::StorageChainBuilder(), - _context(std::make_shared<BmStorageLinkContext>()) -{ -} - -BmStorageChainBuilder::~BmStorageChainBuilder() = default; - -void -BmStorageChainBuilder::add(std::unique_ptr<storage::StorageLink> link) -{ - vespalib::string name = link->getName(); - storage::StorageChainBuilder::add(std::move(link)); - LOG(info, "Added storage link '%s'", name.c_str()); - if (name == "Communication manager") { - auto my_link = std::make_unique<BmStorageLink>(); - LOG(info, "Adding extra storage link '%s'", my_link->getName().c_str()); - _context->bm_link = my_link.get(); - storage::StorageChainBuilder::add(std::move(my_link)); - } -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h deleted file mode 100644 index bba933da9e0..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/storage/common/storage_chain_builder.h> - -namespace feedbm { - -struct BmStorageLinkContext; - -/* - * Storage chain builder that inserts a BmStorageLink right below the - * communication manager. This allows sending benchmark feed to chain. - */ -class BmStorageChainBuilder : public storage::StorageChainBuilder -{ - std::shared_ptr<BmStorageLinkContext> _context; -public: - BmStorageChainBuilder(); - ~BmStorageChainBuilder() override; - const std::shared_ptr<BmStorageLinkContext>& get_context() { return _context; } - void add(std::unique_ptr<storage::StorageLink> link) override; -}; - -} - diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp deleted file mode 100644 index 2aeda91c30c..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bm_storage_link.h" -#include "pending_tracker.h" - -namespace feedbm { - - -BmStorageLink::BmStorageLink() - : storage::StorageLink("vespa-bm-feed"), - StorageReplyErrorChecker(), - _pending_hash() -{ -} - -BmStorageLink::~BmStorageLink() = default; - -bool -BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) -{ - (void) msg; - return false; -} - -bool -BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) -{ - auto tracker = _pending_hash.release(msg->getMsgId()); - if (tracker != nullptr) { - check_error(*msg); - tracker->release(); - return true; - } - return false; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h deleted file mode 100644 index 95528d7b2d9..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "storage_reply_error_checker.h" -#include "pending_tracker_hash.h" -#include <vespa/storage/common/storagelink.h> - -namespace feedbm { - -class PendingTracker; - -/* - * Storage link used to feed storage api messages to a distributor or - * service layer node. A count of error replies is maintained. - */ -class BmStorageLink : public storage::StorageLink, - public StorageReplyErrorChecker -{ - PendingTrackerHash _pending_hash; -public: - BmStorageLink(); - ~BmStorageLink() override; - bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h deleted file mode 100644 index f2df20f1f66..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -namespace feedbm { - -class BmStorageLink; - -/* - * This context is initialized by BmStorageChainBuilder. - */ -struct BmStorageLinkContext -{ - BmStorageLink* bm_link; - BmStorageLinkContext() noexcept - : bm_link(nullptr) - { - } - ~BmStorageLinkContext() = default; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp b/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp deleted file mode 100644 index fc43402d68e..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bucket_info_queue.h" -#include <vespa/persistence/spi/persistenceprovider.h> - -namespace feedbm { - -BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors) - : _mutex(), - _buckets(), - _provider(provider), - _errors(errors) -{ -} - -BucketInfoQueue::~BucketInfoQueue() -{ - get_bucket_info_loop(); -} - -void -BucketInfoQueue::get_bucket_info_loop() -{ - std::unique_lock guard(_mutex); - while (!_buckets.empty()) { - auto bucket = _buckets.front(); - _buckets.pop_front(); - guard.unlock(); - auto bucket_info = _provider.getBucketInfo(bucket); - if (bucket_info.hasError()) { - ++_errors; - } - guard.lock(); - } -} - -} - diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h b/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h deleted file mode 100644 index 07a55127234..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/persistence/spi/bucket.h> -#include <mutex> -#include <deque> -#include <atomic> - -namespace storage::spi { struct PersistenceProvider; } - -namespace feedbm { - -/* - * Class containing a queue of buckets where mutating feed operations - * have been performed, requiring service layer to ask for updated - * bucket info. - */ -class BucketInfoQueue -{ - std::mutex _mutex; - std::deque<storage::spi::Bucket> _buckets; - storage::spi::PersistenceProvider& _provider; - std::atomic<uint32_t>& _errors; - -public: - BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors); - ~BucketInfoQueue(); - - void put_bucket(storage::spi::Bucket bucket) - { - std::lock_guard guard(_mutex); - _buckets.emplace_back(std::move(bucket)); - } - - void get_bucket_info_loop(); -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp deleted file mode 100644 index 38c8490de69..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "document_api_message_bus_bm_feed_handler.h" -#include "bm_message_bus.h" -#include "pending_tracker.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h> -#include <vespa/storageapi/messageapi/storagemessage.h> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; -using storage::api::StorageMessageAddress; -using storage::lib::NodeType; - -namespace feedbm { - -namespace { - vespalib::string _Storage("storage"); -} - -DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus) - : IBmFeedHandler(), - _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), - _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, NodeType::DISTRIBUTOR, 0)), - _message_bus(message_bus), - _route(_storage_address->to_mbus_route()) -{ -} - -DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default; - -void -DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker) -{ - _message_bus.send_msg(std::move(msg), _route, pending_tracker); -} - -void -DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker) -{ - auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document)); - send_msg(std::move(msg), tracker); -} - -void -DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker) -{ - auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update)); - send_msg(std::move(msg), tracker); -} - -void -DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker) -{ - auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id); - send_msg(std::move(msg), tracker); -} - -void -DocumentApiMessageBusBmFeedHandler::get(const document::Bucket&, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto msg = std::make_unique<documentapi::GetDocumentMessage>(document_id, field_set_string); - send_msg(std::move(msg), tracker); -} - -void -DocumentApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&) -{ -} - -uint32_t -DocumentApiMessageBusBmFeedHandler::get_error_count() const -{ - return _message_bus.get_error_count(); -} - -const vespalib::string& -DocumentApiMessageBusBmFeedHandler::get_name() const -{ - return _name; -} - -bool -DocumentApiMessageBusBmFeedHandler::manages_timestamp() const -{ - return true; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h deleted file mode 100644 index c71bb113c5b..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "i_bm_feed_handler.h" -#include <vespa/messagebus/routing/route.h> - -namespace document { class DocumentTypeRepo; } -namespace documentapi { class DocumentMessage; }; -namespace storage::api { class StorageMessageAddress; } - -namespace feedbm { - -class BmMessageBus; - -/* - * Benchmark feed handler for feed to distributor using document api protocol - * over message bus. - */ -class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler -{ - vespalib::string _name; - std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; - BmMessageBus& _message_bus; - mbus::Route _route; - void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker); -public: - DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); - ~DocumentApiMessageBusBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - void attach_bucket_info_queue(PendingTracker &tracker) override; - uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h deleted file mode 100644 index 26cbf27b455..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <memory> -#include <vespa/vespalib/stllike/string.h> - -namespace document { -class Bucket; -class Document; -class DocumentUpdate; -class DocumentId; -} - -namespace feedbm { - -class BucketInfoQueue; -class PendingTracker; - -/* - * Interface class for benchmark feed handler. - */ -class IBmFeedHandler -{ -public: - virtual ~IBmFeedHandler() = default; - virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0; - virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0; - virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0; - virtual void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) = 0; - virtual void attach_bucket_info_queue(PendingTracker& tracker) = 0; - virtual uint32_t get_error_count() const = 0; - virtual const vespalib::string &get_name() const = 0; - virtual bool manages_timestamp() const = 0; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp deleted file mode 100644 index 94bed4cb3bd..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "pending_tracker.h" -#include "bucket_info_queue.h" -#include <thread> -#include <chrono> - -using namespace std::chrono_literals; - -namespace feedbm { - -PendingTracker::PendingTracker(uint32_t limit) - : _pending(0u), - _limit(limit), - _bucket_info_queue() -{ -} - -PendingTracker::~PendingTracker() -{ - drain(); -} - -void -PendingTracker::retain() { - while (_pending >= _limit) { - std::this_thread::sleep_for(1ms); - } - _pending++; -} - -void -PendingTracker::drain() -{ - if (_bucket_info_queue) { - _bucket_info_queue->get_bucket_info_loop(); - } - while (_pending > 0) { - std::this_thread::sleep_for(1ms); - if (_bucket_info_queue) { - _bucket_info_queue->get_bucket_info_loop(); - } - } - if (_bucket_info_queue) { - _bucket_info_queue->get_bucket_info_loop(); - } -} - -void -PendingTracker::attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors) -{ - _bucket_info_queue = std::make_unique<BucketInfoQueue>(provider, errors); -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h deleted file mode 100644 index 4ca84ab7442..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <atomic> -#include <memory> - -namespace storage::spi { struct PersistenceProvider; } - -namespace feedbm { - -class BucketInfoQueue; - -/* - * Class to track number of pending operations, used as backpressure during - * benchmark feeding. - */ -class PendingTracker { - std::atomic<uint32_t> _pending; - uint32_t _limit; - std::unique_ptr<BucketInfoQueue> _bucket_info_queue; - -public: - PendingTracker(uint32_t limit); - ~PendingTracker(); - - void release() { - _pending--; - } - void retain(); - void drain(); - - void attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors); - BucketInfoQueue *get_bucket_info_queue() { return _bucket_info_queue.get(); } -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp deleted file mode 100644 index 6863d35703e..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "pending_tracker_hash.h" -#include "pending_tracker.h" -#include <vespa/vespalib/stllike/hash_map.hpp> -#include <cassert> - -namespace feedbm { - -PendingTrackerHash::PendingTrackerHash() - : _mutex(), - _pending() -{ -} - -PendingTrackerHash::~PendingTrackerHash() -{ - std::lock_guard lock(_mutex); - assert(_pending.empty()); -} - -void -PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker) -{ - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); -} - -PendingTracker * -PendingTrackerHash::release(uint64_t msg_id) -{ - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - if (itr == _pending.end()) { - return nullptr; - } - auto tracker = itr->second; - _pending.erase(itr); - return tracker; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h deleted file mode 100644 index 89be93fd4ed..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/vespalib/stllike/hash_map.h> -#include <mutex> - -namespace feedbm { - -class PendingTracker; - -/* - * Class maintaing mapping from message id to pending tracker - */ -class PendingTrackerHash -{ - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; -public: - PendingTrackerHash(); - ~PendingTrackerHash(); - PendingTracker *release(uint64_t msg_id); - void retain(uint64_t msg_id, PendingTracker &tracker); -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp deleted file mode 100644 index 11149eecb3f..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "spi_bm_feed_handler.h" -#include "pending_tracker.h" -#include "bucket_info_queue.h" -#include <vespa/document/fieldset/fieldsetrepo.h> -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/persistence/spi/persistenceprovider.h> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; -using storage::spi::Bucket; -using storage::spi::PersistenceProvider; -using storage::spi::Timestamp; - -namespace feedbm { - -namespace { - -storage::spi::Context context(storage::spi::Priority(0), 0); - -void get_bucket_info_loop(PendingTracker &tracker) -{ - auto bucket_info_queue = tracker.get_bucket_info_queue(); - if (bucket_info_queue != nullptr) { - bucket_info_queue->get_bucket_info_loop(); - } -} - -class MyOperationComplete : public storage::spi::OperationComplete -{ - std::atomic<uint32_t> &_errors; - Bucket _bucket; - PendingTracker& _tracker; -public: - MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker); - ~MyOperationComplete() override; - void onComplete(std::unique_ptr<storage::spi::Result> result) override; - void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; -}; - -MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker) - : _errors(errors), - _bucket(bucket), - _tracker(tracker) -{ - _tracker.retain(); -} - -MyOperationComplete::~MyOperationComplete() -{ - _tracker.release(); -} - -void -MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) -{ - if (result->hasError()) { - ++_errors; - } else { - auto bucket_info_queue = _tracker.get_bucket_info_queue(); - if (bucket_info_queue != nullptr) { - bucket_info_queue->put_bucket(_bucket); - } - } -} - -void -MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler) -{ - (void) resultHandler; -} - -} - -SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info) - : IBmFeedHandler(), - _name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"), - _provider(provider), - _field_set_repo(field_set_repo), - _errors(0u), - _skip_get_spi_bucket_info(skip_get_spi_bucket_info) -{ -} - -SpiBmFeedHandler::~SpiBmFeedHandler() = default; - -void -SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); -} - -void -SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); -} - -void -SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker)); -} - -void -SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - auto field_set = _field_set_repo.getFieldSet(field_set_string); - auto result = _provider.get(spi_bucket, *field_set, document_id, context); - if (result.hasError()) { - ++_errors; - } -} - -void -SpiBmFeedHandler::create_bucket(const document::Bucket& bucket) -{ - _provider.createBucket(Bucket(bucket), context); -} - -void -SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker) -{ - if (!_skip_get_spi_bucket_info) { - tracker.attach_bucket_info_queue(_provider, _errors); - } -} - -uint32_t -SpiBmFeedHandler::get_error_count() const -{ - return _errors; -} - -const vespalib::string& -SpiBmFeedHandler::get_name() const -{ - return _name; -} - -bool -SpiBmFeedHandler::manages_timestamp() const -{ - return false; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h deleted file mode 100644 index a78aa06628b..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "i_bm_feed_handler.h" -#include <atomic> - -namespace document { class FieldSetRepo; } -namespace storage::spi { struct PersistenceProvider; } - -namespace feedbm { - -/* - * Benchmark feed handler for feed directly to persistence provider - */ -class SpiBmFeedHandler : public IBmFeedHandler -{ - vespalib::string _name; - storage::spi::PersistenceProvider& _provider; - const document::FieldSetRepo& _field_set_repo; - std::atomic<uint32_t> _errors; - bool _skip_get_spi_bucket_info; -public: - SpiBmFeedHandler(storage::spi::PersistenceProvider& provider, const document::FieldSetRepo& field_set_repo, bool skip_get_spi_bucket_info); - ~SpiBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - void create_bucket(const document::Bucket& bucket); - void attach_bucket_info_queue(PendingTracker &tracker) override; - uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp deleted file mode 100644 index 82cf2df065f..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storage_api_chain_bm_feed_handler.h" -#include "pending_tracker.h" -#include "storage_reply_error_checker.h" -#include "bm_storage_link_context.h" -#include "bm_storage_link.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/state.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> -#include <cassert> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; - -namespace feedbm { - -namespace { - -std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); - auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle); - cmd->setPriority(storage::api::StorageMessage::VERYHIGH); - return cmd; -} - -} - -StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), - _context(std::move(context)) -{ - auto cmd = make_set_cluster_state_cmd(); - PendingTracker tracker(1); - send_msg(std::move(cmd), tracker); - tracker.drain(); -} - -StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; - -void -StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) -{ - cmd->setSourceIndex(0); - auto bm_link = _context->bm_link; - bm_link->retain(cmd->getMsgId(), pending_tracker); - bm_link->sendDown(std::move(cmd)); -} - -void -StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&) -{ -} - -uint32_t -StorageApiChainBmFeedHandler::get_error_count() const -{ - return _context->bm_link->get_error_count(); -} - -const vespalib::string& -StorageApiChainBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiChainBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h deleted file mode 100644 index 0c4b715122e..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "i_bm_feed_handler.h" - -namespace storage::api { class StorageCommand; } - -namespace feedbm { - -struct BmStorageLinkContext; - -/* - * Benchmark feed handler for feed to service layer or distributor - * using storage api protocol directly on the storage chain. - */ -class StorageApiChainBmFeedHandler : public IBmFeedHandler -{ -private: - vespalib::string _name; - bool _distributor; - std::shared_ptr<BmStorageLinkContext> _context; - void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); -public: - StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor); - ~StorageApiChainBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - - void attach_bucket_info_queue(PendingTracker &tracker) override; - uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp deleted file mode 100644 index f63a8e33cc0..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storage_api_message_bus_bm_feed_handler.h" -#include "bm_message_bus.h" -#include "pending_tracker.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/mbusprot/storagecommand.h> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; -using storage::api::StorageMessageAddress; -using storage::lib::NodeType; - -namespace feedbm { - -namespace { - vespalib::string _Storage("storage"); -} -StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), - _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), - _message_bus(message_bus), - _route(_storage_address->to_mbus_route()) -{ -} - -StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default; - -void -StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) -{ - cmd->setSourceIndex(0); - auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); - _message_bus.send_msg(std::move(msg), _route, pending_tracker); -} - -void -StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&) -{ -} - -uint32_t -StorageApiMessageBusBmFeedHandler::get_error_count() const -{ - return _message_bus.get_error_count(); -} - -const vespalib::string& -StorageApiMessageBusBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiMessageBusBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h deleted file mode 100644 index 2aafd0c6830..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "i_bm_feed_handler.h" -#include <vespa/messagebus/routing/route.h> - -namespace document { class DocumentTypeRepo; } -namespace documentapi { class DocumentMessage; }; -namespace storage::api { -class StorageCommand; -class StorageMessageAddress; -} - -namespace feedbm { - -class BmMessageBus; - -/* - * Benchmark feed handler for feed to service layer or distributor - * using storage api protocol over message bus. - */ -class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler -{ - vespalib::string _name; - bool _distributor; - std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; - BmMessageBus& _message_bus; - mbus::Route _route; - void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); -public: - StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); - ~StorageApiMessageBusBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - void attach_bucket_info_queue(PendingTracker &tracker) override; - uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp deleted file mode 100644 index 04d49bba0a3..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storage_api_rpc_bm_feed_handler.h" -#include "pending_tracker.h" -#include "pending_tracker_hash.h" -#include "storage_reply_error_checker.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storage/storageserver/message_dispatcher.h> -#include <vespa/storage/storageserver/rpc/message_codec_provider.h> -#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; -using document::DocumentTypeRepo; -using storage::api::StorageMessageAddress; -using storage::rpc::SharedRpcResources; -using storage::rpc::StorageApiRpcService; -using storage::lib::NodeType; - -namespace feedbm { - -namespace { - vespalib::string _Storage("storage"); -} - -class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, - public StorageReplyErrorChecker -{ - PendingTrackerHash _pending_hash; -public: - MyMessageDispatcher() - : storage::MessageDispatcher(), - StorageReplyErrorChecker(), - _pending_hash() - { - } - ~MyMessageDispatcher() override; - void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override { - check_error(*msg); - release(msg->getMsgId()); - } - void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override { - check_error(*msg); - release(msg->getMsgId()); - } - void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } - void release(uint64_t msg_id) { - auto tracker = _pending_hash.release(msg_id); - if (tracker != nullptr) { - tracker->release(); - } else { - ++_errors; - } - } -}; - -StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() -{ -} - -StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, - std::shared_ptr<const DocumentTypeRepo> repo, - const StorageApiRpcService::Params& rpc_params, - bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), - _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), - _shared_rpc_resources(shared_rpc_resources_in), - _message_dispatcher(std::make_unique<MyMessageDispatcher>()), - _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo)), - _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)) -{ -} - -StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; - -void -StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) -{ - cmd->setSourceIndex(0); - cmd->setAddress(*_storage_address); - _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); - _rpc_client->send_rpc_v1_request(std::move(cmd)); -} - -void -StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&) -{ -} - -uint32_t -StorageApiRpcBmFeedHandler::get_error_count() const -{ - return _message_dispatcher->get_error_count(); -} - -const vespalib::string& -StorageApiRpcBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiRpcBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h deleted file mode 100644 index 5057d8889a5..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "i_bm_feed_handler.h" -#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> - -namespace document { class DocumentTypeRepo; } -namespace storage::api { -class StorageMessageAddress; -class StorageCommand; -} - -namespace storage::rpc { -class MessageCodecProvider; -class SharedRpcResources; -} - -namespace feedbm { - -/* - * Benchmark feed handler for feed to service layer or distributor - * using storage api protocol over rpc. - */ -class StorageApiRpcBmFeedHandler : public IBmFeedHandler -{ - class MyMessageDispatcher; - vespalib::string _name; - bool _distributor; - std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; - storage::rpc::SharedRpcResources& _shared_rpc_resources; - std::unique_ptr<MyMessageDispatcher> _message_dispatcher; - std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider; - std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client; - - void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); -public: - StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, - std::shared_ptr<const document::DocumentTypeRepo> repo, - const storage::rpc::StorageApiRpcService::Params& rpc_params, - bool distributor); - ~StorageApiRpcBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - void attach_bucket_info_queue(PendingTracker &tracker) override; - uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp deleted file mode 100644 index 260b0c8a7af..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storage_reply_error_checker.h" -#include <vespa/storageapi/messageapi/storagereply.h> - -#include <vespa/log/log.h> -LOG_SETUP(".storage_reply_error_checker"); - -namespace feedbm { - -StorageReplyErrorChecker::StorageReplyErrorChecker() - : _errors(0u) -{ -} - -StorageReplyErrorChecker::~StorageReplyErrorChecker() = default; - -void -StorageReplyErrorChecker::check_error(const storage::api::StorageMessage &msg) -{ - auto reply = dynamic_cast<const storage::api::StorageReply*>(&msg); - if (reply != nullptr) { - if (reply->getResult().failed()) { - if (++_errors <= 10) { - LOG(info, "reply '%s', return code '%s'", reply->toString().c_str(), reply->getResult().toString().c_str()); - } - } - } else { - ++_errors; - } -} - -} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h deleted file mode 100644 index 4743367b426..00000000000 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <atomic> - -namespace storage::api { class StorageMessage; } - -namespace feedbm { - -class StorageReplyErrorChecker { -protected: - std::atomic<uint32_t> _errors; -public: - StorageReplyErrorChecker(); - ~StorageReplyErrorChecker(); - void check_error(const storage::api::StorageMessage &msg); - uint32_t get_error_count() const { return _errors; } -}; - -} diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index cd1920d237f..9752a4b5c36 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,80 +1,25 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bm_cluster_controller.h" -#include "bm_message_bus.h" -#include "bm_storage_chain_builder.h" -#include "bm_storage_link_context.h" -#include "pending_tracker.h" -#include "spi_bm_feed_handler.h" -#include "storage_api_chain_bm_feed_handler.h" -#include "storage_api_message_bus_bm_feed_handler.h" -#include "storage_api_rpc_bm_feed_handler.h" -#include "document_api_message_bus_bm_feed_handler.h" -#include <tests/proton/common/dummydbowner.h> -#include <vespa/config-attributes.h> -#include <vespa/config-bucketspaces.h> -#include <vespa/config-imported-fields.h> -#include <vespa/config-indexschema.h> -#include <vespa/config-persistence.h> -#include <vespa/config-rank-profiles.h> -#include <vespa/config-slobroks.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config-stor-filestor.h> -#include <vespa/config-summary.h> -#include <vespa/config-summarymap.h> -#include <vespa/config-upgrading.h> -#include <vespa/config/common/configcontext.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/fieldset/fieldsetrepo.h> -#include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/document_type_repo_factory.h> #include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/test/make_bucket_space.h> -#include <vespa/document/update/assignvalueupdate.h> -#include <vespa/document/update/documentupdate.h> #include <vespa/fastos/app.h> -#include <vespa/messagebus/config-messagebus.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/metrics/config-metricsmanager.h> -#include <vespa/searchcommon/common/schemaconfigurer.h> -#include <vespa/searchcore/proton/common/alloc_config.h> -#include <vespa/searchcore/proton/common/hw_info.h> -#include <vespa/searchcore/proton/matching/querylimiter.h> -#include <vespa/searchcore/proton/metrics/metricswireservice.h> -#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> -#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> -#include <vespa/searchcore/proton/server/bootstrapconfig.h> -#include <vespa/searchcore/proton/server/document_db_maintenance_config.h> -#include <vespa/searchcore/proton/server/documentdb.h> -#include <vespa/searchcore/proton/server/documentdbconfigmanager.h> -#include <vespa/searchcore/proton/server/fileconfigmanager.h> -#include <vespa/searchcore/proton/server/memoryconfigstore.h> -#include <vespa/searchcore/proton/server/persistencehandlerproxy.h> -#include <vespa/searchcore/proton/server/threading_service_config.h> -#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/searchcore/bmcluster/bm_cluster.h> +#include <vespa/searchcore/bmcluster/bm_cluster_controller.h> +#include <vespa/searchcore/bmcluster/bm_cluster_params.h> +#include <vespa/searchcore/bmcluster/bm_feed.h> +#include <vespa/searchcore/bmcluster/bm_node.h> +#include <vespa/searchcore/bmcluster/bm_range.h> +#include <vespa/searchcore/bmcluster/bucket_selector.h> +#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> -#include <vespa/searchlib/transactionlog/translogserver.h> -#include <vespa/searchsummary/config/config-juniperrc.h> -#include <vespa/slobrok/sbmirror.h> -#include <vespa/storage/bucketdb/config-stor-bucket-init.h> -#include <vespa/storage/common/i_storage_chain_builder.h> -#include <vespa/storage/config/config-stor-bouncer.h> -#include <vespa/storage/config/config-stor-communicationmanager.h> -#include <vespa/storage/config/config-stor-distributormanager.h> -#include <vespa/storage/config/config-stor-opslogger.h> -#include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/config/config-stor-status.h> -#include <vespa/storage/config/config-stor-visitordispatcher.h> -#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/visiting/config-stor-visitor.h> -#include <vespa/storageserver/app/distributorprocess.h> -#include <vespa/storageserver/app/servicelayerprocess.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <getopt.h> #include <iostream> #include <thread> @@ -82,82 +27,30 @@ #include <vespa/log/log.h> LOG_SETUP("vespa-feed-bm"); -using namespace cloud::config::filedistribution; -using namespace config; using namespace proton; using namespace std::chrono_literals; -using namespace vespa::config::search::core; -using namespace vespa::config::search::summary; -using namespace vespa::config::search; -using vespa::config::content::PersistenceConfigBuilder; -using vespa::config::content::StorDistributionConfigBuilder; -using vespa::config::content::StorFilestorConfigBuilder; -using vespa::config::content::UpgradingConfigBuilder; -using vespa::config::content::core::BucketspacesConfig; -using vespa::config::content::core::BucketspacesConfigBuilder; -using vespa::config::content::core::StorBouncerConfigBuilder; -using vespa::config::content::core::StorBucketInitConfigBuilder; -using vespa::config::content::core::StorCommunicationmanagerConfigBuilder; -using vespa::config::content::core::StorDistributormanagerConfigBuilder; -using vespa::config::content::core::StorOpsloggerConfigBuilder; -using vespa::config::content::core::StorPrioritymappingConfigBuilder; -using vespa::config::content::core::StorServerConfigBuilder; -using vespa::config::content::core::StorStatusConfigBuilder; -using vespa::config::content::core::StorVisitorConfigBuilder; -using vespa::config::content::core::StorVisitordispatcherConfigBuilder; -using cloud::config::SlobroksConfigBuilder; -using messagebus::MessagebusConfigBuilder; -using metrics::MetricsmanagerConfigBuilder; -using config::ConfigContext; -using config::ConfigSet; -using config::ConfigUri; -using config::IConfigContext; -using document::AssignValueUpdate; -using document::BucketId; -using document::BucketSpace; -using document::Document; -using document::DocumentId; -using document::DocumentType; using document::DocumentTypeRepo; using document::DocumentTypeRepoFactory; -using document::DocumentUpdate; using document::DocumenttypesConfig; using document::DocumenttypesConfigBuilder; -using document::Field; -using document::FieldSetRepo; -using document::FieldUpdate; -using document::IntFieldValue; -using document::test::makeBucketSpace; -using feedbm::BmClusterController; -using feedbm::BmMessageBus; -using feedbm::BmStorageChainBuilder; -using feedbm::BmStorageLinkContext; -using feedbm::IBmFeedHandler; -using feedbm::DocumentApiMessageBusBmFeedHandler; -using feedbm::SpiBmFeedHandler; -using feedbm::StorageApiChainBmFeedHandler; -using feedbm::StorageApiMessageBusBmFeedHandler; -using feedbm::StorageApiRpcBmFeedHandler; -using search::TuneFileDocumentDB; +using search::bmcluster::BmClusterController; +using search::bmcluster::IBmFeedHandler; +using search::bmcluster::BmClusterParams; +using search::bmcluster::BmCluster; +using search::bmcluster::BmFeed; +using search::bmcluster::BmNode; +using search::bmcluster::BmRange; +using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; -using search::index::Schema; -using search::index::SchemaBuilder; -using search::transactionlog::TransLogServer; -using storage::rpc::SharedRpcResources; -using storage::rpc::StorageApiRpcService; using storage::spi::PersistenceProvider; -using vespalib::compression::CompressionConfig; using vespalib::makeLambdaTask; -using proton::ThreadingServiceConfig; - -using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>; namespace { vespalib::string base_dir = "testdb"; -std::shared_ptr<DocumenttypesConfig> make_document_type() { +std::shared_ptr<DocumenttypesConfig> make_document_types() { using Struct = document::config_builder::Struct; using DataType = document::DataType; document::config_builder::DocumenttypesConfigBuilderHelper builder; @@ -165,130 +58,14 @@ std::shared_ptr<DocumenttypesConfig> make_document_type() { return std::make_shared<DocumenttypesConfig>(builder.config()); } -std::shared_ptr<AttributesConfig> make_attributes_config() { - AttributesConfigBuilder builder; - AttributesConfig::Attribute attribute; - attribute.name = "int"; - attribute.datatype = AttributesConfig::Attribute::Datatype::INT32; - builder.attribute.emplace_back(attribute); - return std::make_shared<AttributesConfig>(builder); -} - -std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name) -{ - auto indexschema = std::make_shared<IndexschemaConfig>(); - auto attributes = make_attributes_config(); - auto summary = std::make_shared<SummaryConfig>(); - std::shared_ptr<Schema> schema(new Schema()); - SchemaBuilder::build(*indexschema, *schema); - SchemaBuilder::build(*attributes, *schema); - SchemaBuilder::build(*summary, *schema); - return std::make_shared<DocumentDBConfig>( - 1, - std::make_shared<RankProfilesConfig>(), - std::make_shared<matching::RankingConstants>(), - std::make_shared<matching::RankingExpressions>(), - std::make_shared<matching::OnnxModels>(), - indexschema, - attributes, - summary, - std::make_shared<SummarymapConfig>(), - std::make_shared<JuniperrcConfig>(), - document_types, - repo, - std::make_shared<ImportedFieldsConfig>(), - std::make_shared<TuneFileDocumentDB>(), - schema, - std::make_shared<DocumentDBMaintenanceConfig>(), - search::LogDocumentStore::Config(), - std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)), - std::make_shared<const AllocConfig>(), - "client", - doc_type_name.getName()); -} - -void -make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) -{ - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); -} - -void -make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces) -{ - BucketspacesConfigBuilder::Documenttype bucket_space_map; - bucket_space_map.name = "test"; - bucket_space_map.bucketspace = "default"; - bucketspaces.documenttype.emplace_back(std::move(bucket_space_map)); -} - -class MyPersistenceEngineOwner : public IPersistenceEngineOwner -{ - void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { } -}; - -struct MyResourceWriteFilter : public IResourceWriteFilter -{ - bool acceptWriteOperation() const override { return true; } - State getAcceptState() const override { return IResourceWriteFilter::State(); } -}; - -class BucketSelector -{ - uint32_t _thread_id; - uint32_t _threads; - uint32_t _num_buckets; -public: - BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in) - : _thread_id(thread_id_in), - _threads(threads_in), - _num_buckets((num_buckets_in / _threads) * _threads) - { - } - uint64_t operator()(uint32_t i) const { - return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; - } -}; - -class BMRange -{ - uint32_t _start; - uint32_t _end; -public: - BMRange(uint32_t start_in, uint32_t end_in) - : _start(start_in), - _end(end_in) - { - } - uint32_t get_start() const { return _start; } - uint32_t get_end() const { return _end; } -}; - -class BMParams { +class BMParams : public BmClusterParams { uint32_t _documents; uint32_t _client_threads; uint32_t _get_passes; - vespalib::string _indexing_sequencer; uint32_t _put_passes; uint32_t _update_passes; uint32_t _remove_passes; - uint32_t _rpc_network_threads; - uint32_t _rpc_events_before_wakeup; - uint32_t _rpc_targets_per_node; - uint32_t _response_threads; uint32_t _max_pending; - bool _enable_distributor; - bool _enable_service_layer; - bool _skip_get_spi_bucket_info; - bool _use_document_api; - bool _use_message_bus; - bool _use_storage_chain; - bool _use_async_message_handling_on_schedule; - uint32_t _bucket_db_stripe_bits; - uint32_t _distributor_stripes; - bool _skip_communicationmanager_thread; uint32_t get_start(uint32_t thread_id) const { return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads); } @@ -297,82 +74,39 @@ public: : _documents(160000), _client_threads(1), _get_passes(0), - _indexing_sequencer(), _put_passes(2), _update_passes(1), _remove_passes(2), - _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def - _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def - _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def - _response_threads(2), // Same default as in stor-filestor.def - _max_pending(1000), - _enable_distributor(false), - _enable_service_layer(false), - _skip_get_spi_bucket_info(false), - _use_document_api(false), - _use_message_bus(false), - _use_storage_chain(false), - _use_async_message_handling_on_schedule(false), - _bucket_db_stripe_bits(0), - _distributor_stripes(0), - _skip_communicationmanager_thread(false) // Same default as in stor-communicationmanager.def + _max_pending(1000) { } - BMRange get_range(uint32_t thread_id) const { - return BMRange(get_start(thread_id), get_start(thread_id + 1)); + BmRange get_range(uint32_t thread_id) const { + return BmRange(get_start(thread_id), get_start(thread_id + 1)); } uint32_t get_documents() const { return _documents; } uint32_t get_max_pending() const { return _max_pending; } uint32_t get_client_threads() const { return _client_threads; } uint32_t get_get_passes() const { return _get_passes; } - const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } - uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } - uint32_t get_rpc_events_before_wakup() const { return _rpc_events_before_wakeup; } - uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; } - uint32_t get_response_threads() const { return _response_threads; } - bool get_enable_distributor() const { return _enable_distributor; } - bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; } - bool get_use_document_api() const { return _use_document_api; } - bool get_use_message_bus() const { return _use_message_bus; } - bool get_use_storage_chain() const { return _use_storage_chain; } - bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; } - uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } - uint32_t get_distributor_stripes() const { return _distributor_stripes; } - bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; } - void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } - void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } - void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; } - void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; } - void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } - void set_enable_distributor(bool value) { _enable_distributor = value; } - void set_enable_service_layer(bool value) { _enable_service_layer = value; } - void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; } - void set_use_document_api(bool value) { _use_document_api = value; } - void set_use_message_bus(bool value) { _use_message_bus = value; } - void set_use_storage_chain(bool value) { _use_storage_chain = value; } - void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; } - void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } - void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; } - void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; } bool check() const; - bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } - bool needs_distributor() const { return _enable_distributor || _use_document_api; } - bool needs_message_bus() const { return _use_message_bus || _use_document_api; } + bool needs_message_bus() const { return get_use_message_bus() || get_use_document_api(); } }; bool BMParams::check() const { + if (!BmClusterParams::check()) { + return false; + } if (_client_threads < 1) { std::cerr << "Too few client threads: " << _client_threads << std::endl; return false; @@ -389,533 +123,82 @@ BMParams::check() const std::cerr << "Put passes too low: " << _put_passes << std::endl; return false; } - if (_rpc_network_threads < 1) { - std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; - return false; - } - if (_rpc_targets_per_node < 1) { - std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl; - return false; - } - if (_response_threads < 1) { - std::cerr << "Too few response threads: " << _response_threads << std::endl; - return false; - } return true; } -class MyServiceLayerProcess : public storage::ServiceLayerProcess { - PersistenceProvider& _provider; - -public: - MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider, - std::unique_ptr<storage::IStorageChainBuilder> chain_builder); - ~MyServiceLayerProcess() override { shutdown(); } - - void shutdown() override; - void setupProvider() override; - PersistenceProvider& getProvider() override; -}; - -MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider, - std::unique_ptr<storage::IStorageChainBuilder> chain_builder) - : ServiceLayerProcess(configUri), - _provider(provider) -{ - if (chain_builder) { - set_storage_chain_builder(std::move(chain_builder)); - } -} - -void -MyServiceLayerProcess::shutdown() -{ - ServiceLayerProcess::shutdown(); -} - -void -MyServiceLayerProcess::setupProvider() -{ -} - -PersistenceProvider& -MyServiceLayerProcess::getProvider() -{ - return _provider; -} - -struct MyStorageConfig -{ - vespalib::string config_id; - DocumenttypesConfigBuilder documenttypes; - StorDistributionConfigBuilder stor_distribution; - StorBouncerConfigBuilder stor_bouncer; - StorCommunicationmanagerConfigBuilder stor_communicationmanager; - StorOpsloggerConfigBuilder stor_opslogger; - StorPrioritymappingConfigBuilder stor_prioritymapping; - UpgradingConfigBuilder upgrading; - StorServerConfigBuilder stor_server; - StorStatusConfigBuilder stor_status; - BucketspacesConfigBuilder bucketspaces; - MetricsmanagerConfigBuilder metricsmanager; - SlobroksConfigBuilder slobroks; - MessagebusConfigBuilder messagebus; - - MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : config_id(config_id_in), - documenttypes(documenttypes_in), - stor_distribution(), - stor_bouncer(), - stor_communicationmanager(), - stor_opslogger(), - stor_prioritymapping(), - upgrading(), - stor_server(), - stor_status(), - bucketspaces(), - metricsmanager(), - slobroks(), - messagebus() - { - { - auto &dc = stor_distribution; - { - StorDistributionConfigBuilder::Group group; - { - StorDistributionConfigBuilder::Group::Nodes node; - node.index = 0; - group.nodes.push_back(std::move(node)); - } - group.index = "invalid"; - group.name = "invalid"; - group.capacity = 1.0; - group.partitions = ""; - dc.group.push_back(std::move(group)); - } - dc.redundancy = 1; - dc.readyCopies = 1; - } - stor_server.isDistributor = distributor; - stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); - if (distributor) { - stor_server.rootFolder = "distributor"; - } else { - stor_server.rootFolder = "storage"; - } - make_slobroks_config(slobroks, slobrok_port); - stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads(); - stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakup(); - stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node(); - stor_communicationmanager.mbusport = mbus_port; - stor_communicationmanager.rpcport = rpc_port; - stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread(); - - stor_status.httpport = status_port; - make_bucketspaces_config(bucketspaces); - } - - ~MyStorageConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &documenttypes); - set.addBuilder(config_id, &stor_distribution); - set.addBuilder(config_id, &stor_bouncer); - set.addBuilder(config_id, &stor_communicationmanager); - set.addBuilder(config_id, &stor_opslogger); - set.addBuilder(config_id, &stor_prioritymapping); - set.addBuilder(config_id, &upgrading); - set.addBuilder(config_id, &stor_server); - set.addBuilder(config_id, &stor_status); - set.addBuilder(config_id, &bucketspaces); - set.addBuilder(config_id, &metricsmanager); - set.addBuilder(config_id, &slobroks); - set.addBuilder(config_id, &messagebus); - } -}; - -MyStorageConfig::~MyStorageConfig() = default; - -struct MyServiceLayerConfig : public MyStorageConfig -{ - PersistenceConfigBuilder persistence; - StorFilestorConfigBuilder stor_filestor; - StorBucketInitConfigBuilder stor_bucket_init; - StorVisitorConfigBuilder stor_visitor; - - MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), - persistence(), - stor_filestor(), - stor_bucket_init(), - stor_visitor() - { - stor_filestor.numResponseThreads = params.get_response_threads(); - stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); - stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); - } - - ~MyServiceLayerConfig(); - - void add_builders(ConfigSet &set) { - MyStorageConfig::add_builders(set); - set.addBuilder(config_id, &persistence); - set.addBuilder(config_id, &stor_filestor); - set.addBuilder(config_id, &stor_bucket_init); - set.addBuilder(config_id, &stor_visitor); - } -}; - -MyServiceLayerConfig::~MyServiceLayerConfig() = default; - -struct MyDistributorConfig : public MyStorageConfig -{ - StorDistributormanagerConfigBuilder stor_distributormanager; - StorVisitordispatcherConfigBuilder stor_visitordispatcher; - - MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), - stor_distributormanager(), - stor_visitordispatcher() - { - stor_distributormanager.numDistributorStripes = params.get_distributor_stripes(); - } - - ~MyDistributorConfig(); - - void add_builders(ConfigSet &set) { - MyStorageConfig::add_builders(set); - set.addBuilder(config_id, &stor_distributormanager); - set.addBuilder(config_id, &stor_visitordispatcher); - } -}; - -MyDistributorConfig::~MyDistributorConfig() = default; - -struct MyRpcClientConfig { - vespalib::string config_id; - SlobroksConfigBuilder slobroks; - - MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port) - : config_id(config_id_in), - slobroks() - { - make_slobroks_config(slobroks, slobrok_port); - } - ~MyRpcClientConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &slobroks); - } -}; - -MyRpcClientConfig::~MyRpcClientConfig() = default; - -struct MyMessageBusConfig { - vespalib::string config_id; - SlobroksConfigBuilder slobroks; - MessagebusConfigBuilder messagebus; - - MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port) - : config_id(config_id_in), - slobroks(), - messagebus() - { - make_slobroks_config(slobroks, slobrok_port); - } - ~MyMessageBusConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &slobroks); - set.addBuilder(config_id, &messagebus); - } -}; - -MyMessageBusConfig::~MyMessageBusConfig() = default; - } struct PersistenceProviderFixture { - std::shared_ptr<DocumenttypesConfig> _document_types; + std::shared_ptr<const DocumenttypesConfig> _document_types; std::shared_ptr<const DocumentTypeRepo> _repo; - DocTypeName _doc_type_name; - const DocumentType* _document_type; - const Field& _field; - std::shared_ptr<DocumentDBConfig> _document_db_config; - vespalib::string _base_dir; - DummyFileHeaderContext _file_header_context; - int _tls_listen_port; - int _slobrok_port; - int _rpc_client_port; - int _service_layer_mbus_port; - int _service_layer_rpc_port; - int _service_layer_status_port; - int _distributor_mbus_port; - int _distributor_rpc_port; - int _distributor_status_port; - TransLogServer _tls; - vespalib::string _tls_spec; - matching::QueryLimiter _query_limiter; - vespalib::Clock _clock; - DummyWireService _metrics_wire_service; - MemoryConfigStores _config_stores; - vespalib::ThreadStackExecutor _summary_executor; - DummyDBOwner _document_db_owner; - BucketSpace _bucket_space; - std::shared_ptr<DocumentDB> _document_db; - MyPersistenceEngineOwner _persistence_owner; - MyResourceWriteFilter _write_filter; - test::DiskMemUsageNotifier _disk_mem_usage_notifier; - std::shared_ptr<PersistenceEngine> _persistence_engine; - std::unique_ptr<const FieldSetRepo> _field_set_repo; - uint32_t _bucket_bits; - MyServiceLayerConfig _service_layer_config; - MyDistributorConfig _distributor_config; - MyRpcClientConfig _rpc_client_config; - MyMessageBusConfig _message_bus_config; - ConfigSet _config_set; - std::shared_ptr<IConfigContext> _config_context; - std::unique_ptr<IBmFeedHandler> _feed_handler; - std::unique_ptr<mbus::Slobrok> _slobrok; - std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context; - std::unique_ptr<MyServiceLayerProcess> _service_layer; - std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; - std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; - std::unique_ptr<storage::DistributorProcess> _distributor; - std::unique_ptr<BmMessageBus> _message_bus; + std::unique_ptr<BmCluster> _bm_cluster; + std::unique_ptr<BmNode> _bm_node; + BmFeed _feed; + IBmFeedHandler* _feed_handler; explicit PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); - void create_document_db(const BMParams & params); - uint32_t num_buckets() const { return (1u << _bucket_bits); } - BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); } - document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); } - DocumentId make_document_id(uint32_t n, uint32_t i) const; - std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const; - std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; void create_buckets(); void wait_slobrok(const vespalib::string &name); - void start_service_layer(const BMParams& params); - void start_distributor(const BMParams& params); + void start_service_layer(const BmClusterParams& params); + void start_distributor(const BmClusterParams& params); void start_message_bus(); - void create_feed_handler(const BMParams& params); + void create_feed_handler(const BmClusterParams& params); void shutdown_feed_handler(); void shutdown_message_bus(); void shutdown_distributor(); void shutdown_service_layer(); + PersistenceProvider* get_persistence_provider() { return _bm_node->get_persistence_provider(); } }; PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) - : _document_types(make_document_type()), - _repo(DocumentTypeRepoFactory::make(*_document_types)), - _doc_type_name("test"), - _document_type(_repo->getDocumentType(_doc_type_name.getName())), - _field(_document_type->getField("int")), - _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)), - _base_dir(base_dir), - _file_header_context(), - _tls_listen_port(9017), - _slobrok_port(9018), - _rpc_client_port(9019), - _service_layer_mbus_port(9020), - _service_layer_rpc_port(9021), - _service_layer_status_port(9022), - _distributor_mbus_port(9023), - _distributor_rpc_port(9024), - _distributor_status_port(9025), - _tls("tls", _tls_listen_port, _base_dir, _file_header_context), - _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), - _query_limiter(), - _clock(), - _metrics_wire_service(), - _config_stores(), - _summary_executor(8, 128_Ki), - _document_db_owner(), - _bucket_space(makeBucketSpace(_doc_type_name.getName())), - _document_db(), - _persistence_owner(), - _write_filter(), - _disk_mem_usage_notifier(), - _persistence_engine(), - _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)), - _bucket_bits(16), - _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), - _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), - _rpc_client_config("bm-rpc-client", _slobrok_port), - _message_bus_config("bm-message-bus", _slobrok_port), - _config_set(), - _config_context(std::make_shared<ConfigContext>(_config_set)), - _feed_handler(), - _slobrok(), - _service_layer_chain_context(), - _service_layer(), - _rpc_client_shared_rpc_resources(), - _distributor_chain_context(), - _distributor(), - _message_bus() -{ - create_document_db(params); - _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false); - auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); - _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); - _service_layer_config.add_builders(_config_set); - _distributor_config.add_builders(_config_set); - _rpc_client_config.add_builders(_config_set); - _message_bus_config.add_builders(_config_set); - _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info()); -} - -PersistenceProviderFixture::~PersistenceProviderFixture() -{ - if (_persistence_engine) { - _persistence_engine->destroyIterators(); - _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name); - } - if (_document_db) { - _document_db->close(); - } -} - -void -PersistenceProviderFixture::create_document_db(const BMParams & params) -{ - vespalib::mkdir(_base_dir, false); - vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false); - vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig"; - { - FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName()); - fileCfg.saveConfig(*_document_db_config, 1); - } - config::DirSpec spec(input_cfg + "/config-1"); - auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>(); - DocumentDBConfigHelper mgr(spec, _doc_type_name.getName()); - auto protonCfg = std::make_shared<ProtonConfigBuilder>(); - if ( ! params.get_indexing_sequencer().empty()) { - vespalib::string sequencer = params.get_indexing_sequencer(); - std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); }); - protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer); - } - auto bootstrap_config = std::make_shared<BootstrapConfig>(1, - _document_types, - _repo, - std::move(protonCfg), - std::make_shared<FiledistributorrpcConfig>(), - std::make_shared<BucketspacesConfig>(), - tuneFileDocDB, HwInfo()); - mgr.forwardConfig(bootstrap_config); - mgr.nextGeneration(0ms); - _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, - _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, - _summary_executor, _summary_executor, *_persistence_engine, _tls, - _metrics_wire_service, _file_header_context, - _config_stores.getConfigStore(_doc_type_name.toString()), - std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo()); - _document_db->start(); - _document_db->waitForOnlineState(); -} - -DocumentId -PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const + : _document_types(make_document_types()), + _repo(document::DocumentTypeRepoFactory::make(*_document_types)), + _bm_cluster(std::make_unique<BmCluster>(params, _repo)), + _bm_node(BmNode::create(params, _document_types)), + _feed(_repo), + _feed_handler(nullptr) { - DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i)); - return id; } -std::unique_ptr<Document> -PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const -{ - auto id = make_document_id(n, i); - auto document = std::make_unique<Document>(*_document_type, id); - document->setRepo(*_repo); - document->setFieldValue(_field, std::make_unique<IntFieldValue>(i)); - return document; -} - -std::unique_ptr<DocumentUpdate> -PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const -{ - auto id = make_document_id(n, i); - auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); - document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); - return document_update; -} +PersistenceProviderFixture::~PersistenceProviderFixture() = default; void PersistenceProviderFixture::create_buckets() { - SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false); - for (unsigned int i = 0; i < num_buckets(); ++i) { - feed_handler.create_bucket(make_bucket(i)); + auto feed_handler = _bm_node->make_create_bucket_feed_handler(false); + for (unsigned int i = 0; i < _feed.num_buckets(); ++i) { + feed_handler->create_bucket(_feed.make_bucket(i)); } } void PersistenceProviderFixture::wait_slobrok(const vespalib::string &name) { - auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror(); - LOG(info, "Waiting for %s in slobrok", name.c_str()); - for (;;) { - auto specs = mirror.lookup(name); - if (!specs.empty()) { - LOG(info, "Found %s in slobrok", name.c_str()); - return; - } - std::this_thread::sleep_for(100ms); - } + _bm_cluster->wait_slobrok(name); } void -PersistenceProviderFixture::start_service_layer(const BMParams& params) +PersistenceProviderFixture::start_service_layer(const BmClusterParams& params) { - LOG(info, "start slobrok"); - _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); - LOG(info, "start service layer"); - config::ConfigUri config_uri("bm-servicelayer", _config_context); - std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.needs_distributor()) { - chain_builder = std::make_unique<BmStorageChainBuilder>(); - _service_layer_chain_context = chain_builder->get_context(); - } - _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, - *_persistence_engine, - std::move(chain_builder)); - _service_layer->setupConfig(100ms); - _service_layer->createNode(); - _service_layer->getNode().waitUntilInitialized(); - LOG(info, "start rpc client shared resources"); - config::ConfigUri client_config_uri("bm-rpc-client", _config_context); - _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources> - (client_config_uri, _rpc_client_port, 100, params.get_rpc_events_before_wakup()); - _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); + _bm_cluster->start_slobrok(); + _bm_node->start_service_layer(params); + _bm_node->wait_service_layer(); + _bm_cluster->start_rpc_client(); wait_slobrok("storage/cluster.storage/storage/0/default"); wait_slobrok("storage/cluster.storage/storage/0"); - BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + BmClusterController fake_controller(_bm_cluster->get_rpc_client()); fake_controller.set_cluster_up(false); } void -PersistenceProviderFixture::start_distributor(const BMParams& params) +PersistenceProviderFixture::start_distributor(const BmClusterParams& params) { - config::ConfigUri config_uri("bm-distributor", _config_context); - std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.get_use_document_api()) { - chain_builder = std::make_unique<BmStorageChainBuilder>(); - _distributor_chain_context = chain_builder->get_context(); - } - _distributor = std::make_unique<storage::DistributorProcess>(config_uri); - if (chain_builder) { - _distributor->set_storage_chain_builder(std::move(chain_builder)); - } - _distributor->setupConfig(100ms); - _distributor->createNode(); + _bm_node->start_distributor(params); wait_slobrok("storage/cluster.storage/distributor/0/default"); wait_slobrok("storage/cluster.storage/distributor/0"); - BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + BmClusterController fake_controller(_bm_cluster->get_rpc_client()); fake_controller.set_cluster_up(true); // Wait for bucket ownership transfer safe time std::this_thread::sleep_for(2s); @@ -924,102 +207,45 @@ PersistenceProviderFixture::start_distributor(const BMParams& params) void PersistenceProviderFixture::start_message_bus() { - config::ConfigUri config_uri("bm-message-bus", _config_context); - LOG(info, "Starting message bus"); - _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo); - LOG(info, "Started message bus"); + _bm_cluster->start_message_bus(); } void -PersistenceProviderFixture::create_feed_handler(const BMParams& params) +PersistenceProviderFixture::create_feed_handler(const BmClusterParams& params) { - StorageApiRpcService::Params rpc_params; - // This is the same compression config as the default in stor-communicationmanager.def. - rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); - rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node(); - if (params.get_use_document_api()) { - _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus); - } else if (params.get_enable_distributor()) { - if (params.get_use_storage_chain()) { - assert(_distributor_chain_context); - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true); - } - } else if (params.needs_service_layer()) { - if (params.get_use_storage_chain()) { - assert(_service_layer_chain_context); - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false); - } - } + _bm_node->create_feed_handler(params, *_bm_cluster); + _feed_handler = _bm_node->get_feed_handler(); } void PersistenceProviderFixture::shutdown_feed_handler() { - _feed_handler.reset(); + _bm_node->shutdown_feed_handler(); + _feed_handler = nullptr; } void PersistenceProviderFixture::shutdown_message_bus() { - if (_message_bus) { - LOG(info, "stop message bus"); - _message_bus.reset(); - } + _bm_cluster->stop_message_bus(); } void PersistenceProviderFixture::shutdown_distributor() { - if (_distributor) { - LOG(info, "stop distributor"); - _distributor->getNode().requestShutdown("controlled shutdown"); - _distributor->shutdown(); - } + _bm_node->shutdown_distributor(); } void PersistenceProviderFixture::shutdown_service_layer() { - if (_rpc_client_shared_rpc_resources) { - LOG(info, "stop rpc client shared resources"); - _rpc_client_shared_rpc_resources->shutdown(); - _rpc_client_shared_rpc_resources.reset(); - } - if (_service_layer) { - LOG(info, "stop service layer"); - _service_layer->getNode().requestShutdown("controlled shutdown"); - _service_layer->shutdown(); - } - if (_slobrok) { - LOG(info, "stop slobrok"); - _slobrok.reset(); - } -} - -vespalib::nbostream -make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document = f.make_document(n, i); - document->serialize(serialized_feed); - } - return serialized_feed; + _bm_cluster->stop_rpc_client(); + _bm_node->shutdown_service_layer(); + _bm_cluster->stop_slobrok(); } std::vector<vespalib::nbostream> -make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) +make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) { LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); std::vector<vespalib::nbostream> serialized_feed_v; @@ -1038,27 +264,6 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st return serialized_feed_v; } -void -put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - auto &repo = *f._repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - auto document = std::make_unique<Document>(repo, is); - f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - class AvgSampler { private: double _total; @@ -1077,73 +282,42 @@ void run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { put_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput); time_bias += bm_params.get_documents(); } -vespalib::nbostream -make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document_update = f.make_document_update(n, i); - document_update->serializeHEAD(serialized_feed); - } - return serialized_feed; -} - -void -update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - auto &repo = *f._repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - auto document_update = DocumentUpdate::createHEAD(repo, is); - f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - void run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { update_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput); @@ -1151,94 +325,44 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu } void -get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed) -{ - LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - vespalib::string all_fields(document::AllFields::NAME); - auto bucket_space = f._bucket_space; - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - DocumentId document_id(is); - f._feed_handler->get(bucket, all_fields, document_id, pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - -void run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() - { get_async_task(f, max_pending, range, serialized_feed); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() + { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput); } -vespalib::nbostream -make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document_id = f.make_document_id(n, i); - vespalib::string raw_id = document_id.toString(); - serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); - } - return serialized_feed; -} - -void -remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - DocumentId document_id(is); - f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - void run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { remove_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput); @@ -1310,10 +434,10 @@ void benchmark_async_spi(const BMParams &bm_params) { vespalib::rmdir(base_dir, true); PersistenceProviderFixture f(bm_params); - auto &provider = *f._persistence_engine; + auto &provider = *f.get_persistence_provider(); LOG(info, "start initialize"); provider.initialize(); - LOG(info, "create %u buckets", f.num_buckets()); + LOG(info, "create %u buckets", f._feed.num_buckets()); if (!bm_params.needs_distributor()) { f.create_buckets(); } @@ -1328,9 +452,10 @@ void benchmark_async_spi(const BMParams &bm_params) } f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki); - auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); - auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update"); - auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove"); + auto& feed = f._feed; + auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put"); + auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); + auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); int64_t time_bias = 1; LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str()); benchmark_async_put(f, executor, time_bias, put_feed, bm_params); |