summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-02 14:52:10 +0200
committerGitHub <noreply@github.com>2020-10-02 14:52:10 +0200
commit9b9235fc145b531a6fc776ed1498a1edfe597f4c (patch)
treef4f633caafa42fe4b32da83078d72787ec470b54
parent188521786cb0d181acf31122ba3803572f86c322 (diff)
parent08a8d0863a2b7cbb4100bc3e00f078a7961ad3e1 (diff)
Merge pull request #14677 from vespa-engine/toregge/enable-mbus-feeding-for-vespa-feed-bm
Enable feeding over message bus for vespa-feed-bm
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt6
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp180
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_message_bus.h42
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp33
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.h8
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp82
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h37
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp43
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp2
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h4
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp84
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h41
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp32
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h4
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp135
17 files changed, 677 insertions, 83 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index b5465b1f0dd..cf18be8f5de 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -3,11 +3,15 @@ vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
bm_cluster_controller.cpp
+ bm_message_bus.cpp
bm_storage_chain_builder.cpp
bm_storage_link.cpp
+ document_api_message_bus_bm_feed_handler.cpp
+ pending_tracker_hash.cpp
spi_bm_feed_handler.cpp
- storage_api_rpc_bm_feed_handler.cpp
storage_api_chain_bm_feed_handler.cpp
+ storage_api_message_bus_bm_feed_handler.cpp
+ storage_api_rpc_bm_feed_handler.cpp
storage_reply_error_checker.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
new file mode 100644
index 00000000000..ec50a4c7c01
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
@@ -0,0 +1,180 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_message_bus.h"
+#include "pending_tracker_hash.h"
+#include "pending_tracker.h"
+#include "storage_reply_error_checker.h"
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/documentapi/messagebus/documentprotocol.h>
+#include <vespa/documentapi/messagebus/messages/documentmessage.h>
+#include <vespa/storageapi/mbusprot/storageprotocol.h>
+#include <vespa/storageapi/mbusprot/storagereply.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bm_message_bus");
+
+using documentapi::DocumentProtocol;
+using mbus::RPCMessageBus;
+using mbus::Reply;
+using mbus::SourceSession;
+using storage::mbusprot::StorageProtocol;
+using storage::mbusprot::StorageReply;
+
+namespace feedbm {
+
+namespace {
+
+std::atomic<uint64_t> bm_message_bus_msg_id(0u);
+
+vespalib::string reply_as_string(Reply &reply) {
+ vespalib::asciistream os;
+ if (reply.getType() == 0) {
+ os << "empty reply";
+ } else {
+ os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol();
+ }
+ os << ", ";
+ auto message = reply.getMessage();
+ if (message) {
+ os << "message=" << message->toString();
+ os << ", protocol=" << message->getProtocol();
+ } else {
+ os << "no message";
+ }
+ reply.setMessage(std::move(message));
+ os << ", ";
+ if (reply.hasErrors()) {
+ os << "errors=[";
+ for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
+ auto &error = reply.getError(i);
+ if (i > 0) {
+ os << ", ";
+ }
+ os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")";
+ }
+ os << "]";
+ } else {
+ os << "no errors";
+ }
+ return os.str();
+}
+
+}
+
+class BmMessageBus::ReplyHandler : public mbus::IReplyHandler,
+ public StorageReplyErrorChecker
+{
+ PendingTrackerHash _pending_hash;
+public:
+ ReplyHandler();
+ ~ReplyHandler() override;
+ void handleReply(std::unique_ptr<Reply> reply) override;
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
+ void message_aborted(uint64_t msg_id);
+};
+
+BmMessageBus::ReplyHandler::ReplyHandler()
+ : mbus::IReplyHandler(),
+ StorageReplyErrorChecker(),
+ _pending_hash()
+{
+}
+
+BmMessageBus::ReplyHandler::~ReplyHandler() = default;
+
+void
+BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply)
+{
+ auto msg_id = reply->getContext().value.UINT64;
+ auto tracker = _pending_hash.release(msg_id);
+ if (tracker != nullptr) {
+ bool failed = false;
+ if (reply->getType() == 0 || reply->hasErrors()) {
+ failed = true; // empty reply or error
+ } else {
+ auto protocol = reply->getProtocol();
+ if (protocol == DocumentProtocol::NAME) {
+ } else if (protocol == StorageProtocol::NAME) {
+ auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get());
+ if (sreply != nullptr) {
+ check_error(*sreply->getReply());
+ } else {
+ failed = true; // unexpected message type
+ }
+ } else {
+ failed = true; // unexpected protocol
+ }
+ }
+ if (failed) {
+ ++_errors;
+ LOG(error, "Unexpected %s", reply_as_string(*reply).c_str());
+ }
+ tracker->release();
+ } else {
+ ++_errors;
+ LOG(error, "Untracked %s", reply_as_string(*reply).c_str());
+ }
+}
+
+void
+BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id)
+{
+ ++_errors;
+ auto tracker = _pending_hash.release(msg_id);
+ tracker->release();
+}
+
+BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri,
+ std::shared_ptr<const document::DocumentTypeRepo> document_type_repo,
+ const documentapi::LoadTypeSet& load_types)
+ : _reply_handler(std::make_unique<ReplyHandler>()),
+ _message_bus(),
+ _session()
+{
+ mbus::RPCNetworkParams params(config_uri);
+ mbus::ProtocolSet protocol_set;
+ protocol_set.add(std::make_shared<DocumentProtocol>(load_types, document_type_repo));
+ protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo, load_types));
+ params.setIdentity(mbus::Identity("vespa-bm-client"));
+ _message_bus = std::make_unique<mbus::RPCMessageBus>(
+ protocol_set,
+ params,
+ config_uri);
+ mbus::SourceSessionParams srcParams;
+ srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
+ srcParams.setReplyHandler(*_reply_handler);
+ _session = _message_bus->getMessageBus().createSourceSession(srcParams);
+}
+
+BmMessageBus::~BmMessageBus()
+{
+ _session.reset();
+ _message_bus.reset();
+ _reply_handler.reset();
+}
+
+uint32_t
+BmMessageBus::get_error_count() const
+{
+ return _reply_handler->get_error_count();
+}
+
+void
+BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker)
+{
+ auto msg_id = ++bm_message_bus_msg_id;
+ _reply_handler->retain(msg_id, tracker);
+ msg->setContext(mbus::Context(msg_id));
+ msg->setRetryEnabled(false);
+ auto result = _session->send(std::move(msg), route);
+ if (!result.isAccepted()) {
+ LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str());
+ _reply_handler->message_aborted(msg_id);
+ }
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
new file mode 100644
index 00000000000..9ebe394e9e6
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
@@ -0,0 +1,42 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace config { class ConfigUri; }
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class LoadTypeSet; }
+
+namespace mbus {
+
+class Message;
+class RPCMessageBus;
+class Route;
+class SourceSession;
+
+}
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Message bus for feed benchmark program.
+ */
+class BmMessageBus
+{
+ class ReplyHandler;
+ std::unique_ptr<ReplyHandler> _reply_handler;
+ std::unique_ptr<mbus::RPCMessageBus> _message_bus;
+ std::unique_ptr<mbus::SourceSession> _session;
+public:
+ BmMessageBus(const config::ConfigUri& config_uri,
+ std::shared_ptr<const document::DocumentTypeRepo> document_type_repo,
+ const documentapi::LoadTypeSet& load_types);
+ ~BmMessageBus();
+ uint32_t get_error_count() const;
+ void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
index 79517e98094..2aeda91c30c 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
@@ -2,45 +2,18 @@
#include "bm_storage_link.h"
#include "pending_tracker.h"
-#include <vespa/vespalib/stllike/hash_map.hpp>
namespace feedbm {
-void
-BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker)
-{
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
-}
-
-PendingTracker *
-BmStorageLink::release(uint64_t msg_id)
-{
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- if (itr == _pending.end()) {
- return nullptr;
- }
- auto tracker = itr->second;
- _pending.erase(itr);
- return tracker;
-}
-
BmStorageLink::BmStorageLink()
: storage::StorageLink("vespa-bm-feed"),
StorageReplyErrorChecker(),
- _mutex(),
- _pending()
+ _pending_hash()
{
}
-BmStorageLink::~BmStorageLink()
-{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
-}
+BmStorageLink::~BmStorageLink() = default;
bool
BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
@@ -52,7 +25,7 @@ BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
bool
BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
{
- auto tracker = release(msg->getMsgId());
+ auto tracker = _pending_hash.release(msg->getMsgId());
if (tracker != nullptr) {
check_error(*msg);
tracker->release();
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
index 63ece355c02..95528d7b2d9 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
@@ -3,8 +3,8 @@
#pragma once
#include "storage_reply_error_checker.h"
+#include "pending_tracker_hash.h"
#include <vespa/storage/common/storagelink.h>
-#include <vespa/vespalib/stllike/hash_map.h>
namespace feedbm {
@@ -17,15 +17,13 @@ class PendingTracker;
class BmStorageLink : public storage::StorageLink,
public StorageReplyErrorChecker
{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
- PendingTracker *release(uint64_t msg_id);
+ PendingTrackerHash _pending_hash;
public:
BmStorageLink();
~BmStorageLink() override;
bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- void retain(uint64_t msg_id, PendingTracker &tracker);
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
new file mode 100644
index 00000000000..276a5a8136b
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
@@ -0,0 +1,82 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "document_api_message_bus_bm_feed_handler.h"
+#include "bm_message_bus.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+namespace feedbm {
+
+DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus)
+ : IBmFeedHandler(),
+ _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)),
+ _message_bus(message_bus)
+{
+}
+
+DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default;
+
+void
+DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
+{
+ _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document));
+ send_msg(std::move(msg), tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update));
+ send_msg(std::move(msg), tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id);
+ send_msg(std::move(msg), tracker);
+}
+
+uint32_t
+DocumentApiMessageBusBmFeedHandler::get_error_count() const
+{
+ return _message_bus.get_error_count();
+}
+
+const vespalib::string&
+DocumentApiMessageBusBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+DocumentApiMessageBusBmFeedHandler::manages_buckets() const
+{
+ return true;
+}
+
+bool
+DocumentApiMessageBusBmFeedHandler::manages_timestamp() const
+{
+ return true;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
new file mode 100644
index 00000000000..1e958da7900
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class DocumentMessage; };
+namespace storage::api { class StorageMessageAddress; }
+
+namespace feedbm {
+
+class BmMessageBus;
+
+/*
+ * Benchmark feed handler for feed to distributor using document api protocol
+ * over message bus.
+ */
+class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler
+{
+ vespalib::string _name;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ BmMessageBus& _message_bus;
+ void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
+public:
+ DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus);
+ ~DocumentApiMessageBusBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
new file mode 100644
index 00000000000..6863d35703e
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
@@ -0,0 +1,43 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "pending_tracker_hash.h"
+#include "pending_tracker.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <cassert>
+
+namespace feedbm {
+
+PendingTrackerHash::PendingTrackerHash()
+ : _mutex(),
+ _pending()
+{
+}
+
+PendingTrackerHash::~PendingTrackerHash()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+void
+PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker)
+{
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+}
+
+PendingTracker *
+PendingTrackerHash::release(uint64_t msg_id)
+{
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ if (itr == _pending.end()) {
+ return nullptr;
+ }
+ auto tracker = itr->second;
+ _pending.erase(itr);
+ return tracker;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
new file mode 100644
index 00000000000..89be93fd4ed
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <mutex>
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Class maintaing mapping from message id to pending tracker
+ */
+class PendingTrackerHash
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ PendingTrackerHash();
+ ~PendingTrackerHash();
+ PendingTracker *release(uint64_t msg_id);
+ void retain(uint64_t msg_id, PendingTracker &tracker);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
index 2d45caff152..0e73582fdd4 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor)
: IBmFeedHandler(),
- _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
_context(std::move(context))
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
index 089a7fd89e5..f877a244726 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
@@ -11,8 +11,8 @@ namespace feedbm {
struct BmStorageLinkContext;
/*
- * Benchmark feed handler for feed to service layer using storage api protocol
- * directly on the storage chain.
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol directly on the storage chain.
*/
class StorageApiChainBmFeedHandler : public IBmFeedHandler
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
new file mode 100644
index 00000000000..a0a3eb5c6db
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
@@ -0,0 +1,84 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_message_bus_bm_feed_handler.h"
+#include "bm_message_bus.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/mbusprot/storagecommand.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+namespace feedbm {
+
+StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor)
+ : IBmFeedHandler(),
+ _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
+ _distributor(distributor),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
+ _message_bus(message_bus)
+{
+}
+
+StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default;
+
+void
+StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
+ _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+uint32_t
+StorageApiMessageBusBmFeedHandler::get_error_count() const
+{
+ return _message_bus.get_error_count();
+}
+
+const vespalib::string&
+StorageApiMessageBusBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+StorageApiMessageBusBmFeedHandler::manages_buckets() const
+{
+ return _distributor;
+}
+
+bool
+StorageApiMessageBusBmFeedHandler::manages_timestamp() const
+{
+ return _distributor;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
new file mode 100644
index 00000000000..84e69053289
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
@@ -0,0 +1,41 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class DocumentMessage; };
+namespace storage::api {
+class StorageCommand;
+class StorageMessageAddress;
+}
+
+namespace feedbm {
+
+class BmMessageBus;
+
+/*
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol over message bus.
+ */
+class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
+{
+ vespalib::string _name;
+ bool _distributor;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ BmMessageBus& _message_bus;
+ void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor);
+ ~StorageApiMessageBusBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
index 53e833f3e24..9fe6662b79a 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
@@ -2,6 +2,7 @@
#include "storage_api_rpc_bm_feed_handler.h"
#include "pending_tracker.h"
+#include "pending_tracker_hash.h"
#include "storage_reply_error_checker.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
@@ -12,8 +13,6 @@
#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>
using document::Document;
@@ -30,14 +29,12 @@ namespace feedbm {
class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher,
public StorageReplyErrorChecker
{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+ PendingTrackerHash _pending_hash;
public:
MyMessageDispatcher()
: storage::MessageDispatcher(),
StorageReplyErrorChecker(),
- _mutex(),
- _pending()
+ _pending_hash()
{
}
~MyMessageDispatcher() override;
@@ -49,28 +46,19 @@ public:
check_error(*msg);
release(msg->getMsgId());
}
- void retain(uint64_t msg_id, PendingTracker &tracker) {
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
- }
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
void release(uint64_t msg_id) {
- PendingTracker *tracker = nullptr;
- {
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- assert(itr != _pending.end());
- tracker = itr->second;
- _pending.erase(itr);
+ auto tracker = _pending_hash.release(msg_id);
+ if (tracker != nullptr) {
+ tracker->release();
+ } else {
+ ++_errors;
}
- tracker->release();
}
};
StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
}
StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in,
@@ -78,7 +66,7 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share
const StorageApiRpcService::Params& rpc_params,
bool distributor)
: IBmFeedHandler(),
- _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
_storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
_shared_rpc_resources(shared_rpc_resources_in),
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
index 9901c21f174..535171c39e1 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
@@ -19,8 +19,8 @@ class SharedRpcResources;
namespace feedbm {
/*
- * Benchmark feed handler for feed to service layer using storage api protocol
- * over rpc.
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol over rpc.
*/
class StorageApiRpcBmFeedHandler : public IBmFeedHandler
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
index 78004f3d787..4743367b426 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
@@ -9,6 +9,7 @@ namespace storage::api { class StorageMessage; }
namespace feedbm {
class StorageReplyErrorChecker {
+protected:
std::atomic<uint32_t> _errors;
public:
StorageReplyErrorChecker();
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 3aada3cc958..dc8d787a778 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,12 +1,15 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bm_cluster_controller.h"
+#include "bm_message_bus.h"
#include "bm_storage_chain_builder.h"
#include "bm_storage_link_context.h"
#include "pending_tracker.h"
#include "spi_bm_feed_handler.h"
#include "storage_api_chain_bm_feed_handler.h"
+#include "storage_api_message_bus_bm_feed_handler.h"
#include "storage_api_rpc_bm_feed_handler.h"
+#include "document_api_message_bus_bm_feed_handler.h"
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-attributes.h>
#include <vespa/config-bucketspaces.h>
@@ -123,11 +126,14 @@ using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
using feedbm::BmClusterController;
+using feedbm::BmMessageBus;
using feedbm::BmStorageChainBuilder;
using feedbm::BmStorageLinkContext;
using feedbm::IBmFeedHandler;
+using feedbm::DocumentApiMessageBusBmFeedHandler;
using feedbm::SpiBmFeedHandler;
using feedbm::StorageApiChainBmFeedHandler;
+using feedbm::StorageApiMessageBusBmFeedHandler;
using feedbm::StorageApiRpcBmFeedHandler;
using search::TuneFileDocumentDB;
using search::index::DummyFileHeaderContext;
@@ -193,6 +199,23 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume
doc_type_name.getName());
}
+void
+make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
+{
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+}
+
+void
+make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces)
+{
+ BucketspacesConfigBuilder::Documenttype bucket_space_map;
+ bucket_space_map.name = "test";
+ bucket_space_map.bucketspace = "default";
+ bucketspaces.documenttype.emplace_back(std::move(bucket_space_map));
+}
+
class MyPersistenceEngineOwner : public IPersistenceEngineOwner
{
void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { }
@@ -245,6 +268,8 @@ class BMParams {
uint32_t _response_threads;
bool _enable_distributor;
bool _enable_service_layer;
+ bool _use_document_api;
+ bool _use_message_bus;
bool _use_storage_chain;
bool _use_legacy_bucket_db;
uint32_t get_start(uint32_t thread_id) const {
@@ -261,6 +286,8 @@ public:
_response_threads(2), // Same default as in stor-filestor.def
_enable_distributor(false),
_enable_service_layer(false),
+ _use_document_api(false),
+ _use_message_bus(false),
_use_storage_chain(false),
_use_legacy_bucket_db(false)
{
@@ -276,7 +303,8 @@ public:
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
uint32_t get_response_threads() const { return _response_threads; }
bool get_enable_distributor() const { return _enable_distributor; }
- bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; }
+ bool get_use_document_api() const { return _use_document_api; }
+ bool get_use_message_bus() const { return _use_message_bus; }
bool get_use_storage_chain() const { return _use_storage_chain; }
bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
@@ -288,9 +316,14 @@ public:
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
+ void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; }
+ void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; }
void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; }
bool check() const;
+ bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
+ bool needs_distributor() const { return _enable_distributor || _use_document_api; }
+ bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
};
bool
@@ -426,17 +459,14 @@ struct MyStorageConfig
} else {
stor_server.rootFolder = "storage";
}
- {
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
- }
+ make_slobroks_config(slobroks, slobrok_port);
stor_communicationmanager.useDirectStorageapiRpc = true;
stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
stor_communicationmanager.mbusport = mbus_port;
stor_communicationmanager.rpcport = rpc_port;
stor_status.httpport = status_port;
+ make_bucketspaces_config(bucketspaces);
}
~MyStorageConfig();
@@ -524,11 +554,7 @@ struct MyRpcClientConfig {
: config_id(config_id_in),
slobroks()
{
- {
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
- }
+ make_slobroks_config(slobroks, slobrok_port);
}
~MyRpcClientConfig();
@@ -539,6 +565,28 @@ struct MyRpcClientConfig {
MyRpcClientConfig::~MyRpcClientConfig() = default;
+struct MyMessageBusConfig {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks(),
+ messagebus()
+ {
+ make_slobroks_config(slobroks, slobrok_port);
+ }
+ ~MyMessageBusConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+MyMessageBusConfig::~MyMessageBusConfig() = default;
+
}
struct PersistenceProviderFixture {
@@ -576,6 +624,7 @@ struct PersistenceProviderFixture {
MyServiceLayerConfig _service_layer_config;
MyDistributorConfig _distributor_config;
MyRpcClientConfig _rpc_client_config;
+ MyMessageBusConfig _message_bus_config;
ConfigSet _config_set;
std::shared_ptr<IConfigContext> _config_context;
std::unique_ptr<IBmFeedHandler> _feed_handler;
@@ -585,6 +634,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
std::unique_ptr<storage::DistributorProcess> _distributor;
+ std::unique_ptr<BmMessageBus> _message_bus;
PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
@@ -599,8 +649,10 @@ struct PersistenceProviderFixture {
void wait_slobrok(const vespalib::string &name);
void start_service_layer(const BMParams& params);
void start_distributor(const BMParams& params);
+ void start_message_bus();
void create_feed_handler(const BMParams& params);
void shutdown_feed_handler();
+ void shutdown_message_bus();
void shutdown_distributor();
void shutdown_service_layer();
};
@@ -640,6 +692,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
_distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
_rpc_client_config("bm-rpc-client", _slobrok_port),
+ _message_bus_config("bm-message-bus", _slobrok_port),
_config_set(),
_config_context(std::make_shared<ConfigContext>(_config_set)),
_feed_handler(),
@@ -648,7 +701,8 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer(),
_rpc_client_shared_rpc_resources(),
_distributor_chain_context(),
- _distributor()
+ _distributor(),
+ _message_bus()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
@@ -657,6 +711,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer_config.add_builders(_config_set);
_distributor_config.add_builders(_config_set);
_rpc_client_config.add_builders(_config_set);
+ _message_bus_config.add_builders(_config_set);
_feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine);
}
@@ -772,7 +827,7 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params)
LOG(info, "start service layer");
config::ConfigUri config_uri("bm-servicelayer", _config_context);
std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.get_enable_distributor()) {
+ if (params.get_use_storage_chain() && !params.needs_distributor()) {
chain_builder = std::make_unique<BmStorageChainBuilder>();
_service_layer_chain_context = chain_builder->get_context();
}
@@ -797,7 +852,7 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
{
config::ConfigUri config_uri("bm-distributor", _config_context);
std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain()) {
+ if (params.get_use_storage_chain() && !params.get_use_document_api()) {
chain_builder = std::make_unique<BmStorageChainBuilder>();
_distributor_chain_context = chain_builder->get_context();
}
@@ -816,24 +871,39 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
}
void
+PersistenceProviderFixture::start_message_bus()
+{
+ config::ConfigUri config_uri("bm-message-bus", _config_context);
+ LOG(info, "Starting message bus");
+ _message_bus = std::make_unique<BmMessageBus>(config_uri,
+ _repo,
+ documentapi::LoadTypeSet());
+ LOG(info, "Started message bus");
+}
+
+void
PersistenceProviderFixture::create_feed_handler(const BMParams& params)
{
StorageApiRpcService::Params rpc_params;
// This is the same compression config as the default in stor-communicationmanager.def.
rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
- if (params.get_enable_distributor()) {
+ if (params.get_use_document_api()) {
+ _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus);
+ } else if (params.get_enable_distributor()) {
if (params.get_use_storage_chain()) {
assert(_distributor_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true);
} else {
_feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true);
}
- return;
- }
- if (params.get_enable_service_layer()) {
+ } else if (params.needs_service_layer()) {
if (params.get_use_storage_chain()) {
assert(_service_layer_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false);
} else {
_feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false);
}
@@ -847,6 +917,15 @@ PersistenceProviderFixture::shutdown_feed_handler()
}
void
+PersistenceProviderFixture::shutdown_message_bus()
+{
+ if (_message_bus) {
+ LOG(info, "stop message bus");
+ _message_bus.reset();
+ }
+}
+
+void
PersistenceProviderFixture::shutdown_distributor()
{
if (_distributor) {
@@ -1130,12 +1209,15 @@ void benchmark_async_spi(const BMParams &bm_params)
if (!f._feed_handler->manages_buckets()) {
f.create_buckets();
}
- if (bm_params.get_enable_service_layer()) {
+ if (bm_params.needs_service_layer()) {
f.start_service_layer(bm_params);
}
- if (bm_params.get_enable_distributor()) {
+ if (bm_params.needs_distributor()) {
f.start_distributor(bm_params);
}
+ if (bm_params.needs_message_bus()) {
+ f.start_message_bus();
+ }
f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put");
@@ -1149,6 +1231,7 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "--------------------------------");
f.shutdown_feed_handler();
+ f.shutdown_message_bus();
f.shutdown_distributor();
f.shutdown_service_layer();
}
@@ -1189,6 +1272,8 @@ App::usage()
"[--response-threads threads]\n"
"[--enable-distributor]\n"
"[--enable-service-layer]\n"
+ "[--use-document-api]\n"
+ "[--use-message-bus\n"
"[--use-storage-chain]\n"
"[--use-legacy-bucket-db]" << std::endl;
}
@@ -1209,6 +1294,8 @@ App::get_options()
{ "response-threads", 1, nullptr, 0 },
{ "enable-distributor", 0, nullptr, 0 },
{ "enable-service-layer", 0, nullptr, 0 },
+ { "use-document-api", 0, nullptr, 0 },
+ { "use-message-bus", 0, nullptr, 0 },
{ "use-storage-chain", 0, nullptr, 0 },
{ "use-legacy-bucket-db", 0, nullptr, 0 }
};
@@ -1222,6 +1309,8 @@ App::get_options()
LONGOPT_RESPONSE_THREADS,
LONGOPT_ENABLE_DISTRIBUTOR,
LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_USE_DOCUMENT_API,
+ LONGOPT_USE_MESSAGE_BUS,
LONGOPT_USE_STORAGE_CHAIN,
LONGOPT_USE_LEGACY_BUCKET_DB
};
@@ -1258,6 +1347,12 @@ App::get_options()
case LONGOPT_ENABLE_SERVICE_LAYER:
_bm_params.set_enable_service_layer(true);
break;
+ case LONGOPT_USE_DOCUMENT_API:
+ _bm_params.set_use_document_api(true);
+ break;
+ case LONGOPT_USE_MESSAGE_BUS:
+ _bm_params.set_use_message_bus(true);
+ break;
case LONGOPT_USE_STORAGE_CHAIN:
_bm_params.set_use_storage_chain(true);
break;