aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/apps
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-10 11:30:08 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-10 11:30:08 +0200
commitc8beeb61f595019c22ee6eca137ab47625c5712a (patch)
treeba034cbf4fcee3b077dfad266e24460c9c451938 /searchcore/src/apps
parentbb4ee4e9c053ca4f341eaa5490a850e13ea37f5c (diff)
Start moving portions of vespa-feed-bm app to searchcore_bmcluster library.
Diffstat (limited to 'searchcore/src/apps')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp57
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h21
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp179
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_message_bus.h40
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp37
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.h29
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h20
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp38
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h39
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp94
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h40
-rw-r--r--searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h37
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp55
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker.h37
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp43
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp158
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h37
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp108
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h38
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp94
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h44
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp143
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h53
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp33
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h21
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp1073
30 files changed, 100 insertions, 2588 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index fe83c89d83a..daefef5d413 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -2,39 +2,7 @@
vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
- bm_cluster_controller.cpp
- bm_message_bus.cpp
- bm_storage_chain_builder.cpp
- bm_storage_link.cpp
- bucket_info_queue.cpp
- document_api_message_bus_bm_feed_handler.cpp
- pending_tracker.cpp
- pending_tracker_hash.cpp
- spi_bm_feed_handler.cpp
- storage_api_chain_bm_feed_handler.cpp
- storage_api_message_bus_bm_feed_handler.cpp
- storage_api_rpc_bm_feed_handler.cpp
- storage_reply_error_checker.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
- searchcore_server
- searchcore_initializer
- searchcore_reprocessing
- searchcore_index
- searchcore_persistenceengine
- searchcore_docsummary
- searchcore_feedoperation
- searchcore_matching
- searchcore_attribute
- searchcore_documentmetastore
- searchcore_bucketdb
- searchcore_flushengine
- searchcore_pcommon
- searchcore_grouping
- searchcore_proton_metrics
- searchcore_fconfig
- storageserver_storageapp
- messagebus_messagebus-test
- messagebus
- searchlib_searchlib_uca
+ searchcore_bmcluster
)
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
deleted file mode 100644
index a1b40c56e11..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "bm_cluster_controller.h"
-#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
-#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <vespa/fnet/frt/target.h>
-#include <vespa/slobrok/sbmirror.h>
-
-using storage::api::StorageMessageAddress;
-using storage::rpc::SharedRpcResources;
-using storage::lib::NodeType;
-
-namespace feedbm {
-
-namespace {
-
-FRT_RPCRequest *
-make_set_cluster_state_request()
-{
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
- storage::rpc::SlimeClusterStateBundleCodec codec;
- auto encoded_bundle = codec.encode(bundle);
- auto *req = new FRT_RPCRequest();
- auto* params = req->GetParams();
- params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
- params->AddInt32(encoded_bundle._uncompressed_length);
- params->AddData(std::move(*encoded_bundle._buffer));
- req->SetMethodName("setdistributionstates");
- return req;
-}
-
-}
-
-BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in)
- : _shared_rpc_resources(shared_rpc_resources_in)
-{
-}
-
-void
-BmClusterController::set_cluster_up(bool distributor)
-{
- static vespalib::string _storage("storage");
- StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0);
- auto req = make_set_cluster_state_request();
- auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(),
- _shared_rpc_resources.target_factory(), 1);
- uint64_t fake_bucket_id = 0;
- auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id);
- target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
- assert(!req->IsError());
- req->SubRef();
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h
deleted file mode 100644
index 699036be5c9..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-namespace storage::api { class StorageMessageAddress; }
-namespace storage::rpc { class SharedRpcResources; }
-
-namespace feedbm {
-
-/*
- * Fake cluster controller that sets cluster state to be up.
- */
-class BmClusterController
-{
- storage::rpc::SharedRpcResources& _shared_rpc_resources;
-public:
- BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in);
- void set_cluster_up(bool distributor);
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
deleted file mode 100644
index b608593dada..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "bm_message_bus.h"
-#include "pending_tracker_hash.h"
-#include "pending_tracker.h"
-#include "storage_reply_error_checker.h"
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/network/rpcnetworkparams.h>
-#include <vespa/messagebus/rpcmessagebus.h>
-#include <vespa/messagebus/ireplyhandler.h>
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/documentmessage.h>
-#include <vespa/storageapi/mbusprot/storageprotocol.h>
-#include <vespa/storageapi/mbusprot/storagereply.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".bm_message_bus");
-
-using documentapi::DocumentProtocol;
-using mbus::RPCMessageBus;
-using mbus::Reply;
-using mbus::SourceSession;
-using storage::mbusprot::StorageProtocol;
-using storage::mbusprot::StorageReply;
-
-namespace feedbm {
-
-namespace {
-
-std::atomic<uint64_t> bm_message_bus_msg_id(0u);
-
-vespalib::string reply_as_string(Reply &reply) {
- vespalib::asciistream os;
- if (reply.getType() == 0) {
- os << "empty reply";
- } else {
- os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol();
- }
- os << ", ";
- auto message = reply.getMessage();
- if (message) {
- os << "message=" << message->toString();
- os << ", protocol=" << message->getProtocol();
- } else {
- os << "no message";
- }
- reply.setMessage(std::move(message));
- os << ", ";
- if (reply.hasErrors()) {
- os << "errors=[";
- for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
- auto &error = reply.getError(i);
- if (i > 0) {
- os << ", ";
- }
- os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")";
- }
- os << "]";
- } else {
- os << "no errors";
- }
- return os.str();
-}
-
-}
-
-class BmMessageBus::ReplyHandler : public mbus::IReplyHandler,
- public StorageReplyErrorChecker
-{
- PendingTrackerHash _pending_hash;
-public:
- ReplyHandler();
- ~ReplyHandler() override;
- void handleReply(std::unique_ptr<Reply> reply) override;
- void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
- void message_aborted(uint64_t msg_id);
-};
-
-BmMessageBus::ReplyHandler::ReplyHandler()
- : mbus::IReplyHandler(),
- StorageReplyErrorChecker(),
- _pending_hash()
-{
-}
-
-BmMessageBus::ReplyHandler::~ReplyHandler() = default;
-
-void
-BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply)
-{
- auto msg_id = reply->getContext().value.UINT64;
- auto tracker = _pending_hash.release(msg_id);
- if (tracker != nullptr) {
- bool failed = false;
- if (reply->getType() == 0 || reply->hasErrors()) {
- failed = true; // empty reply or error
- } else {
- auto protocol = reply->getProtocol();
- if (protocol == DocumentProtocol::NAME) {
- } else if (protocol == StorageProtocol::NAME) {
- auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get());
- if (sreply != nullptr) {
- check_error(*sreply->getReply());
- } else {
- failed = true; // unexpected message type
- }
- } else {
- failed = true; // unexpected protocol
- }
- }
- if (failed) {
- ++_errors;
- LOG(error, "Unexpected %s", reply_as_string(*reply).c_str());
- }
- tracker->release();
- } else {
- ++_errors;
- LOG(error, "Untracked %s", reply_as_string(*reply).c_str());
- }
-}
-
-void
-BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id)
-{
- ++_errors;
- auto tracker = _pending_hash.release(msg_id);
- tracker->release();
-}
-
-BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri,
- std::shared_ptr<const document::DocumentTypeRepo> document_type_repo)
- : _reply_handler(std::make_unique<ReplyHandler>()),
- _message_bus(),
- _session()
-{
- mbus::RPCNetworkParams params(config_uri);
- mbus::ProtocolSet protocol_set;
- protocol_set.add(std::make_shared<DocumentProtocol>(document_type_repo));
- protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo));
- params.setIdentity(mbus::Identity("vespa-bm-client"));
- _message_bus = std::make_unique<mbus::RPCMessageBus>(
- protocol_set,
- params,
- config_uri);
- mbus::SourceSessionParams srcParams;
- srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
- srcParams.setReplyHandler(*_reply_handler);
- _session = _message_bus->getMessageBus().createSourceSession(srcParams);
-}
-
-BmMessageBus::~BmMessageBus()
-{
- _session.reset();
- _message_bus.reset();
- _reply_handler.reset();
-}
-
-uint32_t
-BmMessageBus::get_error_count() const
-{
- return _reply_handler->get_error_count();
-}
-
-void
-BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker)
-{
- auto msg_id = ++bm_message_bus_msg_id;
- _reply_handler->retain(msg_id, tracker);
- msg->setContext(mbus::Context(msg_id));
- msg->setRetryEnabled(false);
- auto result = _session->send(std::move(msg), route);
- if (!result.isAccepted()) {
- LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str());
- _reply_handler->message_aborted(msg_id);
- }
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
deleted file mode 100644
index a9cff1fb826..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <memory>
-
-namespace config { class ConfigUri; }
-namespace document { class DocumentTypeRepo; }
-
-namespace mbus {
-
-class Message;
-class RPCMessageBus;
-class Route;
-class SourceSession;
-
-}
-
-namespace feedbm {
-
-class PendingTracker;
-
-/*
- * Message bus for feed benchmark program.
- */
-class BmMessageBus
-{
- class ReplyHandler;
- std::unique_ptr<ReplyHandler> _reply_handler;
- std::unique_ptr<mbus::RPCMessageBus> _message_bus;
- std::unique_ptr<mbus::SourceSession> _session;
-public:
- BmMessageBus(const config::ConfigUri& config_uri,
- std::shared_ptr<const document::DocumentTypeRepo> document_type_repo);
- ~BmMessageBus();
- uint32_t get_error_count() const;
- void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker);
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp
deleted file mode 100644
index bbe0de70ce2..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "bm_storage_chain_builder.h"
-#include "bm_storage_link_context.h"
-#include "bm_storage_link.h"
-
-#include <vespa/log/log.h>
-LOG_SETUP(".bm_storage_chain_builder");
-
-namespace feedbm {
-
-BmStorageChainBuilder::BmStorageChainBuilder()
- : storage::StorageChainBuilder(),
- _context(std::make_shared<BmStorageLinkContext>())
-{
-}
-
-BmStorageChainBuilder::~BmStorageChainBuilder() = default;
-
-void
-BmStorageChainBuilder::add(std::unique_ptr<storage::StorageLink> link)
-{
- vespalib::string name = link->getName();
- storage::StorageChainBuilder::add(std::move(link));
- LOG(info, "Added storage link '%s'", name.c_str());
- if (name == "Communication manager") {
- auto my_link = std::make_unique<BmStorageLink>();
- LOG(info, "Adding extra storage link '%s'", my_link->getName().c_str());
- _context->bm_link = my_link.get();
- storage::StorageChainBuilder::add(std::move(my_link));
- }
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h
deleted file mode 100644
index bba933da9e0..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/storage/common/storage_chain_builder.h>
-
-namespace feedbm {
-
-struct BmStorageLinkContext;
-
-/*
- * Storage chain builder that inserts a BmStorageLink right below the
- * communication manager. This allows sending benchmark feed to chain.
- */
-class BmStorageChainBuilder : public storage::StorageChainBuilder
-{
- std::shared_ptr<BmStorageLinkContext> _context;
-public:
- BmStorageChainBuilder();
- ~BmStorageChainBuilder() override;
- const std::shared_ptr<BmStorageLinkContext>& get_context() { return _context; }
- void add(std::unique_ptr<storage::StorageLink> link) override;
-};
-
-}
-
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
deleted file mode 100644
index 2aeda91c30c..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "bm_storage_link.h"
-#include "pending_tracker.h"
-
-namespace feedbm {
-
-
-BmStorageLink::BmStorageLink()
- : storage::StorageLink("vespa-bm-feed"),
- StorageReplyErrorChecker(),
- _pending_hash()
-{
-}
-
-BmStorageLink::~BmStorageLink() = default;
-
-bool
-BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
-{
- (void) msg;
- return false;
-}
-
-bool
-BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
-{
- auto tracker = _pending_hash.release(msg->getMsgId());
- if (tracker != nullptr) {
- check_error(*msg);
- tracker->release();
- return true;
- }
- return false;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
deleted file mode 100644
index 95528d7b2d9..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "storage_reply_error_checker.h"
-#include "pending_tracker_hash.h"
-#include <vespa/storage/common/storagelink.h>
-
-namespace feedbm {
-
-class PendingTracker;
-
-/*
- * Storage link used to feed storage api messages to a distributor or
- * service layer node. A count of error replies is maintained.
- */
-class BmStorageLink : public storage::StorageLink,
- public StorageReplyErrorChecker
-{
- PendingTrackerHash _pending_hash;
-public:
- BmStorageLink();
- ~BmStorageLink() override;
- bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h
deleted file mode 100644
index f2df20f1f66..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-namespace feedbm {
-
-class BmStorageLink;
-
-/*
- * This context is initialized by BmStorageChainBuilder.
- */
-struct BmStorageLinkContext
-{
- BmStorageLink* bm_link;
- BmStorageLinkContext() noexcept
- : bm_link(nullptr)
- {
- }
- ~BmStorageLinkContext() = default;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp b/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp
deleted file mode 100644
index fc43402d68e..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "bucket_info_queue.h"
-#include <vespa/persistence/spi/persistenceprovider.h>
-
-namespace feedbm {
-
-BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors)
- : _mutex(),
- _buckets(),
- _provider(provider),
- _errors(errors)
-{
-}
-
-BucketInfoQueue::~BucketInfoQueue()
-{
- get_bucket_info_loop();
-}
-
-void
-BucketInfoQueue::get_bucket_info_loop()
-{
- std::unique_lock guard(_mutex);
- while (!_buckets.empty()) {
- auto bucket = _buckets.front();
- _buckets.pop_front();
- guard.unlock();
- auto bucket_info = _provider.getBucketInfo(bucket);
- if (bucket_info.hasError()) {
- ++_errors;
- }
- guard.lock();
- }
-}
-
-}
-
diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h b/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h
deleted file mode 100644
index 07a55127234..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/persistence/spi/bucket.h>
-#include <mutex>
-#include <deque>
-#include <atomic>
-
-namespace storage::spi { struct PersistenceProvider; }
-
-namespace feedbm {
-
-/*
- * Class containing a queue of buckets where mutating feed operations
- * have been performed, requiring service layer to ask for updated
- * bucket info.
- */
-class BucketInfoQueue
-{
- std::mutex _mutex;
- std::deque<storage::spi::Bucket> _buckets;
- storage::spi::PersistenceProvider& _provider;
- std::atomic<uint32_t>& _errors;
-
-public:
- BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors);
- ~BucketInfoQueue();
-
- void put_bucket(storage::spi::Bucket bucket)
- {
- std::lock_guard guard(_mutex);
- _buckets.emplace_back(std::move(bucket));
- }
-
- void get_bucket_info_loop();
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
deleted file mode 100644
index 38c8490de69..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "document_api_message_bus_bm_feed_handler.h"
-#include "bm_message_bus.h"
-#include "pending_tracker.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-using storage::api::StorageMessageAddress;
-using storage::lib::NodeType;
-
-namespace feedbm {
-
-namespace {
- vespalib::string _Storage("storage");
-}
-
-DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus)
- : IBmFeedHandler(),
- _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, NodeType::DISTRIBUTOR, 0)),
- _message_bus(message_bus),
- _route(_storage_address->to_mbus_route())
-{
-}
-
-DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default;
-
-void
-DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
-{
- _message_bus.send_msg(std::move(msg), _route, pending_tracker);
-}
-
-void
-DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker)
-{
- auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document));
- send_msg(std::move(msg), tracker);
-}
-
-void
-DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker)
-{
- auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update));
- send_msg(std::move(msg), tracker);
-}
-
-void
-DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker)
-{
- auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id);
- send_msg(std::move(msg), tracker);
-}
-
-void
-DocumentApiMessageBusBmFeedHandler::get(const document::Bucket&, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto msg = std::make_unique<documentapi::GetDocumentMessage>(document_id, field_set_string);
- send_msg(std::move(msg), tracker);
-}
-
-void
-DocumentApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
-{
-}
-
-uint32_t
-DocumentApiMessageBusBmFeedHandler::get_error_count() const
-{
- return _message_bus.get_error_count();
-}
-
-const vespalib::string&
-DocumentApiMessageBusBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-DocumentApiMessageBusBmFeedHandler::manages_timestamp() const
-{
- return true;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
deleted file mode 100644
index c71bb113c5b..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "i_bm_feed_handler.h"
-#include <vespa/messagebus/routing/route.h>
-
-namespace document { class DocumentTypeRepo; }
-namespace documentapi { class DocumentMessage; };
-namespace storage::api { class StorageMessageAddress; }
-
-namespace feedbm {
-
-class BmMessageBus;
-
-/*
- * Benchmark feed handler for feed to distributor using document api protocol
- * over message bus.
- */
-class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler
-{
- vespalib::string _name;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- BmMessageBus& _message_bus;
- mbus::Route _route;
- void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
-public:
- DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus);
- ~DocumentApiMessageBusBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
- void attach_bucket_info_queue(PendingTracker &tracker) override;
- uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
deleted file mode 100644
index 26cbf27b455..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <memory>
-#include <vespa/vespalib/stllike/string.h>
-
-namespace document {
-class Bucket;
-class Document;
-class DocumentUpdate;
-class DocumentId;
-}
-
-namespace feedbm {
-
-class BucketInfoQueue;
-class PendingTracker;
-
-/*
- * Interface class for benchmark feed handler.
- */
-class IBmFeedHandler
-{
-public:
- virtual ~IBmFeedHandler() = default;
- virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0;
- virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0;
- virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0;
- virtual void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) = 0;
- virtual void attach_bucket_info_queue(PendingTracker& tracker) = 0;
- virtual uint32_t get_error_count() const = 0;
- virtual const vespalib::string &get_name() const = 0;
- virtual bool manages_timestamp() const = 0;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp
deleted file mode 100644
index 94bed4cb3bd..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "pending_tracker.h"
-#include "bucket_info_queue.h"
-#include <thread>
-#include <chrono>
-
-using namespace std::chrono_literals;
-
-namespace feedbm {
-
-PendingTracker::PendingTracker(uint32_t limit)
- : _pending(0u),
- _limit(limit),
- _bucket_info_queue()
-{
-}
-
-PendingTracker::~PendingTracker()
-{
- drain();
-}
-
-void
-PendingTracker::retain() {
- while (_pending >= _limit) {
- std::this_thread::sleep_for(1ms);
- }
- _pending++;
-}
-
-void
-PendingTracker::drain()
-{
- if (_bucket_info_queue) {
- _bucket_info_queue->get_bucket_info_loop();
- }
- while (_pending > 0) {
- std::this_thread::sleep_for(1ms);
- if (_bucket_info_queue) {
- _bucket_info_queue->get_bucket_info_loop();
- }
- }
- if (_bucket_info_queue) {
- _bucket_info_queue->get_bucket_info_loop();
- }
-}
-
-void
-PendingTracker::attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors)
-{
- _bucket_info_queue = std::make_unique<BucketInfoQueue>(provider, errors);
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
deleted file mode 100644
index 4ca84ab7442..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <atomic>
-#include <memory>
-
-namespace storage::spi { struct PersistenceProvider; }
-
-namespace feedbm {
-
-class BucketInfoQueue;
-
-/*
- * Class to track number of pending operations, used as backpressure during
- * benchmark feeding.
- */
-class PendingTracker {
- std::atomic<uint32_t> _pending;
- uint32_t _limit;
- std::unique_ptr<BucketInfoQueue> _bucket_info_queue;
-
-public:
- PendingTracker(uint32_t limit);
- ~PendingTracker();
-
- void release() {
- _pending--;
- }
- void retain();
- void drain();
-
- void attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors);
- BucketInfoQueue *get_bucket_info_queue() { return _bucket_info_queue.get(); }
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
deleted file mode 100644
index 6863d35703e..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "pending_tracker_hash.h"
-#include "pending_tracker.h"
-#include <vespa/vespalib/stllike/hash_map.hpp>
-#include <cassert>
-
-namespace feedbm {
-
-PendingTrackerHash::PendingTrackerHash()
- : _mutex(),
- _pending()
-{
-}
-
-PendingTrackerHash::~PendingTrackerHash()
-{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
-}
-
-void
-PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker)
-{
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
-}
-
-PendingTracker *
-PendingTrackerHash::release(uint64_t msg_id)
-{
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- if (itr == _pending.end()) {
- return nullptr;
- }
- auto tracker = itr->second;
- _pending.erase(itr);
- return tracker;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
deleted file mode 100644
index 89be93fd4ed..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <mutex>
-
-namespace feedbm {
-
-class PendingTracker;
-
-/*
- * Class maintaing mapping from message id to pending tracker
- */
-class PendingTrackerHash
-{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
-public:
- PendingTrackerHash();
- ~PendingTrackerHash();
- PendingTracker *release(uint64_t msg_id);
- void retain(uint64_t msg_id, PendingTracker &tracker);
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
deleted file mode 100644
index 11149eecb3f..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "spi_bm_feed_handler.h"
-#include "pending_tracker.h"
-#include "bucket_info_queue.h"
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-using storage::spi::Bucket;
-using storage::spi::PersistenceProvider;
-using storage::spi::Timestamp;
-
-namespace feedbm {
-
-namespace {
-
-storage::spi::Context context(storage::spi::Priority(0), 0);
-
-void get_bucket_info_loop(PendingTracker &tracker)
-{
- auto bucket_info_queue = tracker.get_bucket_info_queue();
- if (bucket_info_queue != nullptr) {
- bucket_info_queue->get_bucket_info_loop();
- }
-}
-
-class MyOperationComplete : public storage::spi::OperationComplete
-{
- std::atomic<uint32_t> &_errors;
- Bucket _bucket;
- PendingTracker& _tracker;
-public:
- MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
- ~MyOperationComplete() override;
- void onComplete(std::unique_ptr<storage::spi::Result> result) override;
- void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
-};
-
-MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker)
- : _errors(errors),
- _bucket(bucket),
- _tracker(tracker)
-{
- _tracker.retain();
-}
-
-MyOperationComplete::~MyOperationComplete()
-{
- _tracker.release();
-}
-
-void
-MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
-{
- if (result->hasError()) {
- ++_errors;
- } else {
- auto bucket_info_queue = _tracker.get_bucket_info_queue();
- if (bucket_info_queue != nullptr) {
- bucket_info_queue->put_bucket(_bucket);
- }
- }
-}
-
-void
-MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler)
-{
- (void) resultHandler;
-}
-
-}
-
-SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info)
- : IBmFeedHandler(),
- _name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"),
- _provider(provider),
- _field_set_repo(field_set_repo),
- _errors(0u),
- _skip_get_spi_bucket_info(skip_get_spi_bucket_info)
-{
-}
-
-SpiBmFeedHandler::~SpiBmFeedHandler() = default;
-
-void
-SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
-}
-
-void
-SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
-}
-
-void
-SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
-}
-
-void
-SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- auto field_set = _field_set_repo.getFieldSet(field_set_string);
- auto result = _provider.get(spi_bucket, *field_set, document_id, context);
- if (result.hasError()) {
- ++_errors;
- }
-}
-
-void
-SpiBmFeedHandler::create_bucket(const document::Bucket& bucket)
-{
- _provider.createBucket(Bucket(bucket), context);
-}
-
-void
-SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker)
-{
- if (!_skip_get_spi_bucket_info) {
- tracker.attach_bucket_info_queue(_provider, _errors);
- }
-}
-
-uint32_t
-SpiBmFeedHandler::get_error_count() const
-{
- return _errors;
-}
-
-const vespalib::string&
-SpiBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-SpiBmFeedHandler::manages_timestamp() const
-{
- return false;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
deleted file mode 100644
index a78aa06628b..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "i_bm_feed_handler.h"
-#include <atomic>
-
-namespace document { class FieldSetRepo; }
-namespace storage::spi { struct PersistenceProvider; }
-
-namespace feedbm {
-
-/*
- * Benchmark feed handler for feed directly to persistence provider
- */
-class SpiBmFeedHandler : public IBmFeedHandler
-{
- vespalib::string _name;
- storage::spi::PersistenceProvider& _provider;
- const document::FieldSetRepo& _field_set_repo;
- std::atomic<uint32_t> _errors;
- bool _skip_get_spi_bucket_info;
-public:
- SpiBmFeedHandler(storage::spi::PersistenceProvider& provider, const document::FieldSetRepo& field_set_repo, bool skip_get_spi_bucket_info);
- ~SpiBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
- void create_bucket(const document::Bucket& bucket);
- void attach_bucket_info_queue(PendingTracker &tracker) override;
- uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
deleted file mode 100644
index 82cf2df065f..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storage_api_chain_bm_feed_handler.h"
-#include "pending_tracker.h"
-#include "storage_reply_error_checker.h"
-#include "bm_storage_link_context.h"
-#include "bm_storage_link.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/state.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <cassert>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-
-namespace feedbm {
-
-namespace {
-
-std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
- auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle);
- cmd->setPriority(storage::api::StorageMessage::VERYHIGH);
- return cmd;
-}
-
-}
-
-StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
- _context(std::move(context))
-{
- auto cmd = make_set_cluster_state_cmd();
- PendingTracker tracker(1);
- send_msg(std::move(cmd), tracker);
- tracker.drain();
-}
-
-StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
-
-void
-StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
-{
- cmd->setSourceIndex(0);
- auto bm_link = _context->bm_link;
- bm_link->retain(cmd->getMsgId(), pending_tracker);
- bm_link->sendDown(std::move(cmd));
-}
-
-void
-StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
-{
-}
-
-uint32_t
-StorageApiChainBmFeedHandler::get_error_count() const
-{
- return _context->bm_link->get_error_count();
-}
-
-const vespalib::string&
-StorageApiChainBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiChainBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
deleted file mode 100644
index 0c4b715122e..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "i_bm_feed_handler.h"
-
-namespace storage::api { class StorageCommand; }
-
-namespace feedbm {
-
-struct BmStorageLinkContext;
-
-/*
- * Benchmark feed handler for feed to service layer or distributor
- * using storage api protocol directly on the storage chain.
- */
-class StorageApiChainBmFeedHandler : public IBmFeedHandler
-{
-private:
- vespalib::string _name;
- bool _distributor;
- std::shared_ptr<BmStorageLinkContext> _context;
- void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
-public:
- StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor);
- ~StorageApiChainBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
-
- void attach_bucket_info_queue(PendingTracker &tracker) override;
- uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
deleted file mode 100644
index f63a8e33cc0..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storage_api_message_bus_bm_feed_handler.h"
-#include "bm_message_bus.h"
-#include "pending_tracker.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/mbusprot/storagecommand.h>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-using storage::api::StorageMessageAddress;
-using storage::lib::NodeType;
-
-namespace feedbm {
-
-namespace {
- vespalib::string _Storage("storage");
-}
-StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
- _message_bus(message_bus),
- _route(_storage_address->to_mbus_route())
-{
-}
-
-StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default;
-
-void
-StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
-{
- cmd->setSourceIndex(0);
- auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
- _message_bus.send_msg(std::move(msg), _route, pending_tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
-{
-}
-
-uint32_t
-StorageApiMessageBusBmFeedHandler::get_error_count() const
-{
- return _message_bus.get_error_count();
-}
-
-const vespalib::string&
-StorageApiMessageBusBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiMessageBusBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
deleted file mode 100644
index 2aafd0c6830..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "i_bm_feed_handler.h"
-#include <vespa/messagebus/routing/route.h>
-
-namespace document { class DocumentTypeRepo; }
-namespace documentapi { class DocumentMessage; };
-namespace storage::api {
-class StorageCommand;
-class StorageMessageAddress;
-}
-
-namespace feedbm {
-
-class BmMessageBus;
-
-/*
- * Benchmark feed handler for feed to service layer or distributor
- * using storage api protocol over message bus.
- */
-class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
-{
- vespalib::string _name;
- bool _distributor;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- BmMessageBus& _message_bus;
- mbus::Route _route;
- void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
-public:
- StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor);
- ~StorageApiMessageBusBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
- void attach_bucket_info_queue(PendingTracker &tracker) override;
- uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
deleted file mode 100644
index 04d49bba0a3..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storage_api_rpc_bm_feed_handler.h"
-#include "pending_tracker.h"
-#include "pending_tracker_hash.h"
-#include "storage_reply_error_checker.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storage/storageserver/message_dispatcher.h>
-#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
-#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-using document::DocumentTypeRepo;
-using storage::api::StorageMessageAddress;
-using storage::rpc::SharedRpcResources;
-using storage::rpc::StorageApiRpcService;
-using storage::lib::NodeType;
-
-namespace feedbm {
-
-namespace {
- vespalib::string _Storage("storage");
-}
-
-class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher,
- public StorageReplyErrorChecker
-{
- PendingTrackerHash _pending_hash;
-public:
- MyMessageDispatcher()
- : storage::MessageDispatcher(),
- StorageReplyErrorChecker(),
- _pending_hash()
- {
- }
- ~MyMessageDispatcher() override;
- void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override {
- check_error(*msg);
- release(msg->getMsgId());
- }
- void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override {
- check_error(*msg);
- release(msg->getMsgId());
- }
- void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
- void release(uint64_t msg_id) {
- auto tracker = _pending_hash.release(msg_id);
- if (tracker != nullptr) {
- tracker->release();
- } else {
- ++_errors;
- }
- }
-};
-
-StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
-{
-}
-
-StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in,
- std::shared_ptr<const DocumentTypeRepo> repo,
- const StorageApiRpcService::Params& rpc_params,
- bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
- _shared_rpc_resources(shared_rpc_resources_in),
- _message_dispatcher(std::make_unique<MyMessageDispatcher>()),
- _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo)),
- _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params))
-{
-}
-
-StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
-
-void
-StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
-{
- cmd->setSourceIndex(0);
- cmd->setAddress(*_storage_address);
- _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
- _rpc_client->send_rpc_v1_request(std::move(cmd));
-}
-
-void
-StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
-{
-}
-
-uint32_t
-StorageApiRpcBmFeedHandler::get_error_count() const
-{
- return _message_dispatcher->get_error_count();
-}
-
-const vespalib::string&
-StorageApiRpcBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiRpcBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
deleted file mode 100644
index 5057d8889a5..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "i_bm_feed_handler.h"
-#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-
-namespace document { class DocumentTypeRepo; }
-namespace storage::api {
-class StorageMessageAddress;
-class StorageCommand;
-}
-
-namespace storage::rpc {
-class MessageCodecProvider;
-class SharedRpcResources;
-}
-
-namespace feedbm {
-
-/*
- * Benchmark feed handler for feed to service layer or distributor
- * using storage api protocol over rpc.
- */
-class StorageApiRpcBmFeedHandler : public IBmFeedHandler
-{
- class MyMessageDispatcher;
- vespalib::string _name;
- bool _distributor;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- storage::rpc::SharedRpcResources& _shared_rpc_resources;
- std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
- std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider;
- std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client;
-
- void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
-public:
- StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in,
- std::shared_ptr<const document::DocumentTypeRepo> repo,
- const storage::rpc::StorageApiRpcService::Params& rpc_params,
- bool distributor);
- ~StorageApiRpcBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
- void attach_bucket_info_queue(PendingTracker &tracker) override;
- uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp
deleted file mode 100644
index 260b0c8a7af..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storage_reply_error_checker.h"
-#include <vespa/storageapi/messageapi/storagereply.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".storage_reply_error_checker");
-
-namespace feedbm {
-
-StorageReplyErrorChecker::StorageReplyErrorChecker()
- : _errors(0u)
-{
-}
-
-StorageReplyErrorChecker::~StorageReplyErrorChecker() = default;
-
-void
-StorageReplyErrorChecker::check_error(const storage::api::StorageMessage &msg)
-{
- auto reply = dynamic_cast<const storage::api::StorageReply*>(&msg);
- if (reply != nullptr) {
- if (reply->getResult().failed()) {
- if (++_errors <= 10) {
- LOG(info, "reply '%s', return code '%s'", reply->toString().c_str(), reply->getResult().toString().c_str());
- }
- }
- } else {
- ++_errors;
- }
-}
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
deleted file mode 100644
index 4743367b426..00000000000
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <atomic>
-
-namespace storage::api { class StorageMessage; }
-
-namespace feedbm {
-
-class StorageReplyErrorChecker {
-protected:
- std::atomic<uint32_t> _errors;
-public:
- StorageReplyErrorChecker();
- ~StorageReplyErrorChecker();
- void check_error(const storage::api::StorageMessage &msg);
- uint32_t get_error_count() const { return _errors; }
-};
-
-}
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index cd1920d237f..9752a4b5c36 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,80 +1,25 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "bm_cluster_controller.h"
-#include "bm_message_bus.h"
-#include "bm_storage_chain_builder.h"
-#include "bm_storage_link_context.h"
-#include "pending_tracker.h"
-#include "spi_bm_feed_handler.h"
-#include "storage_api_chain_bm_feed_handler.h"
-#include "storage_api_message_bus_bm_feed_handler.h"
-#include "storage_api_rpc_bm_feed_handler.h"
-#include "document_api_message_bus_bm_feed_handler.h"
-#include <tests/proton/common/dummydbowner.h>
-#include <vespa/config-attributes.h>
-#include <vespa/config-bucketspaces.h>
-#include <vespa/config-imported-fields.h>
-#include <vespa/config-indexschema.h>
-#include <vespa/config-persistence.h>
-#include <vespa/config-rank-profiles.h>
-#include <vespa/config-slobroks.h>
-#include <vespa/config-stor-distribution.h>
-#include <vespa/config-stor-filestor.h>
-#include <vespa/config-summary.h>
-#include <vespa/config-summarymap.h>
-#include <vespa/config-upgrading.h>
-#include <vespa/config/common/configcontext.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/document/update/assignvalueupdate.h>
-#include <vespa/document/update/documentupdate.h>
#include <vespa/fastos/app.h>
-#include <vespa/messagebus/config-messagebus.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/metrics/config-metricsmanager.h>
-#include <vespa/searchcommon/common/schemaconfigurer.h>
-#include <vespa/searchcore/proton/common/alloc_config.h>
-#include <vespa/searchcore/proton/common/hw_info.h>
-#include <vespa/searchcore/proton/matching/querylimiter.h>
-#include <vespa/searchcore/proton/metrics/metricswireservice.h>
-#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
-#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
-#include <vespa/searchcore/proton/server/bootstrapconfig.h>
-#include <vespa/searchcore/proton/server/document_db_maintenance_config.h>
-#include <vespa/searchcore/proton/server/documentdb.h>
-#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
-#include <vespa/searchcore/proton/server/fileconfigmanager.h>
-#include <vespa/searchcore/proton/server/memoryconfigstore.h>
-#include <vespa/searchcore/proton/server/persistencehandlerproxy.h>
-#include <vespa/searchcore/proton/server/threading_service_config.h>
-#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/searchcore/bmcluster/bm_cluster.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_controller.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_params.h>
+#include <vespa/searchcore/bmcluster/bm_feed.h>
+#include <vespa/searchcore/bmcluster/bm_node.h>
+#include <vespa/searchcore/bmcluster/bm_range.h>
+#include <vespa/searchcore/bmcluster/bucket_selector.h>
+#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
-#include <vespa/searchlib/transactionlog/translogserver.h>
-#include <vespa/searchsummary/config/config-juniperrc.h>
-#include <vespa/slobrok/sbmirror.h>
-#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
-#include <vespa/storage/common/i_storage_chain_builder.h>
-#include <vespa/storage/config/config-stor-bouncer.h>
-#include <vespa/storage/config/config-stor-communicationmanager.h>
-#include <vespa/storage/config/config-stor-distributormanager.h>
-#include <vespa/storage/config/config-stor-opslogger.h>
-#include <vespa/storage/config/config-stor-prioritymapping.h>
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/config/config-stor-status.h>
-#include <vespa/storage/config/config-stor-visitordispatcher.h>
-#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/visiting/config-stor-visitor.h>
-#include <vespa/storageserver/app/distributorprocess.h>
-#include <vespa/storageserver/app/servicelayerprocess.h>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <getopt.h>
#include <iostream>
#include <thread>
@@ -82,82 +27,30 @@
#include <vespa/log/log.h>
LOG_SETUP("vespa-feed-bm");
-using namespace cloud::config::filedistribution;
-using namespace config;
using namespace proton;
using namespace std::chrono_literals;
-using namespace vespa::config::search::core;
-using namespace vespa::config::search::summary;
-using namespace vespa::config::search;
-using vespa::config::content::PersistenceConfigBuilder;
-using vespa::config::content::StorDistributionConfigBuilder;
-using vespa::config::content::StorFilestorConfigBuilder;
-using vespa::config::content::UpgradingConfigBuilder;
-using vespa::config::content::core::BucketspacesConfig;
-using vespa::config::content::core::BucketspacesConfigBuilder;
-using vespa::config::content::core::StorBouncerConfigBuilder;
-using vespa::config::content::core::StorBucketInitConfigBuilder;
-using vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
-using vespa::config::content::core::StorDistributormanagerConfigBuilder;
-using vespa::config::content::core::StorOpsloggerConfigBuilder;
-using vespa::config::content::core::StorPrioritymappingConfigBuilder;
-using vespa::config::content::core::StorServerConfigBuilder;
-using vespa::config::content::core::StorStatusConfigBuilder;
-using vespa::config::content::core::StorVisitorConfigBuilder;
-using vespa::config::content::core::StorVisitordispatcherConfigBuilder;
-using cloud::config::SlobroksConfigBuilder;
-using messagebus::MessagebusConfigBuilder;
-using metrics::MetricsmanagerConfigBuilder;
-using config::ConfigContext;
-using config::ConfigSet;
-using config::ConfigUri;
-using config::IConfigContext;
-using document::AssignValueUpdate;
-using document::BucketId;
-using document::BucketSpace;
-using document::Document;
-using document::DocumentId;
-using document::DocumentType;
using document::DocumentTypeRepo;
using document::DocumentTypeRepoFactory;
-using document::DocumentUpdate;
using document::DocumenttypesConfig;
using document::DocumenttypesConfigBuilder;
-using document::Field;
-using document::FieldSetRepo;
-using document::FieldUpdate;
-using document::IntFieldValue;
-using document::test::makeBucketSpace;
-using feedbm::BmClusterController;
-using feedbm::BmMessageBus;
-using feedbm::BmStorageChainBuilder;
-using feedbm::BmStorageLinkContext;
-using feedbm::IBmFeedHandler;
-using feedbm::DocumentApiMessageBusBmFeedHandler;
-using feedbm::SpiBmFeedHandler;
-using feedbm::StorageApiChainBmFeedHandler;
-using feedbm::StorageApiMessageBusBmFeedHandler;
-using feedbm::StorageApiRpcBmFeedHandler;
-using search::TuneFileDocumentDB;
+using search::bmcluster::BmClusterController;
+using search::bmcluster::IBmFeedHandler;
+using search::bmcluster::BmClusterParams;
+using search::bmcluster::BmCluster;
+using search::bmcluster::BmFeed;
+using search::bmcluster::BmNode;
+using search::bmcluster::BmRange;
+using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
-using search::index::Schema;
-using search::index::SchemaBuilder;
-using search::transactionlog::TransLogServer;
-using storage::rpc::SharedRpcResources;
-using storage::rpc::StorageApiRpcService;
using storage::spi::PersistenceProvider;
-using vespalib::compression::CompressionConfig;
using vespalib::makeLambdaTask;
-using proton::ThreadingServiceConfig;
-
-using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>;
namespace {
vespalib::string base_dir = "testdb";
-std::shared_ptr<DocumenttypesConfig> make_document_type() {
+std::shared_ptr<DocumenttypesConfig> make_document_types() {
using Struct = document::config_builder::Struct;
using DataType = document::DataType;
document::config_builder::DocumenttypesConfigBuilderHelper builder;
@@ -165,130 +58,14 @@ std::shared_ptr<DocumenttypesConfig> make_document_type() {
return std::make_shared<DocumenttypesConfig>(builder.config());
}
-std::shared_ptr<AttributesConfig> make_attributes_config() {
- AttributesConfigBuilder builder;
- AttributesConfig::Attribute attribute;
- attribute.name = "int";
- attribute.datatype = AttributesConfig::Attribute::Datatype::INT32;
- builder.attribute.emplace_back(attribute);
- return std::make_shared<AttributesConfig>(builder);
-}
-
-std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name)
-{
- auto indexschema = std::make_shared<IndexschemaConfig>();
- auto attributes = make_attributes_config();
- auto summary = std::make_shared<SummaryConfig>();
- std::shared_ptr<Schema> schema(new Schema());
- SchemaBuilder::build(*indexschema, *schema);
- SchemaBuilder::build(*attributes, *schema);
- SchemaBuilder::build(*summary, *schema);
- return std::make_shared<DocumentDBConfig>(
- 1,
- std::make_shared<RankProfilesConfig>(),
- std::make_shared<matching::RankingConstants>(),
- std::make_shared<matching::RankingExpressions>(),
- std::make_shared<matching::OnnxModels>(),
- indexschema,
- attributes,
- summary,
- std::make_shared<SummarymapConfig>(),
- std::make_shared<JuniperrcConfig>(),
- document_types,
- repo,
- std::make_shared<ImportedFieldsConfig>(),
- std::make_shared<TuneFileDocumentDB>(),
- schema,
- std::make_shared<DocumentDBMaintenanceConfig>(),
- search::LogDocumentStore::Config(),
- std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)),
- std::make_shared<const AllocConfig>(),
- "client",
- doc_type_name.getName());
-}
-
-void
-make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
-{
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
-}
-
-void
-make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces)
-{
- BucketspacesConfigBuilder::Documenttype bucket_space_map;
- bucket_space_map.name = "test";
- bucket_space_map.bucketspace = "default";
- bucketspaces.documenttype.emplace_back(std::move(bucket_space_map));
-}
-
-class MyPersistenceEngineOwner : public IPersistenceEngineOwner
-{
- void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { }
-};
-
-struct MyResourceWriteFilter : public IResourceWriteFilter
-{
- bool acceptWriteOperation() const override { return true; }
- State getAcceptState() const override { return IResourceWriteFilter::State(); }
-};
-
-class BucketSelector
-{
- uint32_t _thread_id;
- uint32_t _threads;
- uint32_t _num_buckets;
-public:
- BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in)
- : _thread_id(thread_id_in),
- _threads(threads_in),
- _num_buckets((num_buckets_in / _threads) * _threads)
- {
- }
- uint64_t operator()(uint32_t i) const {
- return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
- }
-};
-
-class BMRange
-{
- uint32_t _start;
- uint32_t _end;
-public:
- BMRange(uint32_t start_in, uint32_t end_in)
- : _start(start_in),
- _end(end_in)
- {
- }
- uint32_t get_start() const { return _start; }
- uint32_t get_end() const { return _end; }
-};
-
-class BMParams {
+class BMParams : public BmClusterParams {
uint32_t _documents;
uint32_t _client_threads;
uint32_t _get_passes;
- vespalib::string _indexing_sequencer;
uint32_t _put_passes;
uint32_t _update_passes;
uint32_t _remove_passes;
- uint32_t _rpc_network_threads;
- uint32_t _rpc_events_before_wakeup;
- uint32_t _rpc_targets_per_node;
- uint32_t _response_threads;
uint32_t _max_pending;
- bool _enable_distributor;
- bool _enable_service_layer;
- bool _skip_get_spi_bucket_info;
- bool _use_document_api;
- bool _use_message_bus;
- bool _use_storage_chain;
- bool _use_async_message_handling_on_schedule;
- uint32_t _bucket_db_stripe_bits;
- uint32_t _distributor_stripes;
- bool _skip_communicationmanager_thread;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads);
}
@@ -297,82 +74,39 @@ public:
: _documents(160000),
_client_threads(1),
_get_passes(0),
- _indexing_sequencer(),
_put_passes(2),
_update_passes(1),
_remove_passes(2),
- _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def
- _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def
- _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def
- _response_threads(2), // Same default as in stor-filestor.def
- _max_pending(1000),
- _enable_distributor(false),
- _enable_service_layer(false),
- _skip_get_spi_bucket_info(false),
- _use_document_api(false),
- _use_message_bus(false),
- _use_storage_chain(false),
- _use_async_message_handling_on_schedule(false),
- _bucket_db_stripe_bits(0),
- _distributor_stripes(0),
- _skip_communicationmanager_thread(false) // Same default as in stor-communicationmanager.def
+ _max_pending(1000)
{
}
- BMRange get_range(uint32_t thread_id) const {
- return BMRange(get_start(thread_id), get_start(thread_id + 1));
+ BmRange get_range(uint32_t thread_id) const {
+ return BmRange(get_start(thread_id), get_start(thread_id + 1));
}
uint32_t get_documents() const { return _documents; }
uint32_t get_max_pending() const { return _max_pending; }
uint32_t get_client_threads() const { return _client_threads; }
uint32_t get_get_passes() const { return _get_passes; }
- const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
- uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
- uint32_t get_rpc_events_before_wakup() const { return _rpc_events_before_wakeup; }
- uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; }
- uint32_t get_response_threads() const { return _response_threads; }
- bool get_enable_distributor() const { return _enable_distributor; }
- bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; }
- bool get_use_document_api() const { return _use_document_api; }
- bool get_use_message_bus() const { return _use_message_bus; }
- bool get_use_storage_chain() const { return _use_storage_chain; }
- bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; }
- uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
- uint32_t get_distributor_stripes() const { return _distributor_stripes; }
- bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; }
- void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
- void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
- void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
- void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; }
- void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
- void set_enable_distributor(bool value) { _enable_distributor = value; }
- void set_enable_service_layer(bool value) { _enable_service_layer = value; }
- void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; }
- void set_use_document_api(bool value) { _use_document_api = value; }
- void set_use_message_bus(bool value) { _use_message_bus = value; }
- void set_use_storage_chain(bool value) { _use_storage_chain = value; }
- void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; }
- void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
- void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; }
- void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; }
bool check() const;
- bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
- bool needs_distributor() const { return _enable_distributor || _use_document_api; }
- bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
+ bool needs_message_bus() const { return get_use_message_bus() || get_use_document_api(); }
};
bool
BMParams::check() const
{
+ if (!BmClusterParams::check()) {
+ return false;
+ }
if (_client_threads < 1) {
std::cerr << "Too few client threads: " << _client_threads << std::endl;
return false;
@@ -389,533 +123,82 @@ BMParams::check() const
std::cerr << "Put passes too low: " << _put_passes << std::endl;
return false;
}
- if (_rpc_network_threads < 1) {
- std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
- return false;
- }
- if (_rpc_targets_per_node < 1) {
- std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl;
- return false;
- }
- if (_response_threads < 1) {
- std::cerr << "Too few response threads: " << _response_threads << std::endl;
- return false;
- }
return true;
}
-class MyServiceLayerProcess : public storage::ServiceLayerProcess {
- PersistenceProvider& _provider;
-
-public:
- MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider,
- std::unique_ptr<storage::IStorageChainBuilder> chain_builder);
- ~MyServiceLayerProcess() override { shutdown(); }
-
- void shutdown() override;
- void setupProvider() override;
- PersistenceProvider& getProvider() override;
-};
-
-MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider,
- std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
- : ServiceLayerProcess(configUri),
- _provider(provider)
-{
- if (chain_builder) {
- set_storage_chain_builder(std::move(chain_builder));
- }
-}
-
-void
-MyServiceLayerProcess::shutdown()
-{
- ServiceLayerProcess::shutdown();
-}
-
-void
-MyServiceLayerProcess::setupProvider()
-{
-}
-
-PersistenceProvider&
-MyServiceLayerProcess::getProvider()
-{
- return _provider;
-}
-
-struct MyStorageConfig
-{
- vespalib::string config_id;
- DocumenttypesConfigBuilder documenttypes;
- StorDistributionConfigBuilder stor_distribution;
- StorBouncerConfigBuilder stor_bouncer;
- StorCommunicationmanagerConfigBuilder stor_communicationmanager;
- StorOpsloggerConfigBuilder stor_opslogger;
- StorPrioritymappingConfigBuilder stor_prioritymapping;
- UpgradingConfigBuilder upgrading;
- StorServerConfigBuilder stor_server;
- StorStatusConfigBuilder stor_status;
- BucketspacesConfigBuilder bucketspaces;
- MetricsmanagerConfigBuilder metricsmanager;
- SlobroksConfigBuilder slobroks;
- MessagebusConfigBuilder messagebus;
-
- MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : config_id(config_id_in),
- documenttypes(documenttypes_in),
- stor_distribution(),
- stor_bouncer(),
- stor_communicationmanager(),
- stor_opslogger(),
- stor_prioritymapping(),
- upgrading(),
- stor_server(),
- stor_status(),
- bucketspaces(),
- metricsmanager(),
- slobroks(),
- messagebus()
- {
- {
- auto &dc = stor_distribution;
- {
- StorDistributionConfigBuilder::Group group;
- {
- StorDistributionConfigBuilder::Group::Nodes node;
- node.index = 0;
- group.nodes.push_back(std::move(node));
- }
- group.index = "invalid";
- group.name = "invalid";
- group.capacity = 1.0;
- group.partitions = "";
- dc.group.push_back(std::move(group));
- }
- dc.redundancy = 1;
- dc.readyCopies = 1;
- }
- stor_server.isDistributor = distributor;
- stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
- if (distributor) {
- stor_server.rootFolder = "distributor";
- } else {
- stor_server.rootFolder = "storage";
- }
- make_slobroks_config(slobroks, slobrok_port);
- stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
- stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakup();
- stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node();
- stor_communicationmanager.mbusport = mbus_port;
- stor_communicationmanager.rpcport = rpc_port;
- stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread();
-
- stor_status.httpport = status_port;
- make_bucketspaces_config(bucketspaces);
- }
-
- ~MyStorageConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &documenttypes);
- set.addBuilder(config_id, &stor_distribution);
- set.addBuilder(config_id, &stor_bouncer);
- set.addBuilder(config_id, &stor_communicationmanager);
- set.addBuilder(config_id, &stor_opslogger);
- set.addBuilder(config_id, &stor_prioritymapping);
- set.addBuilder(config_id, &upgrading);
- set.addBuilder(config_id, &stor_server);
- set.addBuilder(config_id, &stor_status);
- set.addBuilder(config_id, &bucketspaces);
- set.addBuilder(config_id, &metricsmanager);
- set.addBuilder(config_id, &slobroks);
- set.addBuilder(config_id, &messagebus);
- }
-};
-
-MyStorageConfig::~MyStorageConfig() = default;
-
-struct MyServiceLayerConfig : public MyStorageConfig
-{
- PersistenceConfigBuilder persistence;
- StorFilestorConfigBuilder stor_filestor;
- StorBucketInitConfigBuilder stor_bucket_init;
- StorVisitorConfigBuilder stor_visitor;
-
- MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
- persistence(),
- stor_filestor(),
- stor_bucket_init(),
- stor_visitor()
- {
- stor_filestor.numResponseThreads = params.get_response_threads();
- stor_filestor.numNetworkThreads = params.get_rpc_network_threads();
- stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule();
- }
-
- ~MyServiceLayerConfig();
-
- void add_builders(ConfigSet &set) {
- MyStorageConfig::add_builders(set);
- set.addBuilder(config_id, &persistence);
- set.addBuilder(config_id, &stor_filestor);
- set.addBuilder(config_id, &stor_bucket_init);
- set.addBuilder(config_id, &stor_visitor);
- }
-};
-
-MyServiceLayerConfig::~MyServiceLayerConfig() = default;
-
-struct MyDistributorConfig : public MyStorageConfig
-{
- StorDistributormanagerConfigBuilder stor_distributormanager;
- StorVisitordispatcherConfigBuilder stor_visitordispatcher;
-
- MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
- stor_distributormanager(),
- stor_visitordispatcher()
- {
- stor_distributormanager.numDistributorStripes = params.get_distributor_stripes();
- }
-
- ~MyDistributorConfig();
-
- void add_builders(ConfigSet &set) {
- MyStorageConfig::add_builders(set);
- set.addBuilder(config_id, &stor_distributormanager);
- set.addBuilder(config_id, &stor_visitordispatcher);
- }
-};
-
-MyDistributorConfig::~MyDistributorConfig() = default;
-
-struct MyRpcClientConfig {
- vespalib::string config_id;
- SlobroksConfigBuilder slobroks;
-
- MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port)
- : config_id(config_id_in),
- slobroks()
- {
- make_slobroks_config(slobroks, slobrok_port);
- }
- ~MyRpcClientConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &slobroks);
- }
-};
-
-MyRpcClientConfig::~MyRpcClientConfig() = default;
-
-struct MyMessageBusConfig {
- vespalib::string config_id;
- SlobroksConfigBuilder slobroks;
- MessagebusConfigBuilder messagebus;
-
- MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port)
- : config_id(config_id_in),
- slobroks(),
- messagebus()
- {
- make_slobroks_config(slobroks, slobrok_port);
- }
- ~MyMessageBusConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &slobroks);
- set.addBuilder(config_id, &messagebus);
- }
-};
-
-MyMessageBusConfig::~MyMessageBusConfig() = default;
-
}
struct PersistenceProviderFixture {
- std::shared_ptr<DocumenttypesConfig> _document_types;
+ std::shared_ptr<const DocumenttypesConfig> _document_types;
std::shared_ptr<const DocumentTypeRepo> _repo;
- DocTypeName _doc_type_name;
- const DocumentType* _document_type;
- const Field& _field;
- std::shared_ptr<DocumentDBConfig> _document_db_config;
- vespalib::string _base_dir;
- DummyFileHeaderContext _file_header_context;
- int _tls_listen_port;
- int _slobrok_port;
- int _rpc_client_port;
- int _service_layer_mbus_port;
- int _service_layer_rpc_port;
- int _service_layer_status_port;
- int _distributor_mbus_port;
- int _distributor_rpc_port;
- int _distributor_status_port;
- TransLogServer _tls;
- vespalib::string _tls_spec;
- matching::QueryLimiter _query_limiter;
- vespalib::Clock _clock;
- DummyWireService _metrics_wire_service;
- MemoryConfigStores _config_stores;
- vespalib::ThreadStackExecutor _summary_executor;
- DummyDBOwner _document_db_owner;
- BucketSpace _bucket_space;
- std::shared_ptr<DocumentDB> _document_db;
- MyPersistenceEngineOwner _persistence_owner;
- MyResourceWriteFilter _write_filter;
- test::DiskMemUsageNotifier _disk_mem_usage_notifier;
- std::shared_ptr<PersistenceEngine> _persistence_engine;
- std::unique_ptr<const FieldSetRepo> _field_set_repo;
- uint32_t _bucket_bits;
- MyServiceLayerConfig _service_layer_config;
- MyDistributorConfig _distributor_config;
- MyRpcClientConfig _rpc_client_config;
- MyMessageBusConfig _message_bus_config;
- ConfigSet _config_set;
- std::shared_ptr<IConfigContext> _config_context;
- std::unique_ptr<IBmFeedHandler> _feed_handler;
- std::unique_ptr<mbus::Slobrok> _slobrok;
- std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
- std::unique_ptr<MyServiceLayerProcess> _service_layer;
- std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
- std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
- std::unique_ptr<storage::DistributorProcess> _distributor;
- std::unique_ptr<BmMessageBus> _message_bus;
+ std::unique_ptr<BmCluster> _bm_cluster;
+ std::unique_ptr<BmNode> _bm_node;
+ BmFeed _feed;
+ IBmFeedHandler* _feed_handler;
explicit PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
- void create_document_db(const BMParams & params);
- uint32_t num_buckets() const { return (1u << _bucket_bits); }
- BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); }
- document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); }
- DocumentId make_document_id(uint32_t n, uint32_t i) const;
- std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const;
- std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const;
void create_buckets();
void wait_slobrok(const vespalib::string &name);
- void start_service_layer(const BMParams& params);
- void start_distributor(const BMParams& params);
+ void start_service_layer(const BmClusterParams& params);
+ void start_distributor(const BmClusterParams& params);
void start_message_bus();
- void create_feed_handler(const BMParams& params);
+ void create_feed_handler(const BmClusterParams& params);
void shutdown_feed_handler();
void shutdown_message_bus();
void shutdown_distributor();
void shutdown_service_layer();
+ PersistenceProvider* get_persistence_provider() { return _bm_node->get_persistence_provider(); }
};
PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
- : _document_types(make_document_type()),
- _repo(DocumentTypeRepoFactory::make(*_document_types)),
- _doc_type_name("test"),
- _document_type(_repo->getDocumentType(_doc_type_name.getName())),
- _field(_document_type->getField("int")),
- _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)),
- _base_dir(base_dir),
- _file_header_context(),
- _tls_listen_port(9017),
- _slobrok_port(9018),
- _rpc_client_port(9019),
- _service_layer_mbus_port(9020),
- _service_layer_rpc_port(9021),
- _service_layer_status_port(9022),
- _distributor_mbus_port(9023),
- _distributor_rpc_port(9024),
- _distributor_status_port(9025),
- _tls("tls", _tls_listen_port, _base_dir, _file_header_context),
- _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
- _query_limiter(),
- _clock(),
- _metrics_wire_service(),
- _config_stores(),
- _summary_executor(8, 128_Ki),
- _document_db_owner(),
- _bucket_space(makeBucketSpace(_doc_type_name.getName())),
- _document_db(),
- _persistence_owner(),
- _write_filter(),
- _disk_mem_usage_notifier(),
- _persistence_engine(),
- _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)),
- _bucket_bits(16),
- _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
- _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
- _rpc_client_config("bm-rpc-client", _slobrok_port),
- _message_bus_config("bm-message-bus", _slobrok_port),
- _config_set(),
- _config_context(std::make_shared<ConfigContext>(_config_set)),
- _feed_handler(),
- _slobrok(),
- _service_layer_chain_context(),
- _service_layer(),
- _rpc_client_shared_rpc_resources(),
- _distributor_chain_context(),
- _distributor(),
- _message_bus()
-{
- create_document_db(params);
- _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false);
- auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
- _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
- _service_layer_config.add_builders(_config_set);
- _distributor_config.add_builders(_config_set);
- _rpc_client_config.add_builders(_config_set);
- _message_bus_config.add_builders(_config_set);
- _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info());
-}
-
-PersistenceProviderFixture::~PersistenceProviderFixture()
-{
- if (_persistence_engine) {
- _persistence_engine->destroyIterators();
- _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name);
- }
- if (_document_db) {
- _document_db->close();
- }
-}
-
-void
-PersistenceProviderFixture::create_document_db(const BMParams & params)
-{
- vespalib::mkdir(_base_dir, false);
- vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false);
- vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig";
- {
- FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName());
- fileCfg.saveConfig(*_document_db_config, 1);
- }
- config::DirSpec spec(input_cfg + "/config-1");
- auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>();
- DocumentDBConfigHelper mgr(spec, _doc_type_name.getName());
- auto protonCfg = std::make_shared<ProtonConfigBuilder>();
- if ( ! params.get_indexing_sequencer().empty()) {
- vespalib::string sequencer = params.get_indexing_sequencer();
- std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); });
- protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer);
- }
- auto bootstrap_config = std::make_shared<BootstrapConfig>(1,
- _document_types,
- _repo,
- std::move(protonCfg),
- std::make_shared<FiledistributorrpcConfig>(),
- std::make_shared<BucketspacesConfig>(),
- tuneFileDocDB, HwInfo());
- mgr.forwardConfig(bootstrap_config);
- mgr.nextGeneration(0ms);
- _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name,
- _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner,
- _summary_executor, _summary_executor, *_persistence_engine, _tls,
- _metrics_wire_service, _file_header_context,
- _config_stores.getConfigStore(_doc_type_name.toString()),
- std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo());
- _document_db->start();
- _document_db->waitForOnlineState();
-}
-
-DocumentId
-PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const
+ : _document_types(make_document_types()),
+ _repo(document::DocumentTypeRepoFactory::make(*_document_types)),
+ _bm_cluster(std::make_unique<BmCluster>(params, _repo)),
+ _bm_node(BmNode::create(params, _document_types)),
+ _feed(_repo),
+ _feed_handler(nullptr)
{
- DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i));
- return id;
}
-std::unique_ptr<Document>
-PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const
-{
- auto id = make_document_id(n, i);
- auto document = std::make_unique<Document>(*_document_type, id);
- document->setRepo(*_repo);
- document->setFieldValue(_field, std::make_unique<IntFieldValue>(i));
- return document;
-}
-
-std::unique_ptr<DocumentUpdate>
-PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const
-{
- auto id = make_document_id(n, i);
- auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id);
- document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15))));
- return document_update;
-}
+PersistenceProviderFixture::~PersistenceProviderFixture() = default;
void
PersistenceProviderFixture::create_buckets()
{
- SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false);
- for (unsigned int i = 0; i < num_buckets(); ++i) {
- feed_handler.create_bucket(make_bucket(i));
+ auto feed_handler = _bm_node->make_create_bucket_feed_handler(false);
+ for (unsigned int i = 0; i < _feed.num_buckets(); ++i) {
+ feed_handler->create_bucket(_feed.make_bucket(i));
}
}
void
PersistenceProviderFixture::wait_slobrok(const vespalib::string &name)
{
- auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror();
- LOG(info, "Waiting for %s in slobrok", name.c_str());
- for (;;) {
- auto specs = mirror.lookup(name);
- if (!specs.empty()) {
- LOG(info, "Found %s in slobrok", name.c_str());
- return;
- }
- std::this_thread::sleep_for(100ms);
- }
+ _bm_cluster->wait_slobrok(name);
}
void
-PersistenceProviderFixture::start_service_layer(const BMParams& params)
+PersistenceProviderFixture::start_service_layer(const BmClusterParams& params)
{
- LOG(info, "start slobrok");
- _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
- LOG(info, "start service layer");
- config::ConfigUri config_uri("bm-servicelayer", _config_context);
- std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.needs_distributor()) {
- chain_builder = std::make_unique<BmStorageChainBuilder>();
- _service_layer_chain_context = chain_builder->get_context();
- }
- _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
- *_persistence_engine,
- std::move(chain_builder));
- _service_layer->setupConfig(100ms);
- _service_layer->createNode();
- _service_layer->getNode().waitUntilInitialized();
- LOG(info, "start rpc client shared resources");
- config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
- _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>
- (client_config_uri, _rpc_client_port, 100, params.get_rpc_events_before_wakup());
- _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
+ _bm_cluster->start_slobrok();
+ _bm_node->start_service_layer(params);
+ _bm_node->wait_service_layer();
+ _bm_cluster->start_rpc_client();
wait_slobrok("storage/cluster.storage/storage/0/default");
wait_slobrok("storage/cluster.storage/storage/0");
- BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ BmClusterController fake_controller(_bm_cluster->get_rpc_client());
fake_controller.set_cluster_up(false);
}
void
-PersistenceProviderFixture::start_distributor(const BMParams& params)
+PersistenceProviderFixture::start_distributor(const BmClusterParams& params)
{
- config::ConfigUri config_uri("bm-distributor", _config_context);
- std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.get_use_document_api()) {
- chain_builder = std::make_unique<BmStorageChainBuilder>();
- _distributor_chain_context = chain_builder->get_context();
- }
- _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
- if (chain_builder) {
- _distributor->set_storage_chain_builder(std::move(chain_builder));
- }
- _distributor->setupConfig(100ms);
- _distributor->createNode();
+ _bm_node->start_distributor(params);
wait_slobrok("storage/cluster.storage/distributor/0/default");
wait_slobrok("storage/cluster.storage/distributor/0");
- BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ BmClusterController fake_controller(_bm_cluster->get_rpc_client());
fake_controller.set_cluster_up(true);
// Wait for bucket ownership transfer safe time
std::this_thread::sleep_for(2s);
@@ -924,102 +207,45 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
void
PersistenceProviderFixture::start_message_bus()
{
- config::ConfigUri config_uri("bm-message-bus", _config_context);
- LOG(info, "Starting message bus");
- _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo);
- LOG(info, "Started message bus");
+ _bm_cluster->start_message_bus();
}
void
-PersistenceProviderFixture::create_feed_handler(const BMParams& params)
+PersistenceProviderFixture::create_feed_handler(const BmClusterParams& params)
{
- StorageApiRpcService::Params rpc_params;
- // This is the same compression config as the default in stor-communicationmanager.def.
- rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
- rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node();
- if (params.get_use_document_api()) {
- _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus);
- } else if (params.get_enable_distributor()) {
- if (params.get_use_storage_chain()) {
- assert(_distributor_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true);
- }
- } else if (params.needs_service_layer()) {
- if (params.get_use_storage_chain()) {
- assert(_service_layer_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false);
- }
- }
+ _bm_node->create_feed_handler(params, *_bm_cluster);
+ _feed_handler = _bm_node->get_feed_handler();
}
void
PersistenceProviderFixture::shutdown_feed_handler()
{
- _feed_handler.reset();
+ _bm_node->shutdown_feed_handler();
+ _feed_handler = nullptr;
}
void
PersistenceProviderFixture::shutdown_message_bus()
{
- if (_message_bus) {
- LOG(info, "stop message bus");
- _message_bus.reset();
- }
+ _bm_cluster->stop_message_bus();
}
void
PersistenceProviderFixture::shutdown_distributor()
{
- if (_distributor) {
- LOG(info, "stop distributor");
- _distributor->getNode().requestShutdown("controlled shutdown");
- _distributor->shutdown();
- }
+ _bm_node->shutdown_distributor();
}
void
PersistenceProviderFixture::shutdown_service_layer()
{
- if (_rpc_client_shared_rpc_resources) {
- LOG(info, "stop rpc client shared resources");
- _rpc_client_shared_rpc_resources->shutdown();
- _rpc_client_shared_rpc_resources.reset();
- }
- if (_service_layer) {
- LOG(info, "stop service layer");
- _service_layer->getNode().requestShutdown("controlled shutdown");
- _service_layer->shutdown();
- }
- if (_slobrok) {
- LOG(info, "stop slobrok");
- _slobrok.reset();
- }
-}
-
-vespalib::nbostream
-make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document = f.make_document(n, i);
- document->serialize(serialized_feed);
- }
- return serialized_feed;
+ _bm_cluster->stop_rpc_client();
+ _bm_node->shutdown_service_layer();
+ _bm_cluster->stop_slobrok();
}
std::vector<vespalib::nbostream>
-make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
+make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
{
LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents());
std::vector<vespalib::nbostream> serialized_feed_v;
@@ -1038,27 +264,6 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st
return serialized_feed_v;
}
-void
-put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- auto &repo = *f._repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- auto document = std::make_unique<Document>(repo, is);
- f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
class AvgSampler {
private:
double _total;
@@ -1077,73 +282,42 @@ void
run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { put_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput);
time_bias += bm_params.get_documents();
}
-vespalib::nbostream
-make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document_update = f.make_document_update(n, i);
- document_update->serializeHEAD(serialized_feed);
- }
- return serialized_feed;
-}
-
-void
-update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- auto &repo = *f._repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- auto document_update = DocumentUpdate::createHEAD(repo, is);
- f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
void
run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { update_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput);
@@ -1151,94 +325,44 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
}
void
-get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed)
-{
- LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- vespalib::string all_fields(document::AllFields::NAME);
- auto bucket_space = f._bucket_space;
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- DocumentId document_id(is);
- f._feed_handler->get(bucket, all_fields, document_id, pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
-void
run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
- { get_async_task(f, max_pending, range, serialized_feed); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
+ { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput);
}
-vespalib::nbostream
-make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document_id = f.make_document_id(n, i);
- vespalib::string raw_id = document_id.toString();
- serialized_feed.write(raw_id.c_str(), raw_id.size() + 1);
- }
- return serialized_feed;
-}
-
-void
-remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- DocumentId document_id(is);
- f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
void
run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { remove_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput);
@@ -1310,10 +434,10 @@ void benchmark_async_spi(const BMParams &bm_params)
{
vespalib::rmdir(base_dir, true);
PersistenceProviderFixture f(bm_params);
- auto &provider = *f._persistence_engine;
+ auto &provider = *f.get_persistence_provider();
LOG(info, "start initialize");
provider.initialize();
- LOG(info, "create %u buckets", f.num_buckets());
+ LOG(info, "create %u buckets", f._feed.num_buckets());
if (!bm_params.needs_distributor()) {
f.create_buckets();
}
@@ -1328,9 +452,10 @@ void benchmark_async_spi(const BMParams &bm_params)
}
f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki);
- auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put");
- auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update");
- auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove");
+ auto& feed = f._feed;
+ auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
+ auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
+ auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
int64_t time_bias = 1;
LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str());
benchmark_async_put(f, executor, time_bias, put_feed, bm_params);