diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-02 14:52:10 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-02 14:52:10 +0200 |
commit | 9b9235fc145b531a6fc776ed1498a1edfe597f4c (patch) | |
tree | f4f633caafa42fe4b32da83078d72787ec470b54 | |
parent | 188521786cb0d181acf31122ba3803572f86c322 (diff) | |
parent | 08a8d0863a2b7cbb4100bc3e00f078a7961ad3e1 (diff) |
Merge pull request #14677 from vespa-engine/toregge/enable-mbus-feeding-for-vespa-feed-bm
Enable feeding over message bus for vespa-feed-bm
17 files changed, 677 insertions, 83 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index b5465b1f0dd..cf18be8f5de 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -3,11 +3,15 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp bm_cluster_controller.cpp + bm_message_bus.cpp bm_storage_chain_builder.cpp bm_storage_link.cpp + document_api_message_bus_bm_feed_handler.cpp + pending_tracker_hash.cpp spi_bm_feed_handler.cpp - storage_api_rpc_bm_feed_handler.cpp storage_api_chain_bm_feed_handler.cpp + storage_api_message_bus_bm_feed_handler.cpp + storage_api_rpc_bm_feed_handler.cpp storage_reply_error_checker.cpp OUTPUT_NAME vespa-feed-bm DEPENDS diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp new file mode 100644 index 00000000000..ec50a4c7c01 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp @@ -0,0 +1,180 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_message_bus.h" +#include "pending_tracker_hash.h" +#include "pending_tracker.h" +#include "storage_reply_error_checker.h" +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/network/rpcnetworkparams.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/documentapi/messagebus/documentprotocol.h> +#include <vespa/documentapi/messagebus/messages/documentmessage.h> +#include <vespa/storageapi/mbusprot/storageprotocol.h> +#include <vespa/storageapi/mbusprot/storagereply.h> +#include <vespa/vespalib/stllike/asciistream.h> + +#include <vespa/log/log.h> +LOG_SETUP(".bm_message_bus"); + +using documentapi::DocumentProtocol; +using mbus::RPCMessageBus; +using mbus::Reply; +using mbus::SourceSession; +using storage::mbusprot::StorageProtocol; +using storage::mbusprot::StorageReply; + +namespace feedbm { + +namespace { + +std::atomic<uint64_t> bm_message_bus_msg_id(0u); + +vespalib::string reply_as_string(Reply &reply) { + vespalib::asciistream os; + if (reply.getType() == 0) { + os << "empty reply"; + } else { + os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol(); + } + os << ", "; + auto message = reply.getMessage(); + if (message) { + os << "message=" << message->toString(); + os << ", protocol=" << message->getProtocol(); + } else { + os << "no message"; + } + reply.setMessage(std::move(message)); + os << ", "; + if (reply.hasErrors()) { + os << "errors=["; + for (uint32_t i = 0; i < reply.getNumErrors(); ++i) { + auto &error = reply.getError(i); + if (i > 0) { + os << ", "; + } + os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")"; + } + os << "]"; + } else { + os << "no errors"; + } + return os.str(); +} + +} + +class BmMessageBus::ReplyHandler : public mbus::IReplyHandler, + public StorageReplyErrorChecker +{ + PendingTrackerHash _pending_hash; +public: + ReplyHandler(); + ~ReplyHandler() override; + void handleReply(std::unique_ptr<Reply> reply) override; + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } + void message_aborted(uint64_t msg_id); +}; + +BmMessageBus::ReplyHandler::ReplyHandler() + : mbus::IReplyHandler(), + StorageReplyErrorChecker(), + _pending_hash() +{ +} + +BmMessageBus::ReplyHandler::~ReplyHandler() = default; + +void +BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply) +{ + auto msg_id = reply->getContext().value.UINT64; + auto tracker = _pending_hash.release(msg_id); + if (tracker != nullptr) { + bool failed = false; + if (reply->getType() == 0 || reply->hasErrors()) { + failed = true; // empty reply or error + } else { + auto protocol = reply->getProtocol(); + if (protocol == DocumentProtocol::NAME) { + } else if (protocol == StorageProtocol::NAME) { + auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get()); + if (sreply != nullptr) { + check_error(*sreply->getReply()); + } else { + failed = true; // unexpected message type + } + } else { + failed = true; // unexpected protocol + } + } + if (failed) { + ++_errors; + LOG(error, "Unexpected %s", reply_as_string(*reply).c_str()); + } + tracker->release(); + } else { + ++_errors; + LOG(error, "Untracked %s", reply_as_string(*reply).c_str()); + } +} + +void +BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id) +{ + ++_errors; + auto tracker = _pending_hash.release(msg_id); + tracker->release(); +} + +BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri, + std::shared_ptr<const document::DocumentTypeRepo> document_type_repo, + const documentapi::LoadTypeSet& load_types) + : _reply_handler(std::make_unique<ReplyHandler>()), + _message_bus(), + _session() +{ + mbus::RPCNetworkParams params(config_uri); + mbus::ProtocolSet protocol_set; + protocol_set.add(std::make_shared<DocumentProtocol>(load_types, document_type_repo)); + protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo, load_types)); + params.setIdentity(mbus::Identity("vespa-bm-client")); + _message_bus = std::make_unique<mbus::RPCMessageBus>( + protocol_set, + params, + config_uri); + mbus::SourceSessionParams srcParams; + srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); + srcParams.setReplyHandler(*_reply_handler); + _session = _message_bus->getMessageBus().createSourceSession(srcParams); +} + +BmMessageBus::~BmMessageBus() +{ + _session.reset(); + _message_bus.reset(); + _reply_handler.reset(); +} + +uint32_t +BmMessageBus::get_error_count() const +{ + return _reply_handler->get_error_count(); +} + +void +BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker) +{ + auto msg_id = ++bm_message_bus_msg_id; + _reply_handler->retain(msg_id, tracker); + msg->setContext(mbus::Context(msg_id)); + msg->setRetryEnabled(false); + auto result = _session->send(std::move(msg), route); + if (!result.isAccepted()) { + LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str()); + _reply_handler->message_aborted(msg_id); + } +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h new file mode 100644 index 00000000000..9ebe394e9e6 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h @@ -0,0 +1,42 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> + +namespace config { class ConfigUri; } +namespace document { class DocumentTypeRepo; } +namespace documentapi { class LoadTypeSet; } + +namespace mbus { + +class Message; +class RPCMessageBus; +class Route; +class SourceSession; + +} + +namespace feedbm { + +class PendingTracker; + +/* + * Message bus for feed benchmark program. + */ +class BmMessageBus +{ + class ReplyHandler; + std::unique_ptr<ReplyHandler> _reply_handler; + std::unique_ptr<mbus::RPCMessageBus> _message_bus; + std::unique_ptr<mbus::SourceSession> _session; +public: + BmMessageBus(const config::ConfigUri& config_uri, + std::shared_ptr<const document::DocumentTypeRepo> document_type_repo, + const documentapi::LoadTypeSet& load_types); + ~BmMessageBus(); + uint32_t get_error_count() const; + void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp index 79517e98094..2aeda91c30c 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp @@ -2,45 +2,18 @@ #include "bm_storage_link.h" #include "pending_tracker.h" -#include <vespa/vespalib/stllike/hash_map.hpp> namespace feedbm { -void -BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker) -{ - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); -} - -PendingTracker * -BmStorageLink::release(uint64_t msg_id) -{ - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - if (itr == _pending.end()) { - return nullptr; - } - auto tracker = itr->second; - _pending.erase(itr); - return tracker; -} - BmStorageLink::BmStorageLink() : storage::StorageLink("vespa-bm-feed"), StorageReplyErrorChecker(), - _mutex(), - _pending() + _pending_hash() { } -BmStorageLink::~BmStorageLink() -{ - std::lock_guard lock(_mutex); - assert(_pending.empty()); -} +BmStorageLink::~BmStorageLink() = default; bool BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) @@ -52,7 +25,7 @@ BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) bool BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) { - auto tracker = release(msg->getMsgId()); + auto tracker = _pending_hash.release(msg->getMsgId()); if (tracker != nullptr) { check_error(*msg); tracker->release(); diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h index 63ece355c02..95528d7b2d9 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h @@ -3,8 +3,8 @@ #pragma once #include "storage_reply_error_checker.h" +#include "pending_tracker_hash.h" #include <vespa/storage/common/storagelink.h> -#include <vespa/vespalib/stllike/hash_map.h> namespace feedbm { @@ -17,15 +17,13 @@ class PendingTracker; class BmStorageLink : public storage::StorageLink, public StorageReplyErrorChecker { - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; - PendingTracker *release(uint64_t msg_id); + PendingTrackerHash _pending_hash; public: BmStorageLink(); ~BmStorageLink() override; bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - void retain(uint64_t msg_id, PendingTracker &tracker); + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } }; } diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp new file mode 100644 index 00000000000..276a5a8136b --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp @@ -0,0 +1,82 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "document_api_message_bus_bm_feed_handler.h" +#include "bm_message_bus.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h> +#include <vespa/storageapi/messageapi/storagemessage.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::api::StorageMessageAddress; +using storage::lib::NodeType; + +namespace feedbm { + +DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus) + : IBmFeedHandler(), + _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), + _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)), + _message_bus(message_bus) +{ +} + +DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default; + +void +DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker) +{ + _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document)); + send_msg(std::move(msg), tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update)); + send_msg(std::move(msg), tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id); + send_msg(std::move(msg), tracker); +} + +uint32_t +DocumentApiMessageBusBmFeedHandler::get_error_count() const +{ + return _message_bus.get_error_count(); +} + +const vespalib::string& +DocumentApiMessageBusBmFeedHandler::get_name() const +{ + return _name; +} + +bool +DocumentApiMessageBusBmFeedHandler::manages_buckets() const +{ + return true; +} + +bool +DocumentApiMessageBusBmFeedHandler::manages_timestamp() const +{ + return true; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h new file mode 100644 index 00000000000..1e958da7900 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class DocumentMessage; }; +namespace storage::api { class StorageMessageAddress; } + +namespace feedbm { + +class BmMessageBus; + +/* + * Benchmark feed handler for feed to distributor using document api protocol + * over message bus. + */ +class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler +{ + vespalib::string _name; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + BmMessageBus& _message_bus; + void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker); +public: + DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); + ~DocumentApiMessageBusBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp new file mode 100644 index 00000000000..6863d35703e --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp @@ -0,0 +1,43 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "pending_tracker_hash.h" +#include "pending_tracker.h" +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <cassert> + +namespace feedbm { + +PendingTrackerHash::PendingTrackerHash() + : _mutex(), + _pending() +{ +} + +PendingTrackerHash::~PendingTrackerHash() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +void +PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker) +{ + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); +} + +PendingTracker * +PendingTrackerHash::release(uint64_t msg_id) +{ + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + if (itr == _pending.end()) { + return nullptr; + } + auto tracker = itr->second; + _pending.erase(itr); + return tracker; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h new file mode 100644 index 00000000000..89be93fd4ed --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/hash_map.h> +#include <mutex> + +namespace feedbm { + +class PendingTracker; + +/* + * Class maintaing mapping from message id to pending tracker + */ +class PendingTrackerHash +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + PendingTrackerHash(); + ~PendingTrackerHash(); + PendingTracker *release(uint64_t msg_id); + void retain(uint64_t msg_id, PendingTracker &tracker); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp index 2d45caff152..0e73582fdd4 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -33,7 +33,7 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor) : IBmFeedHandler(), - _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _context(std::move(context)) { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h index 089a7fd89e5..f877a244726 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -11,8 +11,8 @@ namespace feedbm { struct BmStorageLinkContext; /* - * Benchmark feed handler for feed to service layer using storage api protocol - * directly on the storage chain. + * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol directly on the storage chain. */ class StorageApiChainBmFeedHandler : public IBmFeedHandler { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp new file mode 100644 index 00000000000..a0a3eb5c6db --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp @@ -0,0 +1,84 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_message_bus_bm_feed_handler.h" +#include "bm_message_bus.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/mbusprot/storagecommand.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::api::StorageMessageAddress; +using storage::lib::NodeType; + +namespace feedbm { + +StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor) + : IBmFeedHandler(), + _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), + _distributor(distributor), + _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), + _message_bus(message_bus) +{ +} + +StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default; + +void +StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); + _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); +} + +void +StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_msg(std::move(cmd), tracker); +} + +uint32_t +StorageApiMessageBusBmFeedHandler::get_error_count() const +{ + return _message_bus.get_error_count(); +} + +const vespalib::string& +StorageApiMessageBusBmFeedHandler::get_name() const +{ + return _name; +} + +bool +StorageApiMessageBusBmFeedHandler::manages_buckets() const +{ + return _distributor; +} + +bool +StorageApiMessageBusBmFeedHandler::manages_timestamp() const +{ + return _distributor; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h new file mode 100644 index 00000000000..84e69053289 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h @@ -0,0 +1,41 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class DocumentMessage; }; +namespace storage::api { +class StorageCommand; +class StorageMessageAddress; +} + +namespace feedbm { + +class BmMessageBus; + +/* + * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol over message bus. + */ +class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler +{ + vespalib::string _name; + bool _distributor; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + BmMessageBus& _message_bus; + void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); + ~StorageApiMessageBusBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp index 53e833f3e24..9fe6662b79a 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "storage_api_rpc_bm_feed_handler.h" #include "pending_tracker.h" +#include "pending_tracker_hash.h" #include "storage_reply_error_checker.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> @@ -12,8 +13,6 @@ #include <vespa/storage/storageserver/rpc/message_codec_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> -#include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <cassert> using document::Document; @@ -30,14 +29,12 @@ namespace feedbm { class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, public StorageReplyErrorChecker { - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; + PendingTrackerHash _pending_hash; public: MyMessageDispatcher() : storage::MessageDispatcher(), StorageReplyErrorChecker(), - _mutex(), - _pending() + _pending_hash() { } ~MyMessageDispatcher() override; @@ -49,28 +46,19 @@ public: check_error(*msg); release(msg->getMsgId()); } - void retain(uint64_t msg_id, PendingTracker &tracker) { - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); - } + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } void release(uint64_t msg_id) { - PendingTracker *tracker = nullptr; - { - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - assert(itr != _pending.end()); - tracker = itr->second; - _pending.erase(itr); + auto tracker = _pending_hash.release(msg_id); + if (tracker != nullptr) { + tracker->release(); + } else { + ++_errors; } - tracker->release(); } }; StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() { - std::lock_guard lock(_mutex); - assert(_pending.empty()); } StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, @@ -78,7 +66,7 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share const StorageApiRpcService::Params& rpc_params, bool distributor) : IBmFeedHandler(), - _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), _shared_rpc_resources(shared_rpc_resources_in), diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h index 9901c21f174..535171c39e1 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -19,8 +19,8 @@ class SharedRpcResources; namespace feedbm { /* - * Benchmark feed handler for feed to service layer using storage api protocol - * over rpc. + * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol over rpc. */ class StorageApiRpcBmFeedHandler : public IBmFeedHandler { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h index 78004f3d787..4743367b426 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h @@ -9,6 +9,7 @@ namespace storage::api { class StorageMessage; } namespace feedbm { class StorageReplyErrorChecker { +protected: std::atomic<uint32_t> _errors; public: StorageReplyErrorChecker(); diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 3aada3cc958..dc8d787a778 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,12 +1,15 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bm_cluster_controller.h" +#include "bm_message_bus.h" #include "bm_storage_chain_builder.h" #include "bm_storage_link_context.h" #include "pending_tracker.h" #include "spi_bm_feed_handler.h" #include "storage_api_chain_bm_feed_handler.h" +#include "storage_api_message_bus_bm_feed_handler.h" #include "storage_api_rpc_bm_feed_handler.h" +#include "document_api_message_bus_bm_feed_handler.h" #include <tests/proton/common/dummydbowner.h> #include <vespa/config-attributes.h> #include <vespa/config-bucketspaces.h> @@ -123,11 +126,14 @@ using document::FieldUpdate; using document::IntFieldValue; using document::test::makeBucketSpace; using feedbm::BmClusterController; +using feedbm::BmMessageBus; using feedbm::BmStorageChainBuilder; using feedbm::BmStorageLinkContext; using feedbm::IBmFeedHandler; +using feedbm::DocumentApiMessageBusBmFeedHandler; using feedbm::SpiBmFeedHandler; using feedbm::StorageApiChainBmFeedHandler; +using feedbm::StorageApiMessageBusBmFeedHandler; using feedbm::StorageApiRpcBmFeedHandler; using search::TuneFileDocumentDB; using search::index::DummyFileHeaderContext; @@ -193,6 +199,23 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume doc_type_name.getName()); } +void +make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) +{ + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); +} + +void +make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces) +{ + BucketspacesConfigBuilder::Documenttype bucket_space_map; + bucket_space_map.name = "test"; + bucket_space_map.bucketspace = "default"; + bucketspaces.documenttype.emplace_back(std::move(bucket_space_map)); +} + class MyPersistenceEngineOwner : public IPersistenceEngineOwner { void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { } @@ -245,6 +268,8 @@ class BMParams { uint32_t _response_threads; bool _enable_distributor; bool _enable_service_layer; + bool _use_document_api; + bool _use_message_bus; bool _use_storage_chain; bool _use_legacy_bucket_db; uint32_t get_start(uint32_t thread_id) const { @@ -261,6 +286,8 @@ public: _response_threads(2), // Same default as in stor-filestor.def _enable_distributor(false), _enable_service_layer(false), + _use_document_api(false), + _use_message_bus(false), _use_storage_chain(false), _use_legacy_bucket_db(false) { @@ -276,7 +303,8 @@ public: uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } uint32_t get_response_threads() const { return _response_threads; } bool get_enable_distributor() const { return _enable_distributor; } - bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; } + bool get_use_document_api() const { return _use_document_api; } + bool get_use_message_bus() const { return _use_message_bus; } bool get_use_storage_chain() const { return _use_storage_chain; } bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; } void set_documents(uint32_t documents_in) { _documents = documents_in; } @@ -288,9 +316,14 @@ public: void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } + void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; } + void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; } bool check() const; + bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } + bool needs_distributor() const { return _enable_distributor || _use_document_api; } + bool needs_message_bus() const { return _use_message_bus || _use_document_api; } }; bool @@ -426,17 +459,14 @@ struct MyStorageConfig } else { stor_server.rootFolder = "storage"; } - { - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); - } + make_slobroks_config(slobroks, slobrok_port); stor_communicationmanager.useDirectStorageapiRpc = true; stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads(); stor_communicationmanager.mbusport = mbus_port; stor_communicationmanager.rpcport = rpc_port; stor_status.httpport = status_port; + make_bucketspaces_config(bucketspaces); } ~MyStorageConfig(); @@ -524,11 +554,7 @@ struct MyRpcClientConfig { : config_id(config_id_in), slobroks() { - { - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); - } + make_slobroks_config(slobroks, slobrok_port); } ~MyRpcClientConfig(); @@ -539,6 +565,28 @@ struct MyRpcClientConfig { MyRpcClientConfig::~MyRpcClientConfig() = default; +struct MyMessageBusConfig { + vespalib::string config_id; + SlobroksConfigBuilder slobroks; + MessagebusConfigBuilder messagebus; + + MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port) + : config_id(config_id_in), + slobroks(), + messagebus() + { + make_slobroks_config(slobroks, slobrok_port); + } + ~MyMessageBusConfig(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &slobroks); + set.addBuilder(config_id, &messagebus); + } +}; + +MyMessageBusConfig::~MyMessageBusConfig() = default; + } struct PersistenceProviderFixture { @@ -576,6 +624,7 @@ struct PersistenceProviderFixture { MyServiceLayerConfig _service_layer_config; MyDistributorConfig _distributor_config; MyRpcClientConfig _rpc_client_config; + MyMessageBusConfig _message_bus_config; ConfigSet _config_set; std::shared_ptr<IConfigContext> _config_context; std::unique_ptr<IBmFeedHandler> _feed_handler; @@ -585,6 +634,7 @@ struct PersistenceProviderFixture { std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; std::unique_ptr<storage::DistributorProcess> _distributor; + std::unique_ptr<BmMessageBus> _message_bus; PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); @@ -599,8 +649,10 @@ struct PersistenceProviderFixture { void wait_slobrok(const vespalib::string &name); void start_service_layer(const BMParams& params); void start_distributor(const BMParams& params); + void start_message_bus(); void create_feed_handler(const BMParams& params); void shutdown_feed_handler(); + void shutdown_message_bus(); void shutdown_distributor(); void shutdown_service_layer(); }; @@ -640,6 +692,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), _rpc_client_config("bm-rpc-client", _slobrok_port), + _message_bus_config("bm-message-bus", _slobrok_port), _config_set(), _config_context(std::make_shared<ConfigContext>(_config_set)), _feed_handler(), @@ -648,7 +701,8 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer(), _rpc_client_shared_rpc_resources(), _distributor_chain_context(), - _distributor() + _distributor(), + _message_bus() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); @@ -657,6 +711,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer_config.add_builders(_config_set); _distributor_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); + _message_bus_config.add_builders(_config_set); _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } @@ -772,7 +827,7 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params) LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.get_enable_distributor()) { + if (params.get_use_storage_chain() && !params.needs_distributor()) { chain_builder = std::make_unique<BmStorageChainBuilder>(); _service_layer_chain_context = chain_builder->get_context(); } @@ -797,7 +852,7 @@ PersistenceProviderFixture::start_distributor(const BMParams& params) { config::ConfigUri config_uri("bm-distributor", _config_context); std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain()) { + if (params.get_use_storage_chain() && !params.get_use_document_api()) { chain_builder = std::make_unique<BmStorageChainBuilder>(); _distributor_chain_context = chain_builder->get_context(); } @@ -816,24 +871,39 @@ PersistenceProviderFixture::start_distributor(const BMParams& params) } void +PersistenceProviderFixture::start_message_bus() +{ + config::ConfigUri config_uri("bm-message-bus", _config_context); + LOG(info, "Starting message bus"); + _message_bus = std::make_unique<BmMessageBus>(config_uri, + _repo, + documentapi::LoadTypeSet()); + LOG(info, "Started message bus"); +} + +void PersistenceProviderFixture::create_feed_handler(const BMParams& params) { StorageApiRpcService::Params rpc_params; // This is the same compression config as the default in stor-communicationmanager.def. rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); - if (params.get_enable_distributor()) { + if (params.get_use_document_api()) { + _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus); + } else if (params.get_enable_distributor()) { if (params.get_use_storage_chain()) { assert(_distributor_chain_context); _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true); } else { _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true); } - return; - } - if (params.get_enable_service_layer()) { + } else if (params.needs_service_layer()) { if (params.get_use_storage_chain()) { assert(_service_layer_chain_context); _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false); } else { _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false); } @@ -847,6 +917,15 @@ PersistenceProviderFixture::shutdown_feed_handler() } void +PersistenceProviderFixture::shutdown_message_bus() +{ + if (_message_bus) { + LOG(info, "stop message bus"); + _message_bus.reset(); + } +} + +void PersistenceProviderFixture::shutdown_distributor() { if (_distributor) { @@ -1130,12 +1209,15 @@ void benchmark_async_spi(const BMParams &bm_params) if (!f._feed_handler->manages_buckets()) { f.create_buckets(); } - if (bm_params.get_enable_service_layer()) { + if (bm_params.needs_service_layer()) { f.start_service_layer(bm_params); } - if (bm_params.get_enable_distributor()) { + if (bm_params.needs_distributor()) { f.start_distributor(bm_params); } + if (bm_params.needs_message_bus()) { + f.start_message_bus(); + } f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128 * 1024); auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); @@ -1149,6 +1231,7 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "--------------------------------"); f.shutdown_feed_handler(); + f.shutdown_message_bus(); f.shutdown_distributor(); f.shutdown_service_layer(); } @@ -1189,6 +1272,8 @@ App::usage() "[--response-threads threads]\n" "[--enable-distributor]\n" "[--enable-service-layer]\n" + "[--use-document-api]\n" + "[--use-message-bus\n" "[--use-storage-chain]\n" "[--use-legacy-bucket-db]" << std::endl; } @@ -1209,6 +1294,8 @@ App::get_options() { "response-threads", 1, nullptr, 0 }, { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, + { "use-document-api", 0, nullptr, 0 }, + { "use-message-bus", 0, nullptr, 0 }, { "use-storage-chain", 0, nullptr, 0 }, { "use-legacy-bucket-db", 0, nullptr, 0 } }; @@ -1222,6 +1309,8 @@ App::get_options() LONGOPT_RESPONSE_THREADS, LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_USE_DOCUMENT_API, + LONGOPT_USE_MESSAGE_BUS, LONGOPT_USE_STORAGE_CHAIN, LONGOPT_USE_LEGACY_BUCKET_DB }; @@ -1258,6 +1347,12 @@ App::get_options() case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; + case LONGOPT_USE_DOCUMENT_API: + _bm_params.set_use_document_api(true); + break; + case LONGOPT_USE_MESSAGE_BUS: + _bm_params.set_use_message_bus(true); + break; case LONGOPT_USE_STORAGE_CHAIN: _bm_params.set_use_storage_chain(true); break; |