summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-22 11:26:09 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-22 16:26:31 +0200
commit9a15b63e3e9e95739f938a064fb26ddfd1f9ae84 (patch)
tree7a320d17e15de50de3eef8f7c12ea338ef017362 /searchcore
parent70e892af43a435cbb3ebb2cdfa05e28ad64e9a7f (diff)
Add service layer in benchmark.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt3
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp416
2 files changed, 410 insertions, 9 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt
index e188bc16ec0..5b562cd1d28 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt
@@ -20,5 +20,8 @@ vespa_add_executable(searchcore_vespa_spi_feed_bm_app
searchcore_grouping
searchcore_proton_metrics
searchcore_fconfig
+ storageserver_storageapp
+ messagebus_messagebus-test
+ messagebus
searchlib_searchlib_uca
)
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
index ecfa2a07cef..1bebcee8a1d 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
@@ -11,6 +11,7 @@
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/update/documentupdate.h>
@@ -37,6 +38,40 @@
#include <vespa/config-summary.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/fastos/app.h>
+#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
+#include <vespa/storage/config/config-stor-bouncer.h>
+#include <vespa/storage/config/config-stor-communicationmanager.h>
+#include <vespa/storage/config/config-stor-opslogger.h>
+#include <vespa/storage/config/config-stor-prioritymapping.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/config/config-stor-status.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/config-load-type.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config-stor-filestor.h>
+#include <vespa/config-upgrading.h>
+#include <vespa/config-slobroks.h>
+#include <vespa/metrics/config-metricsmanager.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/storage/storageserver/storagenode.h>
+#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/storage/storageserver/message_dispatcher.h>
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
+#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
#include <getopt.h>
#include <iostream>
@@ -51,7 +86,28 @@ using namespace vespa::config::search::summary;
using namespace vespa::config::search;
using namespace std::chrono_literals;
using vespa::config::content::core::BucketspacesConfig;
+using vespa::config::content::core::BucketspacesConfigBuilder;
+using vespa::config::content::StorDistributionConfigBuilder;
+using vespa::config::content::StorFilestorConfigBuilder;
+using vespa::config::content::PersistenceConfigBuilder;
+using vespa::config::content::core::StorBouncerConfigBuilder;
+using vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
+using vespa::config::content::core::StorBucketInitConfigBuilder;
+using vespa::config::content::core::StorOpsloggerConfigBuilder;
+using vespa::config::content::core::StorPrioritymappingConfigBuilder;
+using vespa::config::content::LoadTypeConfigBuilder;
+using vespa::config::content::UpgradingConfigBuilder;
+using vespa::config::content::core::StorServerConfigBuilder;
+using vespa::config::content::core::StorStatusConfigBuilder;
+using vespa::config::content::core::StorVisitorConfigBuilder;
+using metrics::MetricsmanagerConfigBuilder;
+using cloud::config::SlobroksConfigBuilder;
+using messagebus::MessagebusConfigBuilder;
+using config::ConfigContext;
+using config::ConfigUri;
+using config::ConfigSet;
+using config::IConfigContext;
using document::AssignValueUpdate;
using document::BucketId;
using document::BucketSpace;
@@ -59,17 +115,23 @@ using document::Document;
using document::DocumentId;
using document::DocumentType;
using document::DocumentTypeRepo;
+using document::DocumentTypeRepoFactory;
using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
using document::DocumentUpdate;
using document::Field;
using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
+using documentapi::LoadTypeSet;
using search::TuneFileDocumentDB;
using search::index::DummyFileHeaderContext;
using search::index::Schema;
using search::index::SchemaBuilder;
using search::transactionlog::TransLogServer;
+using storage::rpc::MessageCodecProvider;
+using storage::rpc::SharedRpcResources;
+using storage::rpc::StorageApiRpcService;
using storage::spi::Bucket;
using storage::spi::PartitionId;
using storage::spi::PersistenceProvider;
@@ -242,6 +304,7 @@ class BMParams {
uint32_t _put_passes;
uint32_t _update_passes;
uint32_t _remove_passes;
+ bool _enable_service_layer;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
}
@@ -251,7 +314,8 @@ public:
_threads(32),
_put_passes(2),
_update_passes(1),
- _remove_passes(2)
+ _remove_passes(2),
+ _enable_service_layer(false)
{
}
BMRange get_range(uint32_t thread_id) const {
@@ -262,11 +326,13 @@ public:
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
+ bool get_enable_service_layer() const { return _enable_service_layer; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
+ void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
bool check() const;
};
@@ -292,8 +358,230 @@ BMParams::check() const
return true;
}
+
+class MyServiceLayerProcess : public storage::ServiceLayerProcess {
+ PersistenceProvider& _provider;
+
+public:
+ MyServiceLayerProcess(const config::ConfigUri & configUri,
+ PersistenceProvider &provider);
+ ~MyServiceLayerProcess() override { shutdown(); }
+
+ void shutdown() override;
+ void setupProvider() override;
+ PersistenceProvider& getProvider() override;
+};
+
+MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri,
+ PersistenceProvider &provider)
+ : ServiceLayerProcess(configUri),
+ _provider(provider)
+{
+}
+
+void
+MyServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
+}
+
+void
+MyServiceLayerProcess::setupProvider()
+{
+}
+
+PersistenceProvider&
+MyServiceLayerProcess::getProvider()
+{
+ return _provider;
+}
+
+struct MyStorageConfig
+{
+ vespalib::string config_id;
+ DocumenttypesConfigBuilder documenttypes;
+ PersistenceConfigBuilder persistence;
+ StorDistributionConfigBuilder stor_distribution;
+ StorFilestorConfigBuilder stor_filestor;
+ StorBouncerConfigBuilder stor_bouncer;
+ StorCommunicationmanagerConfigBuilder stor_communicationmanager;
+ StorBucketInitConfigBuilder stor_bucket_init;
+ StorOpsloggerConfigBuilder stor_opslogger;
+ StorPrioritymappingConfigBuilder stor_prioritymapping;
+ UpgradingConfigBuilder upgrading;
+ StorServerConfigBuilder stor_server;
+ StorStatusConfigBuilder stor_status;
+ StorVisitorConfigBuilder stor_visitor;
+ BucketspacesConfigBuilder bucketspaces;
+ LoadTypeConfigBuilder load_type;
+ MetricsmanagerConfigBuilder metricsmanager;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port)
+ : config_id(config_id_in),
+ documenttypes(documenttypes_in),
+ persistence(),
+ stor_distribution(),
+ stor_filestor(),
+ stor_bouncer(),
+ stor_communicationmanager(),
+ stor_bucket_init(),
+ stor_opslogger(),
+ stor_prioritymapping(),
+ upgrading(),
+ stor_server(),
+ stor_status(),
+ stor_visitor(),
+ bucketspaces(),
+ load_type(),
+ metricsmanager(),
+ slobroks(),
+ messagebus()
+ {
+ {
+ auto &dc = stor_distribution;
+ {
+ StorDistributionConfigBuilder::Group group;
+ {
+ StorDistributionConfigBuilder::Group::Nodes node;
+ node.index = 0;
+ group.nodes.push_back(std::move(node));
+ }
+ group.index = "invalid";
+ group.name = "invalid";
+ group.capacity = 1.0;
+ group.partitions = "";
+ dc.group.push_back(std::move(group));
+ }
+ dc.redundancy = 1;
+ dc.readyCopies = 1;
+ }
+ stor_server.rootFolder = "storage";
+ {
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+ }
+ stor_communicationmanager.useDirectStorageapiRpc = true;
+ stor_status.httpport = status_port;
+ }
+
+ ~MyStorageConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &documenttypes);
+ set.addBuilder(config_id, &persistence);
+ set.addBuilder(config_id, &stor_distribution);
+ set.addBuilder(config_id, &stor_filestor);
+ set.addBuilder(config_id, &stor_bouncer);
+ set.addBuilder(config_id, &stor_communicationmanager);
+ set.addBuilder(config_id, &stor_bucket_init);
+ set.addBuilder(config_id, &stor_opslogger);
+ set.addBuilder(config_id, &stor_prioritymapping);
+ set.addBuilder(config_id, &upgrading);
+ set.addBuilder(config_id, &stor_server);
+ set.addBuilder(config_id, &stor_status);
+ set.addBuilder(config_id, &stor_visitor);
+ set.addBuilder(config_id, &bucketspaces);
+ set.addBuilder(config_id, &load_type);
+ set.addBuilder(config_id, &metricsmanager);
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+MyStorageConfig::~MyStorageConfig() = default;
+
+struct MyRpcClientConfig {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+
+ MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks()
+ {
+ {
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+ }
+ }
+ ~MyRpcClientConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ }
+};
+
+MyRpcClientConfig::~MyRpcClientConfig() = default;
+
+class MyMessageDispatcher : public storage::MessageDispatcher
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, MyPendingTracker *> _pending;
+public:
+ MyMessageDispatcher()
+ : storage::MessageDispatcher(),
+ _mutex(),
+ _pending()
+ {
+ }
+ ~MyMessageDispatcher() override;
+ void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void retain(uint64_t msg_id, MyPendingTracker &tracker) {
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+ }
+ void release(uint64_t msg_id) {
+ MyPendingTracker *tracker = nullptr;
+ {
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ assert(itr != _pending.end());
+ tracker = itr->second;
+ _pending.erase(itr);
+ }
+ tracker->release();
+ }
+};
+
+MyMessageDispatcher::~MyMessageDispatcher()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
}
+FRT_RPCRequest *make_set_cluster_state_request() {
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ storage::rpc::SlimeClusterStateBundleCodec codec;
+ auto encoded_bundle = codec.encode(bundle);
+ auto *req = new FRT_RPCRequest();
+ auto* params = req->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(encoded_bundle._uncompressed_length);
+ const auto buf_len = encoded_bundle._buffer->getDataLen();
+ params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
+ req->SetMethodName("setdistributionstates");
+ return req;
+}
+
+void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) {
+ auto req = make_set_cluster_state_request();
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory());
+ auto target = target_resolver->resolve_rpc_target(storage_address);
+ target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
+ assert(!req->IsError());
+ req->SubRef();
+}
+
+}
struct PersistenceProviderFixture {
std::shared_ptr<DocumenttypesConfig> _document_types;
@@ -305,6 +593,9 @@ struct PersistenceProviderFixture {
vespalib::string _base_dir;
DummyFileHeaderContext _file_header_context;
int _tls_listen_port;
+ int _slobrok_port;
+ int _status_port;
+ int _rpc_client_port;
TransLogServer _tls;
vespalib::string _tls_spec;
matching::QueryLimiter _query_limiter;
@@ -320,6 +611,17 @@ struct PersistenceProviderFixture {
std::shared_ptr<PersistenceEngine> _persistence_engine;
storage::spi::Context _context;
uint32_t _bucket_bits;
+ MyStorageConfig _service_layer_config;
+ MyRpcClientConfig _rpc_client_config;
+ ConfigSet _config_set;
+ std::shared_ptr<IConfigContext> _config_context;
+ storage::api::StorageMessageAddress _storage_address;
+ std::unique_ptr<mbus::Slobrok> _slobrok;
+ std::unique_ptr<MyServiceLayerProcess> _service_layer;
+ std::unique_ptr<MessageCodecProvider> _message_codec_provider;
+ std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
+ std::unique_ptr<MyMessageDispatcher> _rpc_message_dispatcher;
+ std::unique_ptr<StorageApiRpcService> _rpc_client;
PersistenceProviderFixture();
~PersistenceProviderFixture();
@@ -331,11 +633,14 @@ struct PersistenceProviderFixture {
std::unique_ptr<Document> make_document(uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
+ void start_service_layer();
+ void shutdown_service_layer();
+ void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker);
};
PersistenceProviderFixture::PersistenceProviderFixture()
: _document_types(make_document_type()),
- _repo(std::make_shared<DocumentTypeRepo>(*_document_types)),
+ _repo(DocumentTypeRepoFactory::make(*_document_types)),
_doc_type_name("test"),
_document_type(_repo->getDocumentType(_doc_type_name.getName())),
_field(_document_type->getField("int")),
@@ -343,6 +648,9 @@ PersistenceProviderFixture::PersistenceProviderFixture()
_base_dir(base_dir),
_file_header_context(),
_tls_listen_port(9017),
+ _slobrok_port(9018),
+ _status_port(9019),
+ _rpc_client_port(9020),
_tls("tls", _tls_listen_port, _base_dir, _file_header_context),
_tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
_query_limiter(),
@@ -357,12 +665,25 @@ PersistenceProviderFixture::PersistenceProviderFixture()
_write_filter(),
_persistence_engine(),
_context(default_load_type, Priority(0), Trace::TraceLevel(0)),
- _bucket_bits(16)
+ _bucket_bits(16),
+ _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port),
+ _rpc_client_config("bm-rpc-client", _slobrok_port),
+ _config_set(),
+ _config_context(std::make_shared<ConfigContext>(_config_set)),
+ _storage_address("storage", storage::lib::NodeType::STORAGE, 0),
+ _slobrok(),
+ _service_layer(),
+ _message_codec_provider(),
+ _rpc_client_shared_rpc_resources(),
+ _rpc_message_dispatcher(),
+ _rpc_client()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
+ _service_layer_config.add_builders(_config_set);
+ _rpc_client_config.add_builders(_config_set);
}
PersistenceProviderFixture::~PersistenceProviderFixture()
@@ -454,6 +775,58 @@ PersistenceProviderFixture::create_buckets()
}
}
+void
+PersistenceProviderFixture::start_service_layer()
+{
+ LOG(info, "start slobrok");
+ _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
+ LOG(info, "start service layer");
+ config::ConfigUri config_uri("bm-servicelayer", _config_context);
+ _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
+ *_persistence_engine);
+ _service_layer->setupConfig(100ms);
+ _service_layer->createNode();
+ _service_layer->getNode().waitUntilInitialized();
+ _message_codec_provider = std::make_unique<MessageCodecProvider>(_repo, std::make_shared<documentapi::LoadTypeSet>());
+ LOG(info, "start rpc client shared resources");
+ config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
+ _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
+ _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
+ _rpc_message_dispatcher = std::make_unique<MyMessageDispatcher>();
+ _rpc_client = std::make_unique<StorageApiRpcService>(*_rpc_message_dispatcher, *_rpc_client_shared_rpc_resources, *_message_codec_provider, StorageApiRpcService::Params());
+ set_cluster_up(*_rpc_client_shared_rpc_resources, _storage_address);
+}
+
+void
+PersistenceProviderFixture::shutdown_service_layer()
+{
+ _rpc_client.reset();
+ _rpc_message_dispatcher.reset();
+ if (_rpc_client_shared_rpc_resources) {
+ LOG(info, "stop rpc client shared resources");
+ _rpc_client_shared_rpc_resources->shutdown();
+ _rpc_client_shared_rpc_resources.reset();
+ }
+ if (_service_layer) {
+ LOG(info, "stop service layer");
+ _service_layer->getNode().requestShutdown("controlled shutdown");
+ _service_layer->shutdown();
+ }
+ if (_slobrok) {
+ LOG(info, "stop slobrok");
+ _slobrok.reset();
+ }
+}
+
+void
+PersistenceProviderFixture::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ cmd->setAddress(_storage_address);
+ _rpc_message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
+ _rpc_client->send_rpc_v1_request(std::move(cmd));
+}
+
vespalib::nbostream
make_put_feed(PersistenceProviderFixture &f, BMRange range)
{
@@ -501,7 +874,12 @@ put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbo
is >> bucket_id;
Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
auto document = std::make_unique<Document>(repo, is);
- provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ if (f._rpc_client) {
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket.getBucket(), std::move(document), time_bias + i);
+ f.send_rpc(std::move(cmd), pending_tracker);
+ } else {
+ provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ }
}
assert(is.empty());
pending_tracker.drain();
@@ -552,7 +930,12 @@ update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::
is >> bucket_id;
Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
auto document_update = DocumentUpdate::createHEAD(repo, is);
- provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ if (f._rpc_client) {
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket.getBucket(), std::move(document_update), time_bias + i);
+ f.send_rpc(std::move(cmd), pending_tracker);
+ } else {
+ provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ }
}
assert(is.empty());
pending_tracker.drain();
@@ -603,7 +986,12 @@ remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::
is >> bucket_id;
Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
DocumentId document_id(is);
- provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
+ if (f._rpc_client) {
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket.getBucket(), document_id, time_bias + i);
+ f.send_rpc(std::move(cmd), pending_tracker);
+ } else {
+ provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
+ }
}
assert(is.empty());
pending_tracker.drain();
@@ -635,6 +1023,9 @@ void benchmark_async_spi(const BMParams &bm_params)
provider.initialize();
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
+ if (bm_params.get_enable_service_layer()) {
+ f.start_service_layer();
+ }
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update");
@@ -649,6 +1040,7 @@ void benchmark_async_spi(const BMParams &bm_params)
for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) {
run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params);
}
+ f.shutdown_service_layer();
}
class App : public FastOS_Application
@@ -682,7 +1074,8 @@ App::usage()
"[--documents documents]\n"
"[--put-passes put-passes]\n"
"[--update-passes update-passes]\n"
- "[--remove-passes remove-passes]" << std::endl;
+ "[--remove-passes remove-passes]\n"
+ "[--enable-service-layer]" << std::endl;
}
bool
@@ -696,14 +1089,16 @@ App::get_options()
{ "documents", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
{ "update-passes", 1, nullptr, 0 },
- { "remove-passes", 1, nullptr, 0 }
+ { "remove-passes", 1, nullptr, 0 },
+ { "enable-service-layer", 0, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_THREADS,
LONGOPT_DOCUMENTS,
LONGOPT_PUT_PASSES,
LONGOPT_UPDATE_PASSES,
- LONGOPT_REMOVE_PASSES
+ LONGOPT_REMOVE_PASSES,
+ LONGOPT_ENABLE_SERVICE_LAYER
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -726,6 +1121,9 @@ App::get_options()
case LONGOPT_REMOVE_PASSES:
_bm_params.set_remove_passes(atoi(opt_argument));
break;
+ case LONGOPT_ENABLE_SERVICE_LAYER:
+ _bm_params.set_enable_service_layer(true);
+ break;
default:
return false;
}