diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-24 16:15:22 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-24 16:52:18 +0200 |
commit | ee92e3bd735b67fd839029dabbd6eebd2b56e42d (patch) | |
tree | 3cbcbbeb927b24490f20b5ea08d8b24b4ad957e7 /searchcore | |
parent | 039b0138aeeefaf77100dd15ba85c7024b7edfcc (diff) |
Factor out feed handlers for benchmark feed.
Diffstat (limited to 'searchcore')
8 files changed, 420 insertions, 228 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index 9fa47c77b03..1d031e2ea30 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -2,6 +2,8 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp + spi_bm_feed_handler.cpp + storage_api_rpc_bm_feed_handler.cpp OUTPUT_NAME vespa-feed-bm DEPENDS searchcore_server diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h new file mode 100644 index 00000000000..a3341bf14c9 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> + +namespace document { +class Bucket; +class Document; +class DocumentUpdate; +class DocumentId; +} + +namespace feedbm { + +class PendingTracker; + +/* + * Interface class for benchmark feed handler. + */ +class IBmFeedHandler +{ +public: + virtual ~IBmFeedHandler() = default; + virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0; + virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0; + virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h new file mode 100644 index 00000000000..3698832068f --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h @@ -0,0 +1,57 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <mutex> +#include <condition_variable> + +namespace feedbm { + +/* + * Class to track number of pending operations, used as backpressure during + * benchmark feeding. + */ +class PendingTracker { + uint32_t _pending; + uint32_t _limit; + std::mutex _mutex; + std::condition_variable _cond; + +public: + PendingTracker(uint32_t limit) + : _pending(0u), + _limit(limit), + _mutex(), + _cond() + { + } + + ~PendingTracker() + { + drain(); + } + + void release() { + std::unique_lock<std::mutex> guard(_mutex); + --_pending; + if (_pending < _limit) { + _cond.notify_all(); + } + } + void retain() { + std::unique_lock<std::mutex> guard(_mutex); + while (_pending >= _limit) { + _cond.wait(guard); + } + ++_pending; + } + + void drain() { + std::unique_lock<std::mutex> guard(_mutex); + while (_pending > 0) { + _cond.wait(guard); + } + } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp new file mode 100644 index 00000000000..d53ece2fc42 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp @@ -0,0 +1,93 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "spi_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/metrics/loadtype.h> +#include <vespa/persistence/spi/persistenceprovider.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::spi::Bucket; +using storage::spi::PartitionId; +using storage::spi::PersistenceProvider; +using storage::spi::Timestamp; + +namespace feedbm { + +namespace { + +storage::spi::LoadType default_load_type(0, "default"); +storage::spi::Context context(default_load_type, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); + +class MyOperationComplete : public storage::spi::OperationComplete +{ + PendingTracker& _tracker; +public: + MyOperationComplete(PendingTracker& tracker); + ~MyOperationComplete(); + void onComplete(std::unique_ptr<storage::spi::Result> result) override; + void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; +}; + +MyOperationComplete::MyOperationComplete(PendingTracker& tracker) + : _tracker(tracker) +{ + _tracker.retain(); +} + +MyOperationComplete::~MyOperationComplete() +{ + _tracker.release(); +} + +void +MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) +{ + (void) result; +} + +void +MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler) +{ + (void) resultHandler; +} + +} + +SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider) + : IBmFeedHandler(), + _provider(provider) +{ +} + +SpiBmFeedHandler::~SpiBmFeedHandler() = default; + +void +SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(tracker)); +} + +void +SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(tracker)); +} + +void +SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(tracker)); + +} + +void +SpiBmFeedHandler::create_bucket(const document::Bucket& bucket) +{ + _provider.createBucket(Bucket(bucket, PartitionId(0)), context); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h new file mode 100644 index 00000000000..5b56a4f21dd --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace storage::spi { struct PersistenceProvider; } + +namespace feedbm { + +/* + * Benchmark feed handler for feed directly to persistence provider + */ +class SpiBmFeedHandler : public IBmFeedHandler +{ + storage::spi::PersistenceProvider& _provider; +public: + SpiBmFeedHandler(storage::spi::PersistenceProvider& provider); + ~SpiBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + void create_bucket(const document::Bucket& bucket); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp new file mode 100644 index 00000000000..e2f3a951b99 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -0,0 +1,145 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_rpc_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/documentapi/loadtypes/loadtypeset.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/storageserver/message_dispatcher.h> +#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> +#include <vespa/storage/storageserver/rpc/message_codec_provider.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> +#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/slobrok/sbmirror.h> +#include <cassert> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using document::DocumentTypeRepo; +using storage::api::StorageMessageAddress; +using storage::rpc::SharedRpcResources; + +namespace feedbm { + +namespace { + +FRT_RPCRequest *make_set_cluster_state_request() { + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + storage::rpc::SlimeClusterStateBundleCodec codec; + auto encoded_bundle = codec.encode(bundle); + auto *req = new FRT_RPCRequest(); + auto* params = req->GetParams(); + params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); + params->AddInt32(encoded_bundle._uncompressed_length); + const auto buf_len = encoded_bundle._buffer->getDataLen(); + params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); + req->SetMethodName("setdistributionstates"); + return req; +} + +void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) { + auto req = make_set_cluster_state_request(); + auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory()); + auto target = target_resolver->resolve_rpc_target(storage_address); + target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout + assert(!req->IsError()); + req->SubRef(); +} + +} + +class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + MyMessageDispatcher() + : storage::MessageDispatcher(), + _mutex(), + _pending() + { + } + ~MyMessageDispatcher() override; + void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override { + release(msg->getMsgId()); + } + void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override { + release(msg->getMsgId()); + } + void retain(uint64_t msg_id, PendingTracker &tracker) { + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); + } + void release(uint64_t msg_id) { + PendingTracker *tracker = nullptr; + { + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + assert(itr != _pending.end()); + tracker = itr->second; + _pending.erase(itr); + } + tracker->release(); + } +}; + +StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo) + : IBmFeedHandler(), + _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)), + _shared_rpc_resources(shared_rpc_resources_in), + _message_dispatcher(std::make_unique<MyMessageDispatcher>()), + _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())), + _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params())) +{ + set_cluster_up(_shared_rpc_resources, *_storage_address); +} + +StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; + +void +StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + cmd->setAddress(*_storage_address); + _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); + _rpc_client->send_rpc_v1_request(std::move(cmd)); +} + +void +StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_rpc(std::move(cmd), tracker); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h new file mode 100644 index 00000000000..0fe92350eb2 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -0,0 +1,43 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace storage::api { +class StorageMessageAddress; +class StorageCommand; +} + +namespace storage::rpc { +class MessageCodecProvider; +class SharedRpcResources; +class StorageApiRpcService; +} + +namespace feedbm { + +/* + * Benchmark feed handler for feed to service layer using storage api protocol + * over rpc. + */ +class StorageApiRpcBmFeedHandler : public IBmFeedHandler +{ + class MyMessageDispatcher; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + storage::rpc::SharedRpcResources& _shared_rpc_resources; + std::unique_ptr<MyMessageDispatcher> _message_dispatcher; + std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider; + std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client; + + void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo); + ~StorageApiRpcBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 85b310ccbad..4208e9448ff 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,7 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "pending_tracker.h" +#include "spi_bm_feed_handler.h" +#include "storage_api_rpc_bm_feed_handler.h" #include <vespa/vespalib/testkit/testapp.h> - #include <tests/proton/common/dummydbowner.h> #include <vespa/config-imported-fields.h> #include <vespa/config-rank-profiles.h> @@ -57,21 +59,7 @@ #include <vespa/storage/storageserver/storagenode.h> #include <vespa/messagebus/config-messagebus.h> #include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/storage/storageserver/message_dispatcher.h> -#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> -#include <vespa/storage/storageserver/rpc/message_codec_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> -#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> -#include <vespa/documentapi/loadtypes/loadtypeset.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vespalib/stllike/hash_map.hpp> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> -#include <vespa/fnet/frt/target.h> -#include <vespa/slobrok/sbmirror.h> #include <getopt.h> #include <iostream> @@ -123,29 +111,22 @@ using document::Field; using document::FieldUpdate; using document::IntFieldValue; using document::test::makeBucketSpace; -using documentapi::LoadTypeSet; using search::TuneFileDocumentDB; using search::index::DummyFileHeaderContext; using search::index::Schema; using search::index::SchemaBuilder; using search::transactionlog::TransLogServer; -using storage::rpc::MessageCodecProvider; using storage::rpc::SharedRpcResources; -using storage::rpc::StorageApiRpcService; -using storage::spi::Bucket; -using storage::spi::PartitionId; using storage::spi::PersistenceProvider; -using storage::spi::Priority; -using storage::spi::Timestamp; -using storage::spi::Trace; using vespalib::makeLambdaTask; +using feedbm::IBmFeedHandler; +using feedbm::SpiBmFeedHandler; +using feedbm::StorageApiRpcBmFeedHandler; using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>; namespace { -storage::spi::LoadType default_load_type(0, "default"); - vespalib::string base_dir = "testdb"; std::shared_ptr<DocumenttypesConfig> make_document_type() { @@ -206,84 +187,6 @@ struct MyResourceWriteFilter : public IResourceWriteFilter State getAcceptState() const override { return IResourceWriteFilter::State(); } }; -class MyPendingTracker { - uint32_t _pending; - uint32_t _limit; - std::mutex _mutex; - std::condition_variable _cond; - -public: - MyPendingTracker(uint32_t limit) - : _pending(0u), - _limit(limit), - _mutex(), - _cond() - { - } - - ~MyPendingTracker() - { - drain(); - } - - void release() { - std::unique_lock<std::mutex> guard(_mutex); - --_pending; - if (_pending < _limit) { - _cond.notify_all(); - } - //LOG(info, "release, pending is now %u", _pending); - } - void retain() { - std::unique_lock<std::mutex> guard(_mutex); - while (_pending >= _limit) { - _cond.wait(guard); - } - ++_pending; - //LOG(info, "retain, pending is now %u", _pending); - } - - void drain() { - std::unique_lock<std::mutex> guard(_mutex); - while (_pending > 0) { - _cond.wait(guard); - } - } -}; - -class MyOperationComplete : public storage::spi::OperationComplete -{ - MyPendingTracker& _tracker; -public: - MyOperationComplete(MyPendingTracker &tracker); - ~MyOperationComplete(); - void onComplete(std::unique_ptr<storage::spi::Result> result) override; - void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; -}; - -MyOperationComplete::MyOperationComplete(MyPendingTracker& tracker) - : _tracker(tracker) -{ - _tracker.retain(); -} - -MyOperationComplete::~MyOperationComplete() -{ - _tracker.release(); -} - -void -MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) -{ - (void) result; -} - -void -MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler) -{ - (void) resultHandler; -} - class BMRange { uint32_t _start; @@ -525,71 +428,6 @@ struct MyRpcClientConfig { MyRpcClientConfig::~MyRpcClientConfig() = default; -class MyMessageDispatcher : public storage::MessageDispatcher -{ - std::mutex _mutex; - vespalib::hash_map<uint64_t, MyPendingTracker *> _pending; -public: - MyMessageDispatcher() - : storage::MessageDispatcher(), - _mutex(), - _pending() - { - } - ~MyMessageDispatcher() override; - void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override { - release(msg->getMsgId()); - } - void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override { - release(msg->getMsgId()); - } - void retain(uint64_t msg_id, MyPendingTracker &tracker) { - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); - } - void release(uint64_t msg_id) { - MyPendingTracker *tracker = nullptr; - { - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - assert(itr != _pending.end()); - tracker = itr->second; - _pending.erase(itr); - } - tracker->release(); - } -}; - -MyMessageDispatcher::~MyMessageDispatcher() -{ - std::lock_guard lock(_mutex); - assert(_pending.empty()); -} - -FRT_RPCRequest *make_set_cluster_state_request() { - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); - storage::rpc::SlimeClusterStateBundleCodec codec; - auto encoded_bundle = codec.encode(bundle); - auto *req = new FRT_RPCRequest(); - auto* params = req->GetParams(); - params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); - params->AddInt32(encoded_bundle._uncompressed_length); - const auto buf_len = encoded_bundle._buffer->getDataLen(); - params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); - req->SetMethodName("setdistributionstates"); - return req; -} - -void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) { - auto req = make_set_cluster_state_request(); - auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory()); - auto target = target_resolver->resolve_rpc_target(storage_address); - target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout - assert(!req->IsError()); - req->SubRef(); -} - } struct PersistenceProviderFixture { @@ -618,33 +456,28 @@ struct PersistenceProviderFixture { MyPersistenceEngineOwner _persistence_owner; MyResourceWriteFilter _write_filter; std::shared_ptr<PersistenceEngine> _persistence_engine; - storage::spi::Context _context; uint32_t _bucket_bits; MyStorageConfig _service_layer_config; MyRpcClientConfig _rpc_client_config; ConfigSet _config_set; std::shared_ptr<IConfigContext> _config_context; - storage::api::StorageMessageAddress _storage_address; + std::unique_ptr<IBmFeedHandler> _feed_handler; std::unique_ptr<mbus::Slobrok> _slobrok; std::unique_ptr<MyServiceLayerProcess> _service_layer; - std::unique_ptr<MessageCodecProvider> _message_codec_provider; std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; - std::unique_ptr<MyMessageDispatcher> _rpc_message_dispatcher; - std::unique_ptr<StorageApiRpcService> _rpc_client; PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); } - Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); } + document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); } DocumentId make_document_id(uint32_t i) const; std::unique_ptr<Document> make_document(uint32_t i) const; std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; void create_buckets(); void start_service_layer(); void shutdown_service_layer(); - void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker); }; PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) @@ -673,19 +506,15 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _persistence_owner(), _write_filter(), _persistence_engine(), - _context(default_load_type, Priority(0), Trace::TraceLevel(0)), _bucket_bits(16), _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()), _rpc_client_config("bm-rpc-client", _slobrok_port), _config_set(), _config_context(std::make_shared<ConfigContext>(_config_set)), - _storage_address("storage", storage::lib::NodeType::STORAGE, 0), + _feed_handler(), _slobrok(), _service_layer(), - _message_codec_provider(), - _rpc_client_shared_rpc_resources(), - _rpc_message_dispatcher(), - _rpc_client() + _rpc_client_shared_rpc_resources() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); @@ -693,6 +522,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); _service_layer_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); + _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } PersistenceProviderFixture::~PersistenceProviderFixture() @@ -778,9 +608,9 @@ PersistenceProviderFixture::make_document_update(uint32_t i) const void PersistenceProviderFixture::create_buckets() { - auto &provider = *_persistence_engine; + SpiBmFeedHandler feed_handler(*_persistence_engine); for (unsigned int i = 0; i < num_buckets(); ++i) { - provider.createBucket(make_bucket(i), _context); + feed_handler.create_bucket(make_bucket(i)); } } @@ -796,21 +626,17 @@ PersistenceProviderFixture::start_service_layer() _service_layer->setupConfig(100ms); _service_layer->createNode(); _service_layer->getNode().waitUntilInitialized(); - _message_codec_provider = std::make_unique<MessageCodecProvider>(_repo, std::make_shared<documentapi::LoadTypeSet>()); LOG(info, "start rpc client shared resources"); config::ConfigUri client_config_uri("bm-rpc-client", _config_context); _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); - _rpc_message_dispatcher = std::make_unique<MyMessageDispatcher>(); - _rpc_client = std::make_unique<StorageApiRpcService>(*_rpc_message_dispatcher, *_rpc_client_shared_rpc_resources, *_message_codec_provider, StorageApiRpcService::Params()); - set_cluster_up(*_rpc_client_shared_rpc_resources, _storage_address); + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); } void PersistenceProviderFixture::shutdown_service_layer() { - _rpc_client.reset(); - _rpc_message_dispatcher.reset(); + _feed_handler.reset(); if (_rpc_client_shared_rpc_resources) { LOG(info, "stop rpc client shared resources"); _rpc_client_shared_rpc_resources->shutdown(); @@ -827,15 +653,6 @@ PersistenceProviderFixture::shutdown_service_layer() } } -void -PersistenceProviderFixture::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, MyPendingTracker& pending_tracker) -{ - cmd->setSourceIndex(0); - cmd->setAddress(_storage_address); - _rpc_message_dispatcher->retain(cmd->getMsgId(), pending_tracker); - _rpc_client->send_rpc_v1_request(std::move(cmd)); -} - vespalib::nbostream make_put_feed(PersistenceProviderFixture &f, BMRange range) { @@ -872,23 +689,16 @@ void put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); auto document = std::make_unique<Document>(repo, is); - if (f._rpc_client) { - auto cmd = std::make_unique<storage::api::PutCommand>(bucket.getBucket(), std::move(document), time_bias + i); - f.send_rpc(std::move(cmd), pending_tracker); - } else { - provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker)); - } + f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -928,23 +738,16 @@ void update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); auto document_update = DocumentUpdate::createHEAD(repo, is); - if (f._rpc_client) { - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket.getBucket(), std::move(document_update), time_bias + i); - f.send_rpc(std::move(cmd), pending_tracker); - } else { - provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker)); - } + f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -985,22 +788,15 @@ void remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); DocumentId document_id(is); - if (f._rpc_client) { - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket.getBucket(), document_id, time_bias + i); - f.send_rpc(std::move(cmd), pending_tracker); - } else { - provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker)); - } + f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); |