summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-24 16:15:22 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-24 16:52:18 +0200
commitee92e3bd735b67fd839029dabbd6eebd2b56e42d (patch)
tree3cbcbbeb927b24490f20b5ea08d8b24b4ad957e7 /searchcore
parent039b0138aeeefaf77100dd15ba85c7024b7edfcc (diff)
Factor out feed handlers for benchmark feed.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt2
-rw-r--r--searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h30
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker.h57
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp93
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp145
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h43
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp252
8 files changed, 420 insertions, 228 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index 9fa47c77b03..1d031e2ea30 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -2,6 +2,8 @@
vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
+ spi_bm_feed_handler.cpp
+ storage_api_rpc_bm_feed_handler.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
searchcore_server
diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
new file mode 100644
index 00000000000..a3341bf14c9
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
@@ -0,0 +1,30 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace document {
+class Bucket;
+class Document;
+class DocumentUpdate;
+class DocumentId;
+}
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Interface class for benchmark feed handler.
+ */
+class IBmFeedHandler
+{
+public:
+ virtual ~IBmFeedHandler() = default;
+ virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0;
+ virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0;
+ virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
new file mode 100644
index 00000000000..3698832068f
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
@@ -0,0 +1,57 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+
+namespace feedbm {
+
+/*
+ * Class to track number of pending operations, used as backpressure during
+ * benchmark feeding.
+ */
+class PendingTracker {
+ uint32_t _pending;
+ uint32_t _limit;
+ std::mutex _mutex;
+ std::condition_variable _cond;
+
+public:
+ PendingTracker(uint32_t limit)
+ : _pending(0u),
+ _limit(limit),
+ _mutex(),
+ _cond()
+ {
+ }
+
+ ~PendingTracker()
+ {
+ drain();
+ }
+
+ void release() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ --_pending;
+ if (_pending < _limit) {
+ _cond.notify_all();
+ }
+ }
+ void retain() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (_pending >= _limit) {
+ _cond.wait(guard);
+ }
+ ++_pending;
+ }
+
+ void drain() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (_pending > 0) {
+ _cond.wait(guard);
+ }
+ }
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
new file mode 100644
index 00000000000..d53ece2fc42
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
@@ -0,0 +1,93 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "spi_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/metrics/loadtype.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::spi::Bucket;
+using storage::spi::PartitionId;
+using storage::spi::PersistenceProvider;
+using storage::spi::Timestamp;
+
+namespace feedbm {
+
+namespace {
+
+storage::spi::LoadType default_load_type(0, "default");
+storage::spi::Context context(default_load_type, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
+
+class MyOperationComplete : public storage::spi::OperationComplete
+{
+ PendingTracker& _tracker;
+public:
+ MyOperationComplete(PendingTracker& tracker);
+ ~MyOperationComplete();
+ void onComplete(std::unique_ptr<storage::spi::Result> result) override;
+ void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
+};
+
+MyOperationComplete::MyOperationComplete(PendingTracker& tracker)
+ : _tracker(tracker)
+{
+ _tracker.retain();
+}
+
+MyOperationComplete::~MyOperationComplete()
+{
+ _tracker.release();
+}
+
+void
+MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
+{
+ (void) result;
+}
+
+void
+MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler)
+{
+ (void) resultHandler;
+}
+
+}
+
+SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider)
+ : IBmFeedHandler(),
+ _provider(provider)
+{
+}
+
+SpiBmFeedHandler::~SpiBmFeedHandler() = default;
+
+void
+SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(tracker));
+}
+
+void
+SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(tracker));
+}
+
+void
+SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(tracker));
+
+}
+
+void
+SpiBmFeedHandler::create_bucket(const document::Bucket& bucket)
+{
+ _provider.createBucket(Bucket(bucket, PartitionId(0)), context);
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
new file mode 100644
index 00000000000..5b56a4f21dd
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace storage::spi { struct PersistenceProvider; }
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed directly to persistence provider
+ */
+class SpiBmFeedHandler : public IBmFeedHandler
+{
+ storage::spi::PersistenceProvider& _provider;
+public:
+ SpiBmFeedHandler(storage::spi::PersistenceProvider& provider);
+ ~SpiBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ void create_bucket(const document::Bucket& bucket);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
new file mode 100644
index 00000000000..e2f3a951b99
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
@@ -0,0 +1,145 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_rpc_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storage/storageserver/message_dispatcher.h>
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
+#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <cassert>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using document::DocumentTypeRepo;
+using storage::api::StorageMessageAddress;
+using storage::rpc::SharedRpcResources;
+
+namespace feedbm {
+
+namespace {
+
+FRT_RPCRequest *make_set_cluster_state_request() {
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ storage::rpc::SlimeClusterStateBundleCodec codec;
+ auto encoded_bundle = codec.encode(bundle);
+ auto *req = new FRT_RPCRequest();
+ auto* params = req->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(encoded_bundle._uncompressed_length);
+ const auto buf_len = encoded_bundle._buffer->getDataLen();
+ params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
+ req->SetMethodName("setdistributionstates");
+ return req;
+}
+
+void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) {
+ auto req = make_set_cluster_state_request();
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory());
+ auto target = target_resolver->resolve_rpc_target(storage_address);
+ target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
+ assert(!req->IsError());
+ req->SubRef();
+}
+
+}
+
+class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ MyMessageDispatcher()
+ : storage::MessageDispatcher(),
+ _mutex(),
+ _pending()
+ {
+ }
+ ~MyMessageDispatcher() override;
+ void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void retain(uint64_t msg_id, PendingTracker &tracker) {
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+ }
+ void release(uint64_t msg_id) {
+ PendingTracker *tracker = nullptr;
+ {
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ assert(itr != _pending.end());
+ tracker = itr->second;
+ _pending.erase(itr);
+ }
+ tracker->release();
+ }
+};
+
+StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo)
+ : IBmFeedHandler(),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)),
+ _shared_rpc_resources(shared_rpc_resources_in),
+ _message_dispatcher(std::make_unique<MyMessageDispatcher>()),
+ _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())),
+ _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params()))
+{
+ set_cluster_up(_shared_rpc_resources, *_storage_address);
+}
+
+StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
+
+void
+StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ cmd->setAddress(*_storage_address);
+ _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
+ _rpc_client->send_rpc_v1_request(std::move(cmd));
+}
+
+void
+StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+void
+StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+void
+StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
new file mode 100644
index 00000000000..0fe92350eb2
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
@@ -0,0 +1,43 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace storage::api {
+class StorageMessageAddress;
+class StorageCommand;
+}
+
+namespace storage::rpc {
+class MessageCodecProvider;
+class SharedRpcResources;
+class StorageApiRpcService;
+}
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed to service layer using storage api protocol
+ * over rpc.
+ */
+class StorageApiRpcBmFeedHandler : public IBmFeedHandler
+{
+ class MyMessageDispatcher;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ storage::rpc::SharedRpcResources& _shared_rpc_resources;
+ std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
+ std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider;
+ std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client;
+
+ void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo);
+ ~StorageApiRpcBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 85b310ccbad..4208e9448ff 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,7 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "pending_tracker.h"
+#include "spi_bm_feed_handler.h"
+#include "storage_api_rpc_bm_feed_handler.h"
#include <vespa/vespalib/testkit/testapp.h>
-
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-imported-fields.h>
#include <vespa/config-rank-profiles.h>
@@ -57,21 +59,7 @@
#include <vespa/storage/storageserver/storagenode.h>
#include <vespa/messagebus/config-messagebus.h>
#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/storage/storageserver/message_dispatcher.h>
-#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
-#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
-#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-#include <vespa/documentapi/loadtypes/loadtypeset.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <vespa/fnet/frt/target.h>
-#include <vespa/slobrok/sbmirror.h>
#include <getopt.h>
#include <iostream>
@@ -123,29 +111,22 @@ using document::Field;
using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
-using documentapi::LoadTypeSet;
using search::TuneFileDocumentDB;
using search::index::DummyFileHeaderContext;
using search::index::Schema;
using search::index::SchemaBuilder;
using search::transactionlog::TransLogServer;
-using storage::rpc::MessageCodecProvider;
using storage::rpc::SharedRpcResources;
-using storage::rpc::StorageApiRpcService;
-using storage::spi::Bucket;
-using storage::spi::PartitionId;
using storage::spi::PersistenceProvider;
-using storage::spi::Priority;
-using storage::spi::Timestamp;
-using storage::spi::Trace;
using vespalib::makeLambdaTask;
+using feedbm::IBmFeedHandler;
+using feedbm::SpiBmFeedHandler;
+using feedbm::StorageApiRpcBmFeedHandler;
using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>;
namespace {
-storage::spi::LoadType default_load_type(0, "default");
-
vespalib::string base_dir = "testdb";
std::shared_ptr<DocumenttypesConfig> make_document_type() {
@@ -206,84 +187,6 @@ struct MyResourceWriteFilter : public IResourceWriteFilter
State getAcceptState() const override { return IResourceWriteFilter::State(); }
};
-class MyPendingTracker {
- uint32_t _pending;
- uint32_t _limit;
- std::mutex _mutex;
- std::condition_variable _cond;
-
-public:
- MyPendingTracker(uint32_t limit)
- : _pending(0u),
- _limit(limit),
- _mutex(),
- _cond()
- {
- }
-
- ~MyPendingTracker()
- {
- drain();
- }
-
- void release() {
- std::unique_lock<std::mutex> guard(_mutex);
- --_pending;
- if (_pending < _limit) {
- _cond.notify_all();
- }
- //LOG(info, "release, pending is now %u", _pending);
- }
- void retain() {
- std::unique_lock<std::mutex> guard(_mutex);
- while (_pending >= _limit) {
- _cond.wait(guard);
- }
- ++_pending;
- //LOG(info, "retain, pending is now %u", _pending);
- }
-
- void drain() {
- std::unique_lock<std::mutex> guard(_mutex);
- while (_pending > 0) {
- _cond.wait(guard);
- }
- }
-};
-
-class MyOperationComplete : public storage::spi::OperationComplete
-{
- MyPendingTracker& _tracker;
-public:
- MyOperationComplete(MyPendingTracker &tracker);
- ~MyOperationComplete();
- void onComplete(std::unique_ptr<storage::spi::Result> result) override;
- void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
-};
-
-MyOperationComplete::MyOperationComplete(MyPendingTracker& tracker)
- : _tracker(tracker)
-{
- _tracker.retain();
-}
-
-MyOperationComplete::~MyOperationComplete()
-{
- _tracker.release();
-}
-
-void
-MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
-{
- (void) result;
-}
-
-void
-MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler)
-{
- (void) resultHandler;
-}
-
class BMRange
{
uint32_t _start;
@@ -525,71 +428,6 @@ struct MyRpcClientConfig {
MyRpcClientConfig::~MyRpcClientConfig() = default;
-class MyMessageDispatcher : public storage::MessageDispatcher
-{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, MyPendingTracker *> _pending;
-public:
- MyMessageDispatcher()
- : storage::MessageDispatcher(),
- _mutex(),
- _pending()
- {
- }
- ~MyMessageDispatcher() override;
- void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override {
- release(msg->getMsgId());
- }
- void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override {
- release(msg->getMsgId());
- }
- void retain(uint64_t msg_id, MyPendingTracker &tracker) {
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
- }
- void release(uint64_t msg_id) {
- MyPendingTracker *tracker = nullptr;
- {
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- assert(itr != _pending.end());
- tracker = itr->second;
- _pending.erase(itr);
- }
- tracker->release();
- }
-};
-
-MyMessageDispatcher::~MyMessageDispatcher()
-{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
-}
-
-FRT_RPCRequest *make_set_cluster_state_request() {
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
- storage::rpc::SlimeClusterStateBundleCodec codec;
- auto encoded_bundle = codec.encode(bundle);
- auto *req = new FRT_RPCRequest();
- auto* params = req->GetParams();
- params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
- params->AddInt32(encoded_bundle._uncompressed_length);
- const auto buf_len = encoded_bundle._buffer->getDataLen();
- params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len);
- req->SetMethodName("setdistributionstates");
- return req;
-}
-
-void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) {
- auto req = make_set_cluster_state_request();
- auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory());
- auto target = target_resolver->resolve_rpc_target(storage_address);
- target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
- assert(!req->IsError());
- req->SubRef();
-}
-
}
struct PersistenceProviderFixture {
@@ -618,33 +456,28 @@ struct PersistenceProviderFixture {
MyPersistenceEngineOwner _persistence_owner;
MyResourceWriteFilter _write_filter;
std::shared_ptr<PersistenceEngine> _persistence_engine;
- storage::spi::Context _context;
uint32_t _bucket_bits;
MyStorageConfig _service_layer_config;
MyRpcClientConfig _rpc_client_config;
ConfigSet _config_set;
std::shared_ptr<IConfigContext> _config_context;
- storage::api::StorageMessageAddress _storage_address;
+ std::unique_ptr<IBmFeedHandler> _feed_handler;
std::unique_ptr<mbus::Slobrok> _slobrok;
std::unique_ptr<MyServiceLayerProcess> _service_layer;
- std::unique_ptr<MessageCodecProvider> _message_codec_provider;
std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
- std::unique_ptr<MyMessageDispatcher> _rpc_message_dispatcher;
- std::unique_ptr<StorageApiRpcService> _rpc_client;
PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
void create_document_db();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); }
- Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); }
+ document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); }
DocumentId make_document_id(uint32_t i) const;
std::unique_ptr<Document> make_document(uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
void start_service_layer();
void shutdown_service_layer();
- void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker);
};
PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
@@ -673,19 +506,15 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_persistence_owner(),
_write_filter(),
_persistence_engine(),
- _context(default_load_type, Priority(0), Trace::TraceLevel(0)),
_bucket_bits(16),
_service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()),
_rpc_client_config("bm-rpc-client", _slobrok_port),
_config_set(),
_config_context(std::make_shared<ConfigContext>(_config_set)),
- _storage_address("storage", storage::lib::NodeType::STORAGE, 0),
+ _feed_handler(),
_slobrok(),
_service_layer(),
- _message_codec_provider(),
- _rpc_client_shared_rpc_resources(),
- _rpc_message_dispatcher(),
- _rpc_client()
+ _rpc_client_shared_rpc_resources()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
@@ -693,6 +522,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
_service_layer_config.add_builders(_config_set);
_rpc_client_config.add_builders(_config_set);
+ _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine);
}
PersistenceProviderFixture::~PersistenceProviderFixture()
@@ -778,9 +608,9 @@ PersistenceProviderFixture::make_document_update(uint32_t i) const
void
PersistenceProviderFixture::create_buckets()
{
- auto &provider = *_persistence_engine;
+ SpiBmFeedHandler feed_handler(*_persistence_engine);
for (unsigned int i = 0; i < num_buckets(); ++i) {
- provider.createBucket(make_bucket(i), _context);
+ feed_handler.create_bucket(make_bucket(i));
}
}
@@ -796,21 +626,17 @@ PersistenceProviderFixture::start_service_layer()
_service_layer->setupConfig(100ms);
_service_layer->createNode();
_service_layer->getNode().waitUntilInitialized();
- _message_codec_provider = std::make_unique<MessageCodecProvider>(_repo, std::make_shared<documentapi::LoadTypeSet>());
LOG(info, "start rpc client shared resources");
config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
_rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
_rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
- _rpc_message_dispatcher = std::make_unique<MyMessageDispatcher>();
- _rpc_client = std::make_unique<StorageApiRpcService>(*_rpc_message_dispatcher, *_rpc_client_shared_rpc_resources, *_message_codec_provider, StorageApiRpcService::Params());
- set_cluster_up(*_rpc_client_shared_rpc_resources, _storage_address);
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
}
void
PersistenceProviderFixture::shutdown_service_layer()
{
- _rpc_client.reset();
- _rpc_message_dispatcher.reset();
+ _feed_handler.reset();
if (_rpc_client_shared_rpc_resources) {
LOG(info, "stop rpc client shared resources");
_rpc_client_shared_rpc_resources->shutdown();
@@ -827,15 +653,6 @@ PersistenceProviderFixture::shutdown_service_layer()
}
}
-void
-PersistenceProviderFixture::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker)
-{
- cmd->setSourceIndex(0);
- cmd->setAddress(_storage_address);
- _rpc_message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
- _rpc_client->send_rpc_v1_request(std::move(cmd));
-}
-
vespalib::nbostream
make_put_feed(PersistenceProviderFixture &f, BMRange range)
{
@@ -872,23 +689,16 @@ void
put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
auto document = std::make_unique<Document>(repo, is);
- if (f._rpc_client) {
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket.getBucket(), std::move(document), time_bias + i);
- f.send_rpc(std::move(cmd), pending_tracker);
- } else {
- provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
- }
+ f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -928,23 +738,16 @@ void
update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
auto document_update = DocumentUpdate::createHEAD(repo, is);
- if (f._rpc_client) {
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket.getBucket(), std::move(document_update), time_bias + i);
- f.send_rpc(std::move(cmd), pending_tracker);
- } else {
- provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
- }
+ f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -985,22 +788,15 @@ void
remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
DocumentId document_id(is);
- if (f._rpc_client) {
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket.getBucket(), document_id, time_bias + i);
- f.send_rpc(std::move(cmd), pending_tracker);
- } else {
- provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
- }
+ f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();