summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-28 13:47:32 +0200
committerGitHub <noreply@github.com>2020-09-28 13:47:32 +0200
commitcef539adea8b1ad6ba2314722c699a0a412082b7 (patch)
treeafbee578314c1df500874bd23ee36f36e5fe5e86 /searchcore
parent37d3b000d7a5c80274358ead957bd2d4045a124b (diff)
parent646120d32e342f8ce25ca72fafd216978ca7412d (diff)
Merge pull request #14572 from vespa-engine/toregge/pass-storage-api-messages-directly-to-storage-chain
Add option to feed using storage api protocol directly on storage chain.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp185
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp52
4 files changed, 260 insertions, 12 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index 1d031e2ea30..4ced3fe173b 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_executable(searchcore_vespa_feed_bm_app
vespa_feed_bm.cpp
spi_bm_feed_handler.cpp
storage_api_rpc_bm_feed_handler.cpp
+ storage_api_chain_bm_feed_handler.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
searchcore_server
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
new file mode 100644
index 00000000000..6f1acd10fe4
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
@@ -0,0 +1,185 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_chain_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/common/storage_chain_builder.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <cassert>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::StorageLink;
+
+namespace feedbm {
+
+namespace {
+
+std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle);
+ cmd->setPriority(storage::api::StorageMessage::VERYHIGH);
+ return cmd;
+}
+
+}
+
+class BmStorageLink : public StorageLink
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ BmStorageLink();
+ ~BmStorageLink() override;
+ bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ void retain(uint64_t msg_id, PendingTracker &tracker) {
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+ }
+ bool release(uint64_t msg_id) {
+ PendingTracker *tracker = nullptr;
+ {
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ if (itr == _pending.end()) {
+ return false;
+ }
+ tracker = itr->second;
+ _pending.erase(itr);
+ }
+ tracker->release();
+ return true;
+ }
+};
+
+BmStorageLink::BmStorageLink()
+ : storage::StorageLink("vespa-bm-feed"),
+ _mutex(),
+ _pending()
+{
+}
+
+BmStorageLink::~BmStorageLink()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+bool
+BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ (void) msg;
+ return false;
+}
+
+bool
+BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ return release(msg->getMsgId());
+}
+
+struct StorageApiChainBmFeedHandler::Context {
+ BmStorageLink* bm_link;
+ Context()
+ : bm_link(nullptr)
+ {
+ }
+ ~Context() = default;
+};
+
+class MyStorageChainBuilder : public storage::StorageChainBuilder
+{
+ using Parent = storage::StorageChainBuilder;
+ std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context;
+public:
+ MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context);
+ ~MyStorageChainBuilder() override;
+ void add(std::unique_ptr<StorageLink> link) override;
+};
+
+MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context)
+ : storage::StorageChainBuilder(),
+ _context(std::move(context))
+{
+}
+
+MyStorageChainBuilder::~MyStorageChainBuilder() = default;
+
+void
+MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link)
+{
+ vespalib::string name = link->getName();
+ Parent::add(std::move(link));
+ if (name == "Communication manager") {
+ auto my_link = std::make_unique<BmStorageLink>();
+ _context->bm_link = my_link.get();
+ Parent::add(std::move(my_link));
+ }
+}
+
+StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context)
+ : IBmFeedHandler(),
+ _context(std::move(context))
+{
+ auto cmd = make_set_cluster_state_cmd();
+ PendingTracker tracker(1);
+ send_msg(std::move(cmd), tracker);
+ tracker.drain();
+}
+
+StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
+
+void
+StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ auto bm_link = _context->bm_link;
+ bm_link->retain(cmd->getMsgId(), pending_tracker);
+ bm_link->sendDown(std::move(cmd));
+}
+
+void
+StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+std::shared_ptr<StorageApiChainBmFeedHandler::Context>
+StorageApiChainBmFeedHandler::get_context()
+{
+ return std::make_shared<Context>();
+}
+
+std::unique_ptr<storage::IStorageChainBuilder>
+StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context)
+{
+ return std::make_unique<MyStorageChainBuilder>(std::move(context));
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
new file mode 100644
index 00000000000..521deddd19e
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace storage { class IStorageChainBuilder; }
+namespace storage::api { class StorageCommand; }
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed to service layer using storage api protocol
+ * directly on the storage chain.
+ */
+class StorageApiChainBmFeedHandler : public IBmFeedHandler
+{
+public:
+ struct Context;
+private:
+ std::shared_ptr<Context> _context;
+ void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiChainBmFeedHandler(std::shared_ptr<Context> context);
+ ~StorageApiChainBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+
+ static std::shared_ptr<Context> get_context();
+ static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 4208e9448ff..73d741c3fee 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -3,6 +3,7 @@
#include "pending_tracker.h"
#include "spi_bm_feed_handler.h"
#include "storage_api_rpc_bm_feed_handler.h"
+#include "storage_api_chain_bm_feed_handler.h"
#include <vespa/vespalib/testkit/testapp.h>
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-imported-fields.h>
@@ -56,6 +57,7 @@
#include <vespa/config-slobroks.h>
#include <vespa/metrics/config-metricsmanager.h>
#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/storageserver/storagenode.h>
#include <vespa/messagebus/config-messagebus.h>
#include <vespa/messagebus/testlib/slobrok.h>
@@ -122,6 +124,7 @@ using vespalib::makeLambdaTask;
using feedbm::IBmFeedHandler;
using feedbm::SpiBmFeedHandler;
using feedbm::StorageApiRpcBmFeedHandler;
+using feedbm::StorageApiChainBmFeedHandler;
using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>;
@@ -209,6 +212,7 @@ class BMParams {
uint32_t _remove_passes;
uint32_t _rpc_network_threads;
bool _enable_service_layer;
+ bool _use_storage_chain;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
}
@@ -220,7 +224,8 @@ public:
_update_passes(1),
_remove_passes(2),
_rpc_network_threads(1),
- _enable_service_layer(false)
+ _enable_service_layer(false),
+ _use_storage_chain(false)
{
}
BMRange get_range(uint32_t thread_id) const {
@@ -233,6 +238,7 @@ public:
uint32_t get_remove_passes() const { return _remove_passes; }
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
bool get_enable_service_layer() const { return _enable_service_layer; }
+ bool get_use_storage_chain() const { return _use_storage_chain; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
@@ -240,6 +246,7 @@ public:
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
+ void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
bool check() const;
};
@@ -269,13 +276,13 @@ BMParams::check() const
return true;
}
-
class MyServiceLayerProcess : public storage::ServiceLayerProcess {
PersistenceProvider& _provider;
public:
MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider);
+ PersistenceProvider &provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder);
~MyServiceLayerProcess() override { shutdown(); }
void shutdown() override;
@@ -284,10 +291,14 @@ public:
};
MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider)
+ PersistenceProvider &provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
: ServiceLayerProcess(configUri),
_provider(provider)
{
+ if (chain_builder) {
+ set_storage_chain_builder(std::move(chain_builder));
+ }
}
void
@@ -476,7 +487,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<Document> make_document(uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
- void start_service_layer();
+ void start_service_layer(bool use_storage_chain);
void shutdown_service_layer();
};
@@ -615,14 +626,21 @@ PersistenceProviderFixture::create_buckets()
}
void
-PersistenceProviderFixture::start_service_layer()
+PersistenceProviderFixture::start_service_layer(bool use_storage_chain)
{
LOG(info, "start slobrok");
_slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
LOG(info, "start service layer");
config::ConfigUri config_uri("bm-servicelayer", _config_context);
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder;
+ std::shared_ptr<StorageApiChainBmFeedHandler::Context> context;
+ if (use_storage_chain) {
+ context = StorageApiChainBmFeedHandler::get_context();
+ chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context);
+ }
_service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
- *_persistence_engine);
+ *_persistence_engine,
+ std::move(chain_builder));
_service_layer->setupConfig(100ms);
_service_layer->createNode();
_service_layer->getNode().waitUntilInitialized();
@@ -630,7 +648,11 @@ PersistenceProviderFixture::start_service_layer()
config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
_rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
_rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+ if (use_storage_chain) {
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context));
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+ }
}
void
@@ -829,7 +851,7 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
if (bm_params.get_enable_service_layer()) {
- f.start_service_layer();
+ f.start_service_layer(bm_params.get_use_storage_chain());
}
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
@@ -881,7 +903,8 @@ App::usage()
"[--update-passes update-passes]\n"
"[--remove-passes remove-passes]\n"
"[--rpc-network-threads threads]\n"
- "[--enable-service-layer]" << std::endl;
+ "[--enable-service-layer]\n"
+ "[--use-storage-chain]" << std::endl;
}
bool
@@ -897,7 +920,8 @@ App::get_options()
{ "update-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
{ "rpc-network-threads", 1, nullptr, 0 },
- { "enable-service-layer", 0, nullptr, 0 }
+ { "enable-service-layer", 0, nullptr, 0 },
+ { "use-storage-chain", 0, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_THREADS,
@@ -906,7 +930,8 @@ App::get_options()
LONGOPT_UPDATE_PASSES,
LONGOPT_REMOVE_PASSES,
LONGOPT_RPC_NETWORK_THREADS,
- LONGOPT_ENABLE_SERVICE_LAYER
+ LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_USE_STORAGE_CHAIN
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -935,6 +960,9 @@ App::get_options()
case LONGOPT_ENABLE_SERVICE_LAYER:
_bm_params.set_enable_service_layer(true);
break;
+ case LONGOPT_USE_STORAGE_CHAIN:
+ _bm_params.set_use_storage_chain(true);
+ break;
default:
return false;
}