diff options
Diffstat (limited to 'searchcore')
46 files changed, 1237 insertions, 209 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index f29712e7a5f..f98a3c87a2e 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -46,9 +46,9 @@ vespa_define_module( src/apps/tests src/apps/verify_ranksetup src/apps/vespa-dump-feed + src/apps/vespa-feed-bm src/apps/vespa-gen-testdocs src/apps/vespa-proton-cmd - src/apps/vespa-spi-feed-bm src/apps/vespa-transactionlog-inspect TESTS diff --git a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp index 2d028f47513..1d492cb558f 100644 --- a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp +++ b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp @@ -59,6 +59,7 @@ OnnxModels make_models(const OnnxModelsConfig &modelsCfg, const VerifyRanksetupC for (const auto &entry: modelsCfg.model) { if (auto file = get_file(entry.fileref, myCfg)) { model_list.emplace_back(entry.name, file.value()); + OnnxModels::configure(entry, model_list.back()); } else { LOG(warning, "could not find file for onnx model '%s' (ref:'%s')\n", entry.name.c_str(), entry.fileref.c_str()); diff --git a/searchcore/src/apps/vespa-feed-bm/.gitignore b/searchcore/src/apps/vespa-feed-bm/.gitignore new file mode 100644 index 00000000000..0dc27e95ea8 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/.gitignore @@ -0,0 +1 @@ +vespa-feed-bm diff --git a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index e188bc16ec0..4ced3fe173b 100644 --- a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -1,8 +1,11 @@ # Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(searchcore_vespa_spi_feed_bm_app +vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES - vespa_spi_feed_bm.cpp - OUTPUT_NAME vespa-spi-feed-bm + vespa_feed_bm.cpp + spi_bm_feed_handler.cpp + storage_api_rpc_bm_feed_handler.cpp + storage_api_chain_bm_feed_handler.cpp + OUTPUT_NAME vespa-feed-bm DEPENDS searchcore_server searchcore_initializer @@ -20,5 +23,8 @@ vespa_add_executable(searchcore_vespa_spi_feed_bm_app searchcore_grouping searchcore_proton_metrics searchcore_fconfig + storageserver_storageapp + messagebus_messagebus-test + messagebus searchlib_searchlib_uca ) diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h new file mode 100644 index 00000000000..a3341bf14c9 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> + +namespace document { +class Bucket; +class Document; +class DocumentUpdate; +class DocumentId; +} + +namespace feedbm { + +class PendingTracker; + +/* + * Interface class for benchmark feed handler. + */ +class IBmFeedHandler +{ +public: + virtual ~IBmFeedHandler() = default; + virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0; + virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0; + virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h new file mode 100644 index 00000000000..3698832068f --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h @@ -0,0 +1,57 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <mutex> +#include <condition_variable> + +namespace feedbm { + +/* + * Class to track number of pending operations, used as backpressure during + * benchmark feeding. + */ +class PendingTracker { + uint32_t _pending; + uint32_t _limit; + std::mutex _mutex; + std::condition_variable _cond; + +public: + PendingTracker(uint32_t limit) + : _pending(0u), + _limit(limit), + _mutex(), + _cond() + { + } + + ~PendingTracker() + { + drain(); + } + + void release() { + std::unique_lock<std::mutex> guard(_mutex); + --_pending; + if (_pending < _limit) { + _cond.notify_all(); + } + } + void retain() { + std::unique_lock<std::mutex> guard(_mutex); + while (_pending >= _limit) { + _cond.wait(guard); + } + ++_pending; + } + + void drain() { + std::unique_lock<std::mutex> guard(_mutex); + while (_pending > 0) { + _cond.wait(guard); + } + } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp new file mode 100644 index 00000000000..d53ece2fc42 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp @@ -0,0 +1,93 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "spi_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/metrics/loadtype.h> +#include <vespa/persistence/spi/persistenceprovider.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::spi::Bucket; +using storage::spi::PartitionId; +using storage::spi::PersistenceProvider; +using storage::spi::Timestamp; + +namespace feedbm { + +namespace { + +storage::spi::LoadType default_load_type(0, "default"); +storage::spi::Context context(default_load_type, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); + +class MyOperationComplete : public storage::spi::OperationComplete +{ + PendingTracker& _tracker; +public: + MyOperationComplete(PendingTracker& tracker); + ~MyOperationComplete(); + void onComplete(std::unique_ptr<storage::spi::Result> result) override; + void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; +}; + +MyOperationComplete::MyOperationComplete(PendingTracker& tracker) + : _tracker(tracker) +{ + _tracker.retain(); +} + +MyOperationComplete::~MyOperationComplete() +{ + _tracker.release(); +} + +void +MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) +{ + (void) result; +} + +void +MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler) +{ + (void) resultHandler; +} + +} + +SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider) + : IBmFeedHandler(), + _provider(provider) +{ +} + +SpiBmFeedHandler::~SpiBmFeedHandler() = default; + +void +SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(tracker)); +} + +void +SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(tracker)); +} + +void +SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(tracker)); + +} + +void +SpiBmFeedHandler::create_bucket(const document::Bucket& bucket) +{ + _provider.createBucket(Bucket(bucket, PartitionId(0)), context); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h new file mode 100644 index 00000000000..5b56a4f21dd --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace storage::spi { struct PersistenceProvider; } + +namespace feedbm { + +/* + * Benchmark feed handler for feed directly to persistence provider + */ +class SpiBmFeedHandler : public IBmFeedHandler +{ + storage::spi::PersistenceProvider& _provider; +public: + SpiBmFeedHandler(storage::spi::PersistenceProvider& provider); + ~SpiBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + void create_bucket(const document::Bucket& bucket); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp new file mode 100644 index 00000000000..6f1acd10fe4 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -0,0 +1,185 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_chain_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/common/storage_chain_builder.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <cassert> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::StorageLink; + +namespace feedbm { + +namespace { + +std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle); + cmd->setPriority(storage::api::StorageMessage::VERYHIGH); + return cmd; +} + +} + +class BmStorageLink : public StorageLink +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + BmStorageLink(); + ~BmStorageLink() override; + bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; + void retain(uint64_t msg_id, PendingTracker &tracker) { + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); + } + bool release(uint64_t msg_id) { + PendingTracker *tracker = nullptr; + { + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + if (itr == _pending.end()) { + return false; + } + tracker = itr->second; + _pending.erase(itr); + } + tracker->release(); + return true; + } +}; + +BmStorageLink::BmStorageLink() + : storage::StorageLink("vespa-bm-feed"), + _mutex(), + _pending() +{ +} + +BmStorageLink::~BmStorageLink() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +bool +BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + (void) msg; + return false; +} + +bool +BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) +{ + return release(msg->getMsgId()); +} + +struct StorageApiChainBmFeedHandler::Context { + BmStorageLink* bm_link; + Context() + : bm_link(nullptr) + { + } + ~Context() = default; +}; + +class MyStorageChainBuilder : public storage::StorageChainBuilder +{ + using Parent = storage::StorageChainBuilder; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context; +public: + MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context); + ~MyStorageChainBuilder() override; + void add(std::unique_ptr<StorageLink> link) override; +}; + +MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context) + : storage::StorageChainBuilder(), + _context(std::move(context)) +{ +} + +MyStorageChainBuilder::~MyStorageChainBuilder() = default; + +void +MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link) +{ + vespalib::string name = link->getName(); + Parent::add(std::move(link)); + if (name == "Communication manager") { + auto my_link = std::make_unique<BmStorageLink>(); + _context->bm_link = my_link.get(); + Parent::add(std::move(my_link)); + } +} + +StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context) + : IBmFeedHandler(), + _context(std::move(context)) +{ + auto cmd = make_set_cluster_state_cmd(); + PendingTracker tracker(1); + send_msg(std::move(cmd), tracker); + tracker.drain(); +} + +StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; + +void +StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + auto bm_link = _context->bm_link; + bm_link->retain(cmd->getMsgId(), pending_tracker); + bm_link->sendDown(std::move(cmd)); +} + +void +StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_msg(std::move(cmd), tracker); +} + +std::shared_ptr<StorageApiChainBmFeedHandler::Context> +StorageApiChainBmFeedHandler::get_context() +{ + return std::make_shared<Context>(); +} + +std::unique_ptr<storage::IStorageChainBuilder> +StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context) +{ + return std::make_unique<MyStorageChainBuilder>(std::move(context)); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h new file mode 100644 index 00000000000..521deddd19e --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace storage { class IStorageChainBuilder; } +namespace storage::api { class StorageCommand; } + +namespace feedbm { + +/* + * Benchmark feed handler for feed to service layer using storage api protocol + * directly on the storage chain. + */ +class StorageApiChainBmFeedHandler : public IBmFeedHandler +{ +public: + struct Context; +private: + std::shared_ptr<Context> _context; + void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiChainBmFeedHandler(std::shared_ptr<Context> context); + ~StorageApiChainBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + + static std::shared_ptr<Context> get_context(); + static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp new file mode 100644 index 00000000000..c8d73444652 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -0,0 +1,146 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_rpc_bm_feed_handler.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/documentapi/loadtypes/loadtypeset.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/storageserver/message_dispatcher.h> +#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> +#include <vespa/storage/storageserver/rpc/message_codec_provider.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> +#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/slobrok/sbmirror.h> +#include <cassert> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using document::DocumentTypeRepo; +using storage::api::StorageMessageAddress; +using storage::rpc::SharedRpcResources; + +namespace feedbm { + +namespace { + +FRT_RPCRequest * +make_set_cluster_state_request() { + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + storage::rpc::SlimeClusterStateBundleCodec codec; + auto encoded_bundle = codec.encode(bundle); + auto *req = new FRT_RPCRequest(); + auto* params = req->GetParams(); + params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); + params->AddInt32(encoded_bundle._uncompressed_length); + params->AddData(std::move(*encoded_bundle._buffer)); + req->SetMethodName("setdistributionstates"); + return req; +} + +void +set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) { + auto req = make_set_cluster_state_request(); + auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory()); + auto target = target_resolver->resolve_rpc_target(storage_address); + target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout + assert(!req->IsError()); + req->SubRef(); +} + +} + +class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + MyMessageDispatcher() + : storage::MessageDispatcher(), + _mutex(), + _pending() + { + } + ~MyMessageDispatcher() override; + void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override { + release(msg->getMsgId()); + } + void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override { + release(msg->getMsgId()); + } + void retain(uint64_t msg_id, PendingTracker &tracker) { + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); + } + void release(uint64_t msg_id) { + PendingTracker *tracker = nullptr; + { + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + assert(itr != _pending.end()); + tracker = itr->second; + _pending.erase(itr); + } + tracker->release(); + } +}; + +StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo) + : IBmFeedHandler(), + _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)), + _shared_rpc_resources(shared_rpc_resources_in), + _message_dispatcher(std::make_unique<MyMessageDispatcher>()), + _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())), + _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params())) +{ + set_cluster_up(_shared_rpc_resources, *_storage_address); +} + +StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; + +void +StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + cmd->setAddress(*_storage_address); + _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); + _rpc_client->send_rpc_v1_request(std::move(cmd)); +} + +void +StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_rpc(std::move(cmd), tracker); +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h new file mode 100644 index 00000000000..0fe92350eb2 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -0,0 +1,43 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace storage::api { +class StorageMessageAddress; +class StorageCommand; +} + +namespace storage::rpc { +class MessageCodecProvider; +class SharedRpcResources; +class StorageApiRpcService; +} + +namespace feedbm { + +/* + * Benchmark feed handler for feed to service layer using storage api protocol + * over rpc. + */ +class StorageApiRpcBmFeedHandler : public IBmFeedHandler +{ + class MyMessageDispatcher; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + storage::rpc::SharedRpcResources& _shared_rpc_resources; + std::unique_ptr<MyMessageDispatcher> _message_dispatcher; + std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider; + std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client; + + void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo); + ~StorageApiRpcBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; +}; + +} diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index ecfa2a07cef..73d741c3fee 100644 --- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,7 +1,10 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "pending_tracker.h" +#include "spi_bm_feed_handler.h" +#include "storage_api_rpc_bm_feed_handler.h" +#include "storage_api_chain_bm_feed_handler.h" #include <vespa/vespalib/testkit/testapp.h> - #include <tests/proton/common/dummydbowner.h> #include <vespa/config-imported-fields.h> #include <vespa/config-rank-profiles.h> @@ -11,6 +14,7 @@ #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/repo/document_type_repo_factory.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/update/documentupdate.h> @@ -37,11 +41,32 @@ #include <vespa/config-summary.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/fastos/app.h> +#include <vespa/storage/bucketdb/config-stor-bucket-init.h> +#include <vespa/storage/config/config-stor-bouncer.h> +#include <vespa/storage/config/config-stor-communicationmanager.h> +#include <vespa/storage/config/config-stor-opslogger.h> +#include <vespa/storage/config/config-stor-prioritymapping.h> +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/config/config-stor-status.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/config-load-type.h> +#include <vespa/config-persistence.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config-stor-filestor.h> +#include <vespa/config-upgrading.h> +#include <vespa/config-slobroks.h> +#include <vespa/metrics/config-metricsmanager.h> +#include <vespa/storageserver/app/servicelayerprocess.h> +#include <vespa/storage/common/i_storage_chain_builder.h> +#include <vespa/storage/storageserver/storagenode.h> +#include <vespa/messagebus/config-messagebus.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <getopt.h> #include <iostream> #include <vespa/log/log.h> -LOG_SETUP("vespa-spi-feed-bm"); +LOG_SETUP("vespa-feed-bm"); using namespace config; using namespace proton; @@ -51,7 +76,28 @@ using namespace vespa::config::search::summary; using namespace vespa::config::search; using namespace std::chrono_literals; using vespa::config::content::core::BucketspacesConfig; - +using vespa::config::content::core::BucketspacesConfigBuilder; +using vespa::config::content::StorDistributionConfigBuilder; +using vespa::config::content::StorFilestorConfigBuilder; +using vespa::config::content::PersistenceConfigBuilder; +using vespa::config::content::core::StorBouncerConfigBuilder; +using vespa::config::content::core::StorCommunicationmanagerConfigBuilder; +using vespa::config::content::core::StorBucketInitConfigBuilder; +using vespa::config::content::core::StorOpsloggerConfigBuilder; +using vespa::config::content::core::StorPrioritymappingConfigBuilder; +using vespa::config::content::LoadTypeConfigBuilder; +using vespa::config::content::UpgradingConfigBuilder; +using vespa::config::content::core::StorServerConfigBuilder; +using vespa::config::content::core::StorStatusConfigBuilder; +using vespa::config::content::core::StorVisitorConfigBuilder; +using metrics::MetricsmanagerConfigBuilder; +using cloud::config::SlobroksConfigBuilder; +using messagebus::MessagebusConfigBuilder; + +using config::ConfigContext; +using config::ConfigUri; +using config::ConfigSet; +using config::IConfigContext; using document::AssignValueUpdate; using document::BucketId; using document::BucketSpace; @@ -59,7 +105,9 @@ using document::Document; using document::DocumentId; using document::DocumentType; using document::DocumentTypeRepo; +using document::DocumentTypeRepoFactory; using document::DocumenttypesConfig; +using document::DocumenttypesConfigBuilder; using document::DocumentUpdate; using document::Field; using document::FieldUpdate; @@ -70,20 +118,18 @@ using search::index::DummyFileHeaderContext; using search::index::Schema; using search::index::SchemaBuilder; using search::transactionlog::TransLogServer; -using storage::spi::Bucket; -using storage::spi::PartitionId; +using storage::rpc::SharedRpcResources; using storage::spi::PersistenceProvider; -using storage::spi::Priority; -using storage::spi::Timestamp; -using storage::spi::Trace; using vespalib::makeLambdaTask; +using feedbm::IBmFeedHandler; +using feedbm::SpiBmFeedHandler; +using feedbm::StorageApiRpcBmFeedHandler; +using feedbm::StorageApiChainBmFeedHandler; using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>; namespace { -storage::spi::LoadType default_load_type(0, "default"); - vespalib::string base_dir = "testdb"; std::shared_ptr<DocumenttypesConfig> make_document_type() { @@ -144,84 +190,6 @@ struct MyResourceWriteFilter : public IResourceWriteFilter State getAcceptState() const override { return IResourceWriteFilter::State(); } }; -class MyPendingTracker { - uint32_t _pending; - uint32_t _limit; - std::mutex _mutex; - std::condition_variable _cond; - -public: - MyPendingTracker(uint32_t limit) - : _pending(0u), - _limit(limit), - _mutex(), - _cond() - { - } - - ~MyPendingTracker() - { - drain(); - } - - void release() { - std::unique_lock<std::mutex> guard(_mutex); - --_pending; - if (_pending < _limit) { - _cond.notify_all(); - } - //LOG(info, "release, pending is now %u", _pending); - } - void retain() { - std::unique_lock<std::mutex> guard(_mutex); - while (_pending >= _limit) { - _cond.wait(guard); - } - ++_pending; - //LOG(info, "retain, pending is now %u", _pending); - } - - void drain() { - std::unique_lock<std::mutex> guard(_mutex); - while (_pending > 0) { - _cond.wait(guard); - } - } -}; - -class MyOperationComplete : public storage::spi::OperationComplete -{ - MyPendingTracker& _tracker; -public: - MyOperationComplete(MyPendingTracker &tracker); - ~MyOperationComplete(); - void onComplete(std::unique_ptr<storage::spi::Result> result) override; - void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; -}; - -MyOperationComplete::MyOperationComplete(MyPendingTracker& tracker) - : _tracker(tracker) -{ - _tracker.retain(); -} - -MyOperationComplete::~MyOperationComplete() -{ - _tracker.release(); -} - -void -MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) -{ - (void) result; -} - -void -MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler) -{ - (void) resultHandler; -} - class BMRange { uint32_t _start; @@ -242,6 +210,9 @@ class BMParams { uint32_t _put_passes; uint32_t _update_passes; uint32_t _remove_passes; + uint32_t _rpc_network_threads; + bool _enable_service_layer; + bool _use_storage_chain; uint32_t get_start(uint32_t thread_id) const { return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads); } @@ -251,7 +222,10 @@ public: _threads(32), _put_passes(2), _update_passes(1), - _remove_passes(2) + _remove_passes(2), + _rpc_network_threads(1), + _enable_service_layer(false), + _use_storage_chain(false) { } BMRange get_range(uint32_t thread_id) const { @@ -262,11 +236,17 @@ public: uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } + uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } + bool get_enable_service_layer() const { return _enable_service_layer; } + bool get_use_storage_chain() const { return _use_storage_chain; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_threads(uint32_t threads_in) { _threads = threads_in; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } + void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } + void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } + void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } bool check() const; }; @@ -289,11 +269,177 @@ BMParams::check() const std::cerr << "Put passes too low: " << _put_passes << std::endl; return false; } + if (_rpc_network_threads < 1) { + std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; + return false; + } return true; } +class MyServiceLayerProcess : public storage::ServiceLayerProcess { + PersistenceProvider& _provider; + +public: + MyServiceLayerProcess(const config::ConfigUri & configUri, + PersistenceProvider &provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder); + ~MyServiceLayerProcess() override { shutdown(); } + + void shutdown() override; + void setupProvider() override; + PersistenceProvider& getProvider() override; +}; + +MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri, + PersistenceProvider &provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder) + : ServiceLayerProcess(configUri), + _provider(provider) +{ + if (chain_builder) { + set_storage_chain_builder(std::move(chain_builder)); + } +} + +void +MyServiceLayerProcess::shutdown() +{ + ServiceLayerProcess::shutdown(); } +void +MyServiceLayerProcess::setupProvider() +{ +} + +PersistenceProvider& +MyServiceLayerProcess::getProvider() +{ + return _provider; +} + +struct MyStorageConfig +{ + vespalib::string config_id; + DocumenttypesConfigBuilder documenttypes; + PersistenceConfigBuilder persistence; + StorDistributionConfigBuilder stor_distribution; + StorFilestorConfigBuilder stor_filestor; + StorBouncerConfigBuilder stor_bouncer; + StorCommunicationmanagerConfigBuilder stor_communicationmanager; + StorBucketInitConfigBuilder stor_bucket_init; + StorOpsloggerConfigBuilder stor_opslogger; + StorPrioritymappingConfigBuilder stor_prioritymapping; + UpgradingConfigBuilder upgrading; + StorServerConfigBuilder stor_server; + StorStatusConfigBuilder stor_status; + StorVisitorConfigBuilder stor_visitor; + BucketspacesConfigBuilder bucketspaces; + LoadTypeConfigBuilder load_type; + MetricsmanagerConfigBuilder metricsmanager; + SlobroksConfigBuilder slobroks; + MessagebusConfigBuilder messagebus; + + MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port, uint32_t rpc_network_threads) + : config_id(config_id_in), + documenttypes(documenttypes_in), + persistence(), + stor_distribution(), + stor_filestor(), + stor_bouncer(), + stor_communicationmanager(), + stor_bucket_init(), + stor_opslogger(), + stor_prioritymapping(), + upgrading(), + stor_server(), + stor_status(), + stor_visitor(), + bucketspaces(), + load_type(), + metricsmanager(), + slobroks(), + messagebus() + { + { + auto &dc = stor_distribution; + { + StorDistributionConfigBuilder::Group group; + { + StorDistributionConfigBuilder::Group::Nodes node; + node.index = 0; + group.nodes.push_back(std::move(node)); + } + group.index = "invalid"; + group.name = "invalid"; + group.capacity = 1.0; + group.partitions = ""; + dc.group.push_back(std::move(group)); + } + dc.redundancy = 1; + dc.readyCopies = 1; + } + stor_server.rootFolder = "storage"; + { + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); + } + stor_communicationmanager.useDirectStorageapiRpc = true; + stor_communicationmanager.rpc.numNetworkThreads = rpc_network_threads; + stor_status.httpport = status_port; + } + + ~MyStorageConfig(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &documenttypes); + set.addBuilder(config_id, &persistence); + set.addBuilder(config_id, &stor_distribution); + set.addBuilder(config_id, &stor_filestor); + set.addBuilder(config_id, &stor_bouncer); + set.addBuilder(config_id, &stor_communicationmanager); + set.addBuilder(config_id, &stor_bucket_init); + set.addBuilder(config_id, &stor_opslogger); + set.addBuilder(config_id, &stor_prioritymapping); + set.addBuilder(config_id, &upgrading); + set.addBuilder(config_id, &stor_server); + set.addBuilder(config_id, &stor_status); + set.addBuilder(config_id, &stor_visitor); + set.addBuilder(config_id, &bucketspaces); + set.addBuilder(config_id, &load_type); + set.addBuilder(config_id, &metricsmanager); + set.addBuilder(config_id, &slobroks); + set.addBuilder(config_id, &messagebus); + } +}; + +MyStorageConfig::~MyStorageConfig() = default; + +struct MyRpcClientConfig { + vespalib::string config_id; + SlobroksConfigBuilder slobroks; + + MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port) + : config_id(config_id_in), + slobroks() + { + { + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); + } + } + ~MyRpcClientConfig(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &slobroks); + } +}; + +MyRpcClientConfig::~MyRpcClientConfig() = default; + +} struct PersistenceProviderFixture { std::shared_ptr<DocumenttypesConfig> _document_types; @@ -305,6 +451,9 @@ struct PersistenceProviderFixture { vespalib::string _base_dir; DummyFileHeaderContext _file_header_context; int _tls_listen_port; + int _slobrok_port; + int _status_port; + int _rpc_client_port; TransLogServer _tls; vespalib::string _tls_spec; matching::QueryLimiter _query_limiter; @@ -318,24 +467,33 @@ struct PersistenceProviderFixture { MyPersistenceEngineOwner _persistence_owner; MyResourceWriteFilter _write_filter; std::shared_ptr<PersistenceEngine> _persistence_engine; - storage::spi::Context _context; uint32_t _bucket_bits; - - PersistenceProviderFixture(); + MyStorageConfig _service_layer_config; + MyRpcClientConfig _rpc_client_config; + ConfigSet _config_set; + std::shared_ptr<IConfigContext> _config_context; + std::unique_ptr<IBmFeedHandler> _feed_handler; + std::unique_ptr<mbus::Slobrok> _slobrok; + std::unique_ptr<MyServiceLayerProcess> _service_layer; + std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; + + PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); } - Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); } + document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); } DocumentId make_document_id(uint32_t i) const; std::unique_ptr<Document> make_document(uint32_t i) const; std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; void create_buckets(); + void start_service_layer(bool use_storage_chain); + void shutdown_service_layer(); }; -PersistenceProviderFixture::PersistenceProviderFixture() +PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) : _document_types(make_document_type()), - _repo(std::make_shared<DocumentTypeRepo>(*_document_types)), + _repo(DocumentTypeRepoFactory::make(*_document_types)), _doc_type_name("test"), _document_type(_repo->getDocumentType(_doc_type_name.getName())), _field(_document_type->getField("int")), @@ -343,6 +501,9 @@ PersistenceProviderFixture::PersistenceProviderFixture() _base_dir(base_dir), _file_header_context(), _tls_listen_port(9017), + _slobrok_port(9018), + _status_port(9019), + _rpc_client_port(9020), _tls("tls", _tls_listen_port, _base_dir, _file_header_context), _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), _query_limiter(), @@ -356,13 +517,23 @@ PersistenceProviderFixture::PersistenceProviderFixture() _persistence_owner(), _write_filter(), _persistence_engine(), - _context(default_load_type, Priority(0), Trace::TraceLevel(0)), - _bucket_bits(16) + _bucket_bits(16), + _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()), + _rpc_client_config("bm-rpc-client", _slobrok_port), + _config_set(), + _config_context(std::make_shared<ConfigContext>(_config_set)), + _feed_handler(), + _slobrok(), + _service_layer(), + _rpc_client_shared_rpc_resources() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); + _service_layer_config.add_builders(_config_set); + _rpc_client_config.add_builders(_config_set); + _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } PersistenceProviderFixture::~PersistenceProviderFixture() @@ -448,9 +619,59 @@ PersistenceProviderFixture::make_document_update(uint32_t i) const void PersistenceProviderFixture::create_buckets() { - auto &provider = *_persistence_engine; + SpiBmFeedHandler feed_handler(*_persistence_engine); for (unsigned int i = 0; i < num_buckets(); ++i) { - provider.createBucket(make_bucket(i), _context); + feed_handler.create_bucket(make_bucket(i)); + } +} + +void +PersistenceProviderFixture::start_service_layer(bool use_storage_chain) +{ + LOG(info, "start slobrok"); + _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); + LOG(info, "start service layer"); + config::ConfigUri config_uri("bm-servicelayer", _config_context); + std::unique_ptr<storage::IStorageChainBuilder> chain_builder; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> context; + if (use_storage_chain) { + context = StorageApiChainBmFeedHandler::get_context(); + chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context); + } + _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, + *_persistence_engine, + std::move(chain_builder)); + _service_layer->setupConfig(100ms); + _service_layer->createNode(); + _service_layer->getNode().waitUntilInitialized(); + LOG(info, "start rpc client shared resources"); + config::ConfigUri client_config_uri("bm-rpc-client", _config_context); + _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); + _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); + if (use_storage_chain) { + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context)); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + } +} + +void +PersistenceProviderFixture::shutdown_service_layer() +{ + _feed_handler.reset(); + if (_rpc_client_shared_rpc_resources) { + LOG(info, "stop rpc client shared resources"); + _rpc_client_shared_rpc_resources->shutdown(); + _rpc_client_shared_rpc_resources.reset(); + } + if (_service_layer) { + LOG(info, "stop service layer"); + _service_layer->getNode().requestShutdown("controlled shutdown"); + _service_layer->shutdown(); + } + if (_slobrok) { + LOG(info, "stop slobrok"); + _slobrok.reset(); } } @@ -490,18 +711,16 @@ void put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); auto document = std::make_unique<Document>(repo, is); - provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker)); + f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -541,18 +760,16 @@ void update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); auto document_update = DocumentUpdate::createHEAD(repo, is); - provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker)); + f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -593,17 +810,15 @@ void remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - MyPendingTracker pending_tracker(100); - auto &provider = *f._persistence_engine; - auto &context = f._context; + feedbm::PendingTracker pending_tracker(100); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { is >> bucket_id; - Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + document::Bucket bucket(bucket_space, bucket_id); DocumentId document_id(is); - provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker)); + f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker); } assert(is.empty()); pending_tracker.drain(); @@ -629,12 +844,15 @@ run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu void benchmark_async_spi(const BMParams &bm_params) { vespalib::rmdir(base_dir, true); - PersistenceProviderFixture f; + PersistenceProviderFixture f(bm_params); auto &provider = *f._persistence_engine; LOG(info, "start initialize"); provider.initialize(); LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); + if (bm_params.get_enable_service_layer()) { + f.start_service_layer(bm_params.get_use_storage_chain()); + } vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update"); @@ -649,6 +867,7 @@ void benchmark_async_spi(const BMParams &bm_params) for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) { run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params); } + f.shutdown_service_layer(); } class App : public FastOS_Application @@ -673,16 +892,19 @@ void App::usage() { std::cerr << - "vespa-spi-feed-bm version 0.0\n" + "vespa-feed-bm version 0.0\n" "\n" "USAGE:\n"; std::cerr << - "vespa-spi-feed-bm\n" + "vespa-feed-bm\n" "[--threads threads]\n" "[--documents documents]\n" "[--put-passes put-passes]\n" "[--update-passes update-passes]\n" - "[--remove-passes remove-passes]" << std::endl; + "[--remove-passes remove-passes]\n" + "[--rpc-network-threads threads]\n" + "[--enable-service-layer]\n" + "[--use-storage-chain]" << std::endl; } bool @@ -696,14 +918,20 @@ App::get_options() { "documents", 1, nullptr, 0 }, { "put-passes", 1, nullptr, 0 }, { "update-passes", 1, nullptr, 0 }, - { "remove-passes", 1, nullptr, 0 } + { "remove-passes", 1, nullptr, 0 }, + { "rpc-network-threads", 1, nullptr, 0 }, + { "enable-service-layer", 0, nullptr, 0 }, + { "use-storage-chain", 0, nullptr, 0 } }; enum longopts_enum { LONGOPT_THREADS, LONGOPT_DOCUMENTS, LONGOPT_PUT_PASSES, LONGOPT_UPDATE_PASSES, - LONGOPT_REMOVE_PASSES + LONGOPT_REMOVE_PASSES, + LONGOPT_RPC_NETWORK_THREADS, + LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_USE_STORAGE_CHAIN }; int opt_index = 1; resetOptIndex(opt_index); @@ -726,6 +954,15 @@ App::get_options() case LONGOPT_REMOVE_PASSES: _bm_params.set_remove_passes(atoi(opt_argument)); break; + case LONGOPT_RPC_NETWORK_THREADS: + _bm_params.set_rpc_network_threads(atoi(opt_argument)); + break; + case LONGOPT_ENABLE_SERVICE_LAYER: + _bm_params.set_enable_service_layer(true); + break; + case LONGOPT_USE_STORAGE_CHAIN: + _bm_params.set_use_storage_chain(true); + break; default: return false; } @@ -751,7 +988,7 @@ App::Main() int main(int argc, char* argv[]) { - DummyFileHeaderContext::setCreator("vespa-spi-feed-bm"); + DummyFileHeaderContext::setCreator("vespa-feed-bm"); App app; auto exit_value = app.Entry(argc, argv); vespalib::rmdir(base_dir, true); diff --git a/searchcore/src/apps/vespa-spi-feed-bm/.gitignore b/searchcore/src/apps/vespa-spi-feed-bm/.gitignore deleted file mode 100644 index 02fff2fe280..00000000000 --- a/searchcore/src/apps/vespa-spi-feed-bm/.gitignore +++ /dev/null @@ -1 +0,0 @@ -vespa-spi-feed-bm diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 41de6827244..8711a21e5e6 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -192,6 +192,11 @@ public: void assertExecuteHistory(std::vector<uint32_t> expExecuteHistory) { EXPECT_EQ(expExecuteHistory, _attributeFieldWriter->getExecuteHistory()); } + SerialNum test_force_commit(AttributeVector &attr, SerialNum serialNum) { + commit(serialNum); + _attributeFieldWriter->sync(); + return attr.getStatus().getLastSyncToken(); + } }; AttributeWriterTest::~AttributeWriterTest() = default; @@ -975,6 +980,15 @@ TEST_F(AttributeWriterTest, forceCommit_clears_search_cache_in_imported_attribut EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); } +TEST_F(AttributeWriterTest, ignores_force_commit_serial_not_greater_than_create_serial) +{ + auto a1 = addAttribute("a1"); + a1->setCreateSerialNum(100); + EXPECT_EQ(0u, test_force_commit(*a1, 50u)); + EXPECT_EQ(0u, test_force_commit(*a1, 100u)); + EXPECT_EQ(150u, test_force_commit(*a1, 150u)); +} + class StructWriterTestBase : public AttributeWriterTest { public: DocumentType _type; diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index ad0ce0b26c4..72592cca681 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -434,6 +434,7 @@ struct MyTlsWriter : TlsWriter { MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} void appendOperation(const FeedOperation &, DoneCallback) override { ++store_count; } + CommitResult startCommit(DoneCallback) override { return CommitResult(); } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 31882061b1c..04444647b5d 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -172,6 +172,9 @@ struct MyStorer : public IOperationStorer { ++_compactCnt; } } + CommitResult startCommit(DoneCallback) override { + return CommitResult(); + } }; struct MyFrozenBucketHandler : public IFrozenBucketHandler { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 8df62705cb3..f033dfd50a8 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -235,6 +235,9 @@ public: // Implements IOperationStorer void appendOperation(const FeedOperation &op, DoneCallback) override; + CommitResult startCommit(DoneCallback) override { + return CommitResult(); + } uint32_t getHeartBeats() const { return _heartBeats; diff --git a/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp b/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp index 508a60480d0..421ebffafa4 100644 --- a/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp +++ b/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp @@ -8,6 +8,7 @@ using namespace proton::matching; using search::fef::FieldInfo; using search::fef::FieldType; using search::fef::Properties; +using search::fef::OnnxModel; using search::index::Schema; using search::index::schema::CollectionType; using search::index::schema::DataType; @@ -16,8 +17,8 @@ using SIAF = Schema::ImportedAttributeField; OnnxModels make_models() { OnnxModels::Vector list; - list.emplace_back("model1", "path1"); - list.emplace_back("model2", "path2"); + list.emplace_back(OnnxModel("model1", "path1").input_feature("input1","feature1").output_name("output1", "out1")); + list.emplace_back(OnnxModel("model2", "path2")); return OnnxModels(list); } @@ -104,10 +105,22 @@ TEST_F("require that imported attribute fields are extracted in index environmen EXPECT_EQUAL("[documentmetastore]", f.env.getField(2)->name()); } -TEST_F("require that onnx model paths can be obtained", Fixture(buildEmptySchema())) { - EXPECT_EQUAL(f1.env.getOnnxModelFullPath("model1").value(), vespalib::string("path1")); - EXPECT_EQUAL(f1.env.getOnnxModelFullPath("model2").value(), vespalib::string("path2")); - EXPECT_FALSE(f1.env.getOnnxModelFullPath("model3").has_value()); +TEST_F("require that onnx model config can be obtained", Fixture(buildEmptySchema())) { + { + auto model = f1.env.getOnnxModel("model1"); + ASSERT_TRUE(model != nullptr); + EXPECT_EQUAL(model->file_path(), vespalib::string("path1")); + EXPECT_EQUAL(model->input_feature("input1").value(), vespalib::string("feature1")); + EXPECT_EQUAL(model->output_name("output1").value(), vespalib::string("out1")); + } + { + auto model = f1.env.getOnnxModel("model2"); + ASSERT_TRUE(model != nullptr); + EXPECT_EQUAL(model->file_path(), vespalib::string("path2")); + EXPECT_FALSE(model->input_feature("input1").has_value()); + EXPECT_FALSE(model->output_name("output1").has_value()); + } + EXPECT_TRUE(f1.env.getOnnxModel("model3") == nullptr); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp b/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp index 1cc8d0280f6..c46990732b7 100644 --- a/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp +++ b/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/searchcommon/common/schema.h> #include <vespa/searchlib/fef/indexproperties.h> +#include <vespa/searchlib/fef/onnx_model.h> #include <string> #include <vector> #include <map> @@ -18,6 +19,7 @@ const char *invalid_feature = "invalid_feature_name and format"; using namespace search::fef::indexproperties; using namespace search::index; +using search::fef::OnnxModel; using search::index::schema::CollectionType; using search::index::schema::DataType; @@ -69,9 +71,12 @@ struct Setup { std::map<std::string,std::string> properties; std::map<std::string,std::string> constants; std::vector<bool> extra_profiles; - std::map<std::string,std::string> onnx_models; + std::map<std::string,OnnxModel> onnx_models; Setup(); ~Setup(); + void add_onnx_model(const OnnxModel &model) { + onnx_models.insert_or_assign(model.name(), model); + } void index(const std::string &name, schema::DataType data_type, schema::CollectionType collection_type) { @@ -155,8 +160,20 @@ struct Setup { void write_onnx_models(const Writer &out) { size_t idx = 0; for (const auto &entry: onnx_models) { - out.fmt("model[%zu].name \"%s\"\n", idx, entry.first.c_str()); + out.fmt("model[%zu].name \"%s\"\n", idx, entry.second.name().c_str()); out.fmt("model[%zu].fileref \"onnx_ref_%zu\"\n", idx, idx); + size_t idx2 = 0; + for (const auto &input: entry.second.inspect_input_features()) { + out.fmt("model[%zu].input[%zu].name \"%s\"\n", idx, idx2, input.first.c_str()); + out.fmt("model[%zu].input[%zu].source \"%s\"\n", idx, idx2, input.second.c_str()); + ++idx2; + } + idx2 = 0; + for (const auto &output: entry.second.inspect_output_names()) { + out.fmt("model[%zu].output[%zu].name \"%s\"\n", idx, idx2, output.first.c_str()); + out.fmt("model[%zu].output[%zu].as \"%s\"\n", idx, idx2, output.second.c_str()); + ++idx2; + } ++idx; } } @@ -164,7 +181,7 @@ struct Setup { size_t idx = 0; for (const auto &entry: onnx_models) { out.fmt("file[%zu].ref \"onnx_ref_%zu\"\n", idx, idx); - out.fmt("file[%zu].path \"%s\"\n", idx, entry.second.c_str()); + out.fmt("file[%zu].path \"%s\"\n", idx, entry.second.file_path().c_str()); ++idx; } } @@ -225,7 +242,12 @@ struct SimpleSetup : Setup { struct OnnxSetup : Setup { OnnxSetup() : Setup() { - onnx_models["simple"] = TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx"); + add_onnx_model(OnnxModel("simple", TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx"))); + add_onnx_model(OnnxModel("mapped", TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx")) + .input_feature("query_tensor", "rankingExpression(qt)") + .input_feature("attribute_tensor", "rankingExpression(at)") + .input_feature("bias_tensor", "rankingExpression(bt)") + .output_name("output", "result")); } }; @@ -350,6 +372,13 @@ TEST_F("require that input type mismatch makes onnx model fail verification", On f.verify_invalid({"onnxModel(simple)"}); } +TEST_F("require that onnx model can have inputs and outputs mapped", OnnxSetup()) { + f.rank_expr("qt", "tensor<float>(a[1],b[4]):[[1,2,3,4]]"); + f.rank_expr("at", "tensor<float>(a[4],b[1]):[[5],[6],[7],[8]]"); + f.rank_expr("bt", "tensor<float>(a[1],b[1]):[[9]]"); + f.verify_valid({"onnxModel(mapped).result"}); +} + //----------------------------------------------------------------------------- TEST_F("cleanup files", Setup()) { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index a49b27caf36..13695b969bf 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -212,7 +212,9 @@ void applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVector &attr) { if (attr.getStatus().getLastSyncToken() <= serialNum) { - attr.commit(serialNum, serialNum); + if (serialNum > attr.getCreateSerialNum()) { + attr.commit(serialNum, serialNum); + } } } diff --git a/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp b/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp index 1aa89b78966..73f909c115c 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp @@ -41,7 +41,7 @@ FeedDebugger::FeedDebugger() : _enableDebugging = ! (_debugLidList.empty() && _debugDocIdList.empty()); } -FeedDebugger::~FeedDebugger() {} +FeedDebugger::~FeedDebugger() = default; ns_log::Logger::LogLevel FeedDebugger::getDebugDebuggerInternal(uint32_t lid, const document::DocumentId * docid) const diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp index b04bac5ef26..1f979d1566c 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp @@ -7,6 +7,7 @@ namespace proton::documentmetastore { LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot) : _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()), + _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()), _hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 || configSnapshot.getSchemaSP()->getNumAttributeFields() > 0) { @@ -18,6 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig() LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) : _visibilityDelay(visibilityDelay), + _allowEarlyAck(visibilityDelay > 1ms), _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) { } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h index 82dab433a22..c81a2ff399f 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h @@ -15,6 +15,7 @@ class LidReuseDelayerConfig { private: vespalib::duration _visibilityDelay; + bool _allowEarlyAck; bool _hasIndexedOrAttributeFields; public: LidReuseDelayerConfig(); @@ -22,6 +23,7 @@ public: explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot); vespalib::duration visibilityDelay() const { return _visibilityDelay; } bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; } + bool allowEarlyAck() const { return _allowEarlyAck; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp index 03dfd83a132..ed2202c830b 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp @@ -16,6 +16,7 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume : _writeService(writeService), _documentMetaStore(documentMetaStore), _immediateCommit(config.visibilityDelay() == vespalib::duration::zero()), + _allowEarlyAck(config.allowEarlyAck()), _config(config), _pendingLids() { diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h index 5f1de878b4a..ba407ab57f8 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h @@ -27,6 +27,7 @@ class LidReuseDelayer searchcorespi::index::IThreadingService &_writeService; IStore &_documentMetaStore; const bool _immediateCommit; + const bool _allowEarlyAck; LidReuseDelayerConfig _config; std::vector<uint32_t> _pendingLids; // lids waiting for commit @@ -38,7 +39,8 @@ public: bool delayReuse(const std::vector<uint32_t> &lids); std::vector<uint32_t> getReuseLids(); - bool getImmediateCommit() const { return _immediateCommit; } + bool needImmediateCommit() const { return _immediateCommit; } + bool allowEarlyAck() const { return _allowEarlyAck; } const LidReuseDelayerConfig & getConfig() const { return _config; } }; diff --git a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp index 5743a3d44d6..013f359c4f9 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp @@ -131,13 +131,10 @@ IndexEnvironment::hintFieldAccess(uint32_t ) const { } void IndexEnvironment::hintAttributeAccess(const string &) const { } -std::optional<vespalib::string> -IndexEnvironment::getOnnxModelFullPath(const vespalib::string &name) const +const search::fef::OnnxModel * +IndexEnvironment::getOnnxModel(const vespalib::string &name) const { - if (const auto model = _onnxModels.getModel(name)) { - return model->filePath; - } - return std::nullopt; + return _onnxModels.getModel(name); } IndexEnvironment::~IndexEnvironment() = default; diff --git a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h index d0e9a516cd0..ad51eb17b4d 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h +++ b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h @@ -69,7 +69,7 @@ public: return _constantValueRepo.getConstant(name); } - std::optional<vespalib::string> getOnnxModelFullPath(const vespalib::string &name) const override; + const search::fef::OnnxModel *getOnnxModel(const vespalib::string &name) const override; ~IndexEnvironment() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp index bdcf3e21d8e..ed80ca28bd6 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp @@ -1,25 +1,10 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "onnx_models.h" +#include <assert.h> namespace proton::matching { -OnnxModels::Model::Model(const vespalib::string &name_in, - const vespalib::string &filePath_in) - : name(name_in), - filePath(filePath_in) -{ -} - -OnnxModels::Model::~Model() = default; - -bool -OnnxModels::Model::operator==(const Model &rhs) const -{ - return (name == rhs.name) && - (filePath == rhs.filePath); -} - OnnxModels::OnnxModels() : _models() { @@ -30,15 +15,15 @@ OnnxModels::~OnnxModels() = default; OnnxModels::OnnxModels(const Vector &models) : _models() { - for (const auto &model : models) { - _models.insert(std::make_pair(model.name, model)); + for (const auto &model: models) { + _models.emplace(model.name(), model); } } bool OnnxModels::operator==(const OnnxModels &rhs) const { - return _models == rhs._models; + return (_models == rhs._models); } const OnnxModels::Model * @@ -51,4 +36,16 @@ OnnxModels::getModel(const vespalib::string &name) const return nullptr; } +void +OnnxModels::configure(const ModelConfig &config, Model &model) +{ + assert(config.name == model.name()); + for (const auto &input: config.input) { + model.input_feature(input.name, input.source); + } + for (const auto &output: config.output) { + model.output_name(output.name, output.as); + } +} + } diff --git a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h index fdaae657711..65ba524d8fc 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h +++ b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h @@ -3,6 +3,8 @@ #pragma once #include <vespa/vespalib/stllike/string.h> +#include <vespa/searchlib/fef/onnx_model.h> +#include <vespa/searchcore/config/config-onnx-models.h> #include <map> #include <vector> @@ -14,16 +16,8 @@ namespace proton::matching { */ class OnnxModels { public: - struct Model { - vespalib::string name; - vespalib::string filePath; - - Model(const vespalib::string &name_in, - const vespalib::string &filePath_in); - ~Model(); - bool operator==(const Model &rhs) const; - }; - + using ModelConfig = vespa::config::search::core::OnnxModelsConfig::Model; + using Model = search::fef::OnnxModel; using Vector = std::vector<Model>; private: @@ -38,6 +32,7 @@ public: bool operator==(const OnnxModels &rhs) const; const Model *getModel(const vespalib::string &name) const; size_t size() const { return _models.size(); } + static void configure(const ModelConfig &config, Model &model); }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 066e135741e..ea5d46f02ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -218,6 +218,16 @@ CombiningFeedView::heartBeat(search::SerialNum serialNum) } } +bool +CombiningFeedView::allowEarlyAck() const { + for (const auto &view : _views) { + if ( ! view->allowEarlyAck() ) { + return false; + } + } + return true; +} + void CombiningFeedView::sync() { diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index d1da0408318..3a37fdc37cb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -83,6 +83,7 @@ public: // Called by document db executor void setCalculator(const IBucketStateCalculator::SP &newCalc); + bool allowEarlyAck() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 4c0485baec6..5ed0ad7492c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -135,6 +135,7 @@ public: } vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); } + bool allowEarlyAck() const { return _visibilityDelay > 1ms; } const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const { return _lidSpaceCompaction; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 46bcb0e49bb..c63785faa35 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -911,7 +911,16 @@ DocumentDB::syncFeedView() IFeedView::SP newFeedView(_subDBs.getFeedView()); _writeService.sync(); - _visibility.commit(); + /* + * Don't call commit() on visibility handler during transaction + * log replay since the serial number used for the commit will be + * too high until the replay is complete. This check can be + * removed again when feed handler has improved tracking of serial + * numbers during replay. + */ + if (_state.getAllowReconfig()) { + _visibility.commit(); + } _writeService.sync(); _feedView.set(newFeedView); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index a8996abc856..c8b701e82f8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -321,6 +321,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) LOG(info, "Got file path from file acquirer: '%s' (name='%s', ref='%s')", filePath.c_str(), rc.name.c_str(), rc.fileref.c_str()); models.emplace_back(rc.name, filePath); + OnnxModels::configure(rc, models.back()); } } newOnnxModels = std::make_shared<OnnxModels>(models); diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 734ef01d33a..8b82478c1a4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -58,17 +58,20 @@ class TlsMgrWriter : public TlsWriter { std::shared_ptr<search::transactionlog::Writer> _writer; public: TlsMgrWriter(TransactionLogManager &tls_mgr, - const search::transactionlog::WriterFactory & factory) : - _tls_mgr(tls_mgr), - _writer(factory.getWriter(tls_mgr.getDomainName())) + const search::transactionlog::WriterFactory & factory) + : _tls_mgr(tls_mgr), + _writer(factory.getWriter(tls_mgr.getDomainName())) { } void appendOperation(const FeedOperation &op, DoneCallback onDone) override; + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override { + return _writer->startCommit(std::move(onDone)); + } bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; - -void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) { +void +TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) { using Packet = search::transactionlog::Packet; vespalib::nbostream stream; op.serialize(stream); @@ -77,9 +80,11 @@ void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) Packet::Entry entry(op.getSerialNum(), op.getType(), vespalib::ConstBufferRef(stream.data(), stream.size())); Packet packet(entry.serializedSize()); packet.add(entry); - _writer->commit(packet, std::move(onDone)); + _writer->append(packet, std::move(onDone)); } -bool TlsMgrWriter::erase(SerialNum oldest_to_keep) { + +bool +TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); } @@ -88,22 +93,40 @@ TlsMgrWriter::sync(SerialNum syncTo) { for (int retryCount = 0; retryCount < 10; ++retryCount) { SerialNum syncedTo(0); - LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo); + LOG(debug, "Trying tls sync(%" PRIu64 ")", syncTo); bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); if (!res) { - LOG(spam, "Tls sync failed, retrying"); + LOG(debug, "Tls sync failed, retrying"); sleep(1); continue; } if (syncedTo >= syncTo) { - LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo); + LOG(debug, "Tls sync complete, reached %" PRIu64", returning", syncedTo); return syncedTo; } - LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); + LOG(debug, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); } +class OnCommitDone : public search::IDestructorCallback { +public: + OnCommitDone(Executor & executor, std::unique_ptr<Executor::Task> task) + : _executor(executor), + _task(std::move(task)) + {} + ~OnCommitDone() override { _executor.execute(std::move(_task)); } +private: + Executor & _executor; + std::unique_ptr<Executor::Task> _task; +}; + +template <typename T> +struct KeepAlive : public search::IDestructorCallback { + explicit KeepAlive(T toKeep) : _toKeep(std::move(toKeep)) { } + ~KeepAlive() override = default; + T _toKeep; +}; } // namespace void @@ -379,6 +402,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), + _numOperationsPendingCommit(0), + _numOperationsCompleted(0), + _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -472,17 +498,57 @@ FeedHandler::getTransactionLogReplayDone() const { } void +FeedHandler::onCommitDone(size_t numPendingAtStart) { + assert(numPendingAtStart <= _numOperationsPendingCommit); + _numOperationsPendingCommit -= numPendingAtStart; + _numOperationsCompleted += numPendingAtStart; + _numCommitsCompleted++; + if (_numOperationsPendingCommit > 0) { + enqueCommitTask(); + } + LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", + _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit); +} + +void FeedHandler::enqueCommitTask() { + _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); })); +} + +void +FeedHandler::initiateCommit() { + auto onCommitDoneContext = std::make_shared<OnCommitDone>( + _writeService.master(), + makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() { + onCommitDone(numPendingAtStart); + })); + auto commitResult = _tlsWriter->startCommit(onCommitDoneContext); + if (_activeFeedView && ! _activeFeedView->allowEarlyAck()) { + using KeepAlivePair = KeepAlive<std::pair<CommitResult, DoneCallback>>; + auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); + _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlivePair>(std::move(pair))); + } +} + +void FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } _tlsWriter->appendOperation(op, std::move(onDone)); + if (++_numOperationsPendingCommit == 1) { + enqueCommitTask(); + } +} + +FeedHandler::CommitResult +FeedHandler::startCommit(DoneCallback onDone) { + return _tlsWriter->startCommit(std::move(onDone)); } void FeedHandler::storeOperationSync(const FeedOperation &op) { vespalib::Gate gate; - appendOperation(op, make_shared<search::GateCallback>(gate)); + appendAndCommitOperation(op, make_shared<search::GateCallback>(gate)); gate.await(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 29961f4a6cc..c295b26a759 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -76,6 +76,9 @@ private: // the serial num of the last message in the transaction log SerialNum _serialNum; SerialNum _prunedSerialNum; + size_t _numOperationsPendingCommit; + size_t _numOperationsCompleted; + size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -125,6 +128,9 @@ private: FeedStateSP getFeedState() const; void changeFeedState(FeedStateSP newState); void doChangeFeedState(FeedStateSP newState); + void onCommitDone(size_t numPendingAtStart); + void initiateCommit(); + void enqueCommitTask(); public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; @@ -226,6 +232,7 @@ public: void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); void appendOperation(const FeedOperation &op, DoneCallback onDone) override; + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override; void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h index 6de1d7a4322..7d559eb4375 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h @@ -11,7 +11,7 @@ namespace vespalib { namespace proton { class FeedOperation; -class PacketWrapper; +struct PacketWrapper; /** * Class representing the current state of a feed handler. diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 47b81a9a17f..c3b76a9db75 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -14,12 +14,18 @@ class FeedOperation; struct IOperationStorer { using DoneCallback = search::transactionlog::Writer::DoneCallback; + using CommitResult = search::transactionlog::Writer::CommitResult; virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0; + [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; + void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) { + appendOperation(op, onDone); + (void) startCommit(std::move(onDone)); + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 4b028a289a9..8dd7ae8474e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -65,6 +65,7 @@ public: virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0; virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0; virtual ILidCommitState & getUncommittedLidsTracker() = 0; + virtual bool allowEarlyAck() const = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d423e095ad9..468850b4409 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); vespalib::Gate gate; - _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate)); + _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate)); gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 37497eaa998..35549f21471 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -63,13 +63,13 @@ public: ~LidSpaceCompactionJob(); // Implements IDiskMemUsageListener - virtual void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; // Implements IClusterStateChangedNofifier - virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; // Implements IMaintenanceJob - virtual bool run() override; + bool run() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index d94bb2e3d03..d4b542a0af8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, AttributeUsageFilter &attributeUsageFilter) { controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig())); controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); - if (config.hasVisibilityDelay()) { + if (config.hasVisibilityDelay() && config.allowEarlyAck()) { controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay())); } const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 217a3bb24d3..90875aa8591 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c } std::unique_ptr<PendingLidTrackerBase> -createUncommitedLidTracker(bool needImmediateCommit) { - if (needImmediateCommit) { - return std::make_unique<PendingLidTracker>(); - } else { +createUncommitedLidTracker(bool allowEarlyAck) { + if (allowEarlyAck) { return std::make_unique<TwoPhasePendingLidTracker>(); + } else { + return std::make_unique<PendingLidTracker>(); } } @@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams _docType(nullptr), _lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig), _pendingLidsForDocStore(), - _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())), + _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())), _schema(ctx._schema), _writeService(ctx._writeService), _params(params), @@ -263,7 +263,7 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, DoneCallback onDone) void StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) { - (void) serialNum; + LOG(debug, "internalForceCommit: serial=%" PRIu64 ".", serialNum); _writeService.summary().execute(makeLambdaTask([onDone=onCommitDone]() {(void) onDone;})); std::vector<uint32_t> lidsToReuse; lidsToReuse = _lidReuseDelayer.getReuseLids(); @@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp void StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { - if ( ! needCommit() && token) { + if (allowEarlyAck() && token) { token.reset(); } } @@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), @@ -345,8 +345,13 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) } bool -StoreOnlyFeedView::needCommit() const { - return _lidReuseDelayer.getImmediateCommit(); +StoreOnlyFeedView::needImmediateCommit() const { + return _lidReuseDelayer.needImmediateCommit(); +} + +bool +StoreOnlyFeedView::allowEarlyAck() const { + return _lidReuseDelayer.allowEarlyAck(); } void @@ -483,7 +488,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid()); considerEarlyAck(token); - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate()); UpdateScope updateScope(*_schema, upd); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope); @@ -657,7 +662,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone); } @@ -770,7 +775,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp) void StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); size_t rm_count = removeDocuments(delOp, true, immediateCommit); LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)", _params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count); @@ -809,7 +814,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 7d91ea86a22..20942423995 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -161,7 +161,7 @@ private: void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); - bool needCommit() const; + bool needImmediateCommit() const; bool useDocumentStore(SerialNum replaySerialNum) const { @@ -264,6 +264,7 @@ public: void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; ILidCommitState & getUncommittedLidsTracker() override; + bool allowEarlyAck() const final override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index adfc911c8df..b96fd77409c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -34,6 +34,7 @@ struct DummyFeedView : public IFeedView void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum, DoneCallback) override { } ILidCommitState & getUncommittedLidsTracker() override; + bool allowEarlyAck() const override { return false; } }; } |