summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt2
-rw-r--r--searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/.gitignore1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt (renamed from searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt)12
-rw-r--r--searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h30
-rw-r--r--searchcore/src/apps/vespa-feed-bm/pending_tracker.h57
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp93
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp185
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp146
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h43
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp (renamed from searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp)479
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/.gitignore1
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_test.cpp14
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp1
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp3
-rw-r--r--searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp25
-rw-r--r--searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp37
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp35
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/onnx_models.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp90
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstate.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ifeedview.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp33
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h1
46 files changed, 1237 insertions, 209 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index f29712e7a5f..f98a3c87a2e 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -46,9 +46,9 @@ vespa_define_module(
src/apps/tests
src/apps/verify_ranksetup
src/apps/vespa-dump-feed
+ src/apps/vespa-feed-bm
src/apps/vespa-gen-testdocs
src/apps/vespa-proton-cmd
- src/apps/vespa-spi-feed-bm
src/apps/vespa-transactionlog-inspect
TESTS
diff --git a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
index 2d028f47513..1d492cb558f 100644
--- a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
+++ b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
@@ -59,6 +59,7 @@ OnnxModels make_models(const OnnxModelsConfig &modelsCfg, const VerifyRanksetupC
for (const auto &entry: modelsCfg.model) {
if (auto file = get_file(entry.fileref, myCfg)) {
model_list.emplace_back(entry.name, file.value());
+ OnnxModels::configure(entry, model_list.back());
} else {
LOG(warning, "could not find file for onnx model '%s' (ref:'%s')\n",
entry.name.c_str(), entry.fileref.c_str());
diff --git a/searchcore/src/apps/vespa-feed-bm/.gitignore b/searchcore/src/apps/vespa-feed-bm/.gitignore
new file mode 100644
index 00000000000..0dc27e95ea8
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/.gitignore
@@ -0,0 +1 @@
+vespa-feed-bm
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index e188bc16ec0..4ced3fe173b 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -1,8 +1,11 @@
# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(searchcore_vespa_spi_feed_bm_app
+vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
- vespa_spi_feed_bm.cpp
- OUTPUT_NAME vespa-spi-feed-bm
+ vespa_feed_bm.cpp
+ spi_bm_feed_handler.cpp
+ storage_api_rpc_bm_feed_handler.cpp
+ storage_api_chain_bm_feed_handler.cpp
+ OUTPUT_NAME vespa-feed-bm
DEPENDS
searchcore_server
searchcore_initializer
@@ -20,5 +23,8 @@ vespa_add_executable(searchcore_vespa_spi_feed_bm_app
searchcore_grouping
searchcore_proton_metrics
searchcore_fconfig
+ storageserver_storageapp
+ messagebus_messagebus-test
+ messagebus
searchlib_searchlib_uca
)
diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
new file mode 100644
index 00000000000..a3341bf14c9
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
@@ -0,0 +1,30 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace document {
+class Bucket;
+class Document;
+class DocumentUpdate;
+class DocumentId;
+}
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Interface class for benchmark feed handler.
+ */
+class IBmFeedHandler
+{
+public:
+ virtual ~IBmFeedHandler() = default;
+ virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0;
+ virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0;
+ virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
new file mode 100644
index 00000000000..3698832068f
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
@@ -0,0 +1,57 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+
+namespace feedbm {
+
+/*
+ * Class to track number of pending operations, used as backpressure during
+ * benchmark feeding.
+ */
+class PendingTracker {
+ uint32_t _pending;
+ uint32_t _limit;
+ std::mutex _mutex;
+ std::condition_variable _cond;
+
+public:
+ PendingTracker(uint32_t limit)
+ : _pending(0u),
+ _limit(limit),
+ _mutex(),
+ _cond()
+ {
+ }
+
+ ~PendingTracker()
+ {
+ drain();
+ }
+
+ void release() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ --_pending;
+ if (_pending < _limit) {
+ _cond.notify_all();
+ }
+ }
+ void retain() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (_pending >= _limit) {
+ _cond.wait(guard);
+ }
+ ++_pending;
+ }
+
+ void drain() {
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (_pending > 0) {
+ _cond.wait(guard);
+ }
+ }
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
new file mode 100644
index 00000000000..d53ece2fc42
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
@@ -0,0 +1,93 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "spi_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/metrics/loadtype.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::spi::Bucket;
+using storage::spi::PartitionId;
+using storage::spi::PersistenceProvider;
+using storage::spi::Timestamp;
+
+namespace feedbm {
+
+namespace {
+
+storage::spi::LoadType default_load_type(0, "default");
+storage::spi::Context context(default_load_type, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
+
+class MyOperationComplete : public storage::spi::OperationComplete
+{
+ PendingTracker& _tracker;
+public:
+ MyOperationComplete(PendingTracker& tracker);
+ ~MyOperationComplete();
+ void onComplete(std::unique_ptr<storage::spi::Result> result) override;
+ void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
+};
+
+MyOperationComplete::MyOperationComplete(PendingTracker& tracker)
+ : _tracker(tracker)
+{
+ _tracker.retain();
+}
+
+MyOperationComplete::~MyOperationComplete()
+{
+ _tracker.release();
+}
+
+void
+MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
+{
+ (void) result;
+}
+
+void
+MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler)
+{
+ (void) resultHandler;
+}
+
+}
+
+SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider)
+ : IBmFeedHandler(),
+ _provider(provider)
+{
+}
+
+SpiBmFeedHandler::~SpiBmFeedHandler() = default;
+
+void
+SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(tracker));
+}
+
+void
+SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(tracker));
+}
+
+void
+SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(tracker));
+
+}
+
+void
+SpiBmFeedHandler::create_bucket(const document::Bucket& bucket)
+{
+ _provider.createBucket(Bucket(bucket, PartitionId(0)), context);
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
new file mode 100644
index 00000000000..5b56a4f21dd
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace storage::spi { struct PersistenceProvider; }
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed directly to persistence provider
+ */
+class SpiBmFeedHandler : public IBmFeedHandler
+{
+ storage::spi::PersistenceProvider& _provider;
+public:
+ SpiBmFeedHandler(storage::spi::PersistenceProvider& provider);
+ ~SpiBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ void create_bucket(const document::Bucket& bucket);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
new file mode 100644
index 00000000000..6f1acd10fe4
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
@@ -0,0 +1,185 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_chain_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/common/storage_chain_builder.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <cassert>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::StorageLink;
+
+namespace feedbm {
+
+namespace {
+
+std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle);
+ cmd->setPriority(storage::api::StorageMessage::VERYHIGH);
+ return cmd;
+}
+
+}
+
+class BmStorageLink : public StorageLink
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ BmStorageLink();
+ ~BmStorageLink() override;
+ bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ void retain(uint64_t msg_id, PendingTracker &tracker) {
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+ }
+ bool release(uint64_t msg_id) {
+ PendingTracker *tracker = nullptr;
+ {
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ if (itr == _pending.end()) {
+ return false;
+ }
+ tracker = itr->second;
+ _pending.erase(itr);
+ }
+ tracker->release();
+ return true;
+ }
+};
+
+BmStorageLink::BmStorageLink()
+ : storage::StorageLink("vespa-bm-feed"),
+ _mutex(),
+ _pending()
+{
+}
+
+BmStorageLink::~BmStorageLink()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+bool
+BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ (void) msg;
+ return false;
+}
+
+bool
+BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ return release(msg->getMsgId());
+}
+
+struct StorageApiChainBmFeedHandler::Context {
+ BmStorageLink* bm_link;
+ Context()
+ : bm_link(nullptr)
+ {
+ }
+ ~Context() = default;
+};
+
+class MyStorageChainBuilder : public storage::StorageChainBuilder
+{
+ using Parent = storage::StorageChainBuilder;
+ std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context;
+public:
+ MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context);
+ ~MyStorageChainBuilder() override;
+ void add(std::unique_ptr<StorageLink> link) override;
+};
+
+MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context)
+ : storage::StorageChainBuilder(),
+ _context(std::move(context))
+{
+}
+
+MyStorageChainBuilder::~MyStorageChainBuilder() = default;
+
+void
+MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link)
+{
+ vespalib::string name = link->getName();
+ Parent::add(std::move(link));
+ if (name == "Communication manager") {
+ auto my_link = std::make_unique<BmStorageLink>();
+ _context->bm_link = my_link.get();
+ Parent::add(std::move(my_link));
+ }
+}
+
+StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context)
+ : IBmFeedHandler(),
+ _context(std::move(context))
+{
+ auto cmd = make_set_cluster_state_cmd();
+ PendingTracker tracker(1);
+ send_msg(std::move(cmd), tracker);
+ tracker.drain();
+}
+
+StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
+
+void
+StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ auto bm_link = _context->bm_link;
+ bm_link->retain(cmd->getMsgId(), pending_tracker);
+ bm_link->sendDown(std::move(cmd));
+}
+
+void
+StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+std::shared_ptr<StorageApiChainBmFeedHandler::Context>
+StorageApiChainBmFeedHandler::get_context()
+{
+ return std::make_shared<Context>();
+}
+
+std::unique_ptr<storage::IStorageChainBuilder>
+StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context)
+{
+ return std::make_unique<MyStorageChainBuilder>(std::move(context));
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
new file mode 100644
index 00000000000..521deddd19e
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace storage { class IStorageChainBuilder; }
+namespace storage::api { class StorageCommand; }
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed to service layer using storage api protocol
+ * directly on the storage chain.
+ */
+class StorageApiChainBmFeedHandler : public IBmFeedHandler
+{
+public:
+ struct Context;
+private:
+ std::shared_ptr<Context> _context;
+ void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiChainBmFeedHandler(std::shared_ptr<Context> context);
+ ~StorageApiChainBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+
+ static std::shared_ptr<Context> get_context();
+ static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
new file mode 100644
index 00000000000..c8d73444652
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
@@ -0,0 +1,146 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_rpc_bm_feed_handler.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storage/storageserver/message_dispatcher.h>
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
+#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <cassert>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using document::DocumentTypeRepo;
+using storage::api::StorageMessageAddress;
+using storage::rpc::SharedRpcResources;
+
+namespace feedbm {
+
+namespace {
+
+FRT_RPCRequest *
+make_set_cluster_state_request() {
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ storage::rpc::SlimeClusterStateBundleCodec codec;
+ auto encoded_bundle = codec.encode(bundle);
+ auto *req = new FRT_RPCRequest();
+ auto* params = req->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(encoded_bundle._uncompressed_length);
+ params->AddData(std::move(*encoded_bundle._buffer));
+ req->SetMethodName("setdistributionstates");
+ return req;
+}
+
+void
+set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) {
+ auto req = make_set_cluster_state_request();
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory());
+ auto target = target_resolver->resolve_rpc_target(storage_address);
+ target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
+ assert(!req->IsError());
+ req->SubRef();
+}
+
+}
+
+class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ MyMessageDispatcher()
+ : storage::MessageDispatcher(),
+ _mutex(),
+ _pending()
+ {
+ }
+ ~MyMessageDispatcher() override;
+ void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override {
+ release(msg->getMsgId());
+ }
+ void retain(uint64_t msg_id, PendingTracker &tracker) {
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+ }
+ void release(uint64_t msg_id) {
+ PendingTracker *tracker = nullptr;
+ {
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ assert(itr != _pending.end());
+ tracker = itr->second;
+ _pending.erase(itr);
+ }
+ tracker->release();
+ }
+};
+
+StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo)
+ : IBmFeedHandler(),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)),
+ _shared_rpc_resources(shared_rpc_resources_in),
+ _message_dispatcher(std::make_unique<MyMessageDispatcher>()),
+ _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())),
+ _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params()))
+{
+ set_cluster_up(_shared_rpc_resources, *_storage_address);
+}
+
+StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
+
+void
+StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ cmd->setAddress(*_storage_address);
+ _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
+ _rpc_client->send_rpc_v1_request(std::move(cmd));
+}
+
+void
+StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+void
+StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+void
+StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_rpc(std::move(cmd), tracker);
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
new file mode 100644
index 00000000000..0fe92350eb2
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
@@ -0,0 +1,43 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace storage::api {
+class StorageMessageAddress;
+class StorageCommand;
+}
+
+namespace storage::rpc {
+class MessageCodecProvider;
+class SharedRpcResources;
+class StorageApiRpcService;
+}
+
+namespace feedbm {
+
+/*
+ * Benchmark feed handler for feed to service layer using storage api protocol
+ * over rpc.
+ */
+class StorageApiRpcBmFeedHandler : public IBmFeedHandler
+{
+ class MyMessageDispatcher;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ storage::rpc::SharedRpcResources& _shared_rpc_resources;
+ std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
+ std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider;
+ std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client;
+
+ void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo);
+ ~StorageApiRpcBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index ecfa2a07cef..73d741c3fee 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,7 +1,10 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "pending_tracker.h"
+#include "spi_bm_feed_handler.h"
+#include "storage_api_rpc_bm_feed_handler.h"
+#include "storage_api_chain_bm_feed_handler.h"
#include <vespa/vespalib/testkit/testapp.h>
-
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-imported-fields.h>
#include <vespa/config-rank-profiles.h>
@@ -11,6 +14,7 @@
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/update/documentupdate.h>
@@ -37,11 +41,32 @@
#include <vespa/config-summary.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/fastos/app.h>
+#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
+#include <vespa/storage/config/config-stor-bouncer.h>
+#include <vespa/storage/config/config-stor-communicationmanager.h>
+#include <vespa/storage/config/config-stor-opslogger.h>
+#include <vespa/storage/config/config-stor-prioritymapping.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/config/config-stor-status.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/config-load-type.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config-stor-filestor.h>
+#include <vespa/config-upgrading.h>
+#include <vespa/config-slobroks.h>
+#include <vespa/metrics/config-metricsmanager.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/storage/common/i_storage_chain_builder.h>
+#include <vespa/storage/storageserver/storagenode.h>
+#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <getopt.h>
#include <iostream>
#include <vespa/log/log.h>
-LOG_SETUP("vespa-spi-feed-bm");
+LOG_SETUP("vespa-feed-bm");
using namespace config;
using namespace proton;
@@ -51,7 +76,28 @@ using namespace vespa::config::search::summary;
using namespace vespa::config::search;
using namespace std::chrono_literals;
using vespa::config::content::core::BucketspacesConfig;
-
+using vespa::config::content::core::BucketspacesConfigBuilder;
+using vespa::config::content::StorDistributionConfigBuilder;
+using vespa::config::content::StorFilestorConfigBuilder;
+using vespa::config::content::PersistenceConfigBuilder;
+using vespa::config::content::core::StorBouncerConfigBuilder;
+using vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
+using vespa::config::content::core::StorBucketInitConfigBuilder;
+using vespa::config::content::core::StorOpsloggerConfigBuilder;
+using vespa::config::content::core::StorPrioritymappingConfigBuilder;
+using vespa::config::content::LoadTypeConfigBuilder;
+using vespa::config::content::UpgradingConfigBuilder;
+using vespa::config::content::core::StorServerConfigBuilder;
+using vespa::config::content::core::StorStatusConfigBuilder;
+using vespa::config::content::core::StorVisitorConfigBuilder;
+using metrics::MetricsmanagerConfigBuilder;
+using cloud::config::SlobroksConfigBuilder;
+using messagebus::MessagebusConfigBuilder;
+
+using config::ConfigContext;
+using config::ConfigUri;
+using config::ConfigSet;
+using config::IConfigContext;
using document::AssignValueUpdate;
using document::BucketId;
using document::BucketSpace;
@@ -59,7 +105,9 @@ using document::Document;
using document::DocumentId;
using document::DocumentType;
using document::DocumentTypeRepo;
+using document::DocumentTypeRepoFactory;
using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
using document::DocumentUpdate;
using document::Field;
using document::FieldUpdate;
@@ -70,20 +118,18 @@ using search::index::DummyFileHeaderContext;
using search::index::Schema;
using search::index::SchemaBuilder;
using search::transactionlog::TransLogServer;
-using storage::spi::Bucket;
-using storage::spi::PartitionId;
+using storage::rpc::SharedRpcResources;
using storage::spi::PersistenceProvider;
-using storage::spi::Priority;
-using storage::spi::Timestamp;
-using storage::spi::Trace;
using vespalib::makeLambdaTask;
+using feedbm::IBmFeedHandler;
+using feedbm::SpiBmFeedHandler;
+using feedbm::StorageApiRpcBmFeedHandler;
+using feedbm::StorageApiChainBmFeedHandler;
using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>;
namespace {
-storage::spi::LoadType default_load_type(0, "default");
-
vespalib::string base_dir = "testdb";
std::shared_ptr<DocumenttypesConfig> make_document_type() {
@@ -144,84 +190,6 @@ struct MyResourceWriteFilter : public IResourceWriteFilter
State getAcceptState() const override { return IResourceWriteFilter::State(); }
};
-class MyPendingTracker {
- uint32_t _pending;
- uint32_t _limit;
- std::mutex _mutex;
- std::condition_variable _cond;
-
-public:
- MyPendingTracker(uint32_t limit)
- : _pending(0u),
- _limit(limit),
- _mutex(),
- _cond()
- {
- }
-
- ~MyPendingTracker()
- {
- drain();
- }
-
- void release() {
- std::unique_lock<std::mutex> guard(_mutex);
- --_pending;
- if (_pending < _limit) {
- _cond.notify_all();
- }
- //LOG(info, "release, pending is now %u", _pending);
- }
- void retain() {
- std::unique_lock<std::mutex> guard(_mutex);
- while (_pending >= _limit) {
- _cond.wait(guard);
- }
- ++_pending;
- //LOG(info, "retain, pending is now %u", _pending);
- }
-
- void drain() {
- std::unique_lock<std::mutex> guard(_mutex);
- while (_pending > 0) {
- _cond.wait(guard);
- }
- }
-};
-
-class MyOperationComplete : public storage::spi::OperationComplete
-{
- MyPendingTracker& _tracker;
-public:
- MyOperationComplete(MyPendingTracker &tracker);
- ~MyOperationComplete();
- void onComplete(std::unique_ptr<storage::spi::Result> result) override;
- void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
-};
-
-MyOperationComplete::MyOperationComplete(MyPendingTracker& tracker)
- : _tracker(tracker)
-{
- _tracker.retain();
-}
-
-MyOperationComplete::~MyOperationComplete()
-{
- _tracker.release();
-}
-
-void
-MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
-{
- (void) result;
-}
-
-void
-MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * resultHandler)
-{
- (void) resultHandler;
-}
-
class BMRange
{
uint32_t _start;
@@ -242,6 +210,9 @@ class BMParams {
uint32_t _put_passes;
uint32_t _update_passes;
uint32_t _remove_passes;
+ uint32_t _rpc_network_threads;
+ bool _enable_service_layer;
+ bool _use_storage_chain;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
}
@@ -251,7 +222,10 @@ public:
_threads(32),
_put_passes(2),
_update_passes(1),
- _remove_passes(2)
+ _remove_passes(2),
+ _rpc_network_threads(1),
+ _enable_service_layer(false),
+ _use_storage_chain(false)
{
}
BMRange get_range(uint32_t thread_id) const {
@@ -262,11 +236,17 @@ public:
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
+ uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
+ bool get_enable_service_layer() const { return _enable_service_layer; }
+ bool get_use_storage_chain() const { return _use_storage_chain; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
+ void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
+ void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
+ void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
bool check() const;
};
@@ -289,11 +269,177 @@ BMParams::check() const
std::cerr << "Put passes too low: " << _put_passes << std::endl;
return false;
}
+ if (_rpc_network_threads < 1) {
+ std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
+ return false;
+ }
return true;
}
+class MyServiceLayerProcess : public storage::ServiceLayerProcess {
+ PersistenceProvider& _provider;
+
+public:
+ MyServiceLayerProcess(const config::ConfigUri & configUri,
+ PersistenceProvider &provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder);
+ ~MyServiceLayerProcess() override { shutdown(); }
+
+ void shutdown() override;
+ void setupProvider() override;
+ PersistenceProvider& getProvider() override;
+};
+
+MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri,
+ PersistenceProvider &provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
+ : ServiceLayerProcess(configUri),
+ _provider(provider)
+{
+ if (chain_builder) {
+ set_storage_chain_builder(std::move(chain_builder));
+ }
+}
+
+void
+MyServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
}
+void
+MyServiceLayerProcess::setupProvider()
+{
+}
+
+PersistenceProvider&
+MyServiceLayerProcess::getProvider()
+{
+ return _provider;
+}
+
+struct MyStorageConfig
+{
+ vespalib::string config_id;
+ DocumenttypesConfigBuilder documenttypes;
+ PersistenceConfigBuilder persistence;
+ StorDistributionConfigBuilder stor_distribution;
+ StorFilestorConfigBuilder stor_filestor;
+ StorBouncerConfigBuilder stor_bouncer;
+ StorCommunicationmanagerConfigBuilder stor_communicationmanager;
+ StorBucketInitConfigBuilder stor_bucket_init;
+ StorOpsloggerConfigBuilder stor_opslogger;
+ StorPrioritymappingConfigBuilder stor_prioritymapping;
+ UpgradingConfigBuilder upgrading;
+ StorServerConfigBuilder stor_server;
+ StorStatusConfigBuilder stor_status;
+ StorVisitorConfigBuilder stor_visitor;
+ BucketspacesConfigBuilder bucketspaces;
+ LoadTypeConfigBuilder load_type;
+ MetricsmanagerConfigBuilder metricsmanager;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port, uint32_t rpc_network_threads)
+ : config_id(config_id_in),
+ documenttypes(documenttypes_in),
+ persistence(),
+ stor_distribution(),
+ stor_filestor(),
+ stor_bouncer(),
+ stor_communicationmanager(),
+ stor_bucket_init(),
+ stor_opslogger(),
+ stor_prioritymapping(),
+ upgrading(),
+ stor_server(),
+ stor_status(),
+ stor_visitor(),
+ bucketspaces(),
+ load_type(),
+ metricsmanager(),
+ slobroks(),
+ messagebus()
+ {
+ {
+ auto &dc = stor_distribution;
+ {
+ StorDistributionConfigBuilder::Group group;
+ {
+ StorDistributionConfigBuilder::Group::Nodes node;
+ node.index = 0;
+ group.nodes.push_back(std::move(node));
+ }
+ group.index = "invalid";
+ group.name = "invalid";
+ group.capacity = 1.0;
+ group.partitions = "";
+ dc.group.push_back(std::move(group));
+ }
+ dc.redundancy = 1;
+ dc.readyCopies = 1;
+ }
+ stor_server.rootFolder = "storage";
+ {
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+ }
+ stor_communicationmanager.useDirectStorageapiRpc = true;
+ stor_communicationmanager.rpc.numNetworkThreads = rpc_network_threads;
+ stor_status.httpport = status_port;
+ }
+
+ ~MyStorageConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &documenttypes);
+ set.addBuilder(config_id, &persistence);
+ set.addBuilder(config_id, &stor_distribution);
+ set.addBuilder(config_id, &stor_filestor);
+ set.addBuilder(config_id, &stor_bouncer);
+ set.addBuilder(config_id, &stor_communicationmanager);
+ set.addBuilder(config_id, &stor_bucket_init);
+ set.addBuilder(config_id, &stor_opslogger);
+ set.addBuilder(config_id, &stor_prioritymapping);
+ set.addBuilder(config_id, &upgrading);
+ set.addBuilder(config_id, &stor_server);
+ set.addBuilder(config_id, &stor_status);
+ set.addBuilder(config_id, &stor_visitor);
+ set.addBuilder(config_id, &bucketspaces);
+ set.addBuilder(config_id, &load_type);
+ set.addBuilder(config_id, &metricsmanager);
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+MyStorageConfig::~MyStorageConfig() = default;
+
+struct MyRpcClientConfig {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+
+ MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks()
+ {
+ {
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+ }
+ }
+ ~MyRpcClientConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ }
+};
+
+MyRpcClientConfig::~MyRpcClientConfig() = default;
+
+}
struct PersistenceProviderFixture {
std::shared_ptr<DocumenttypesConfig> _document_types;
@@ -305,6 +451,9 @@ struct PersistenceProviderFixture {
vespalib::string _base_dir;
DummyFileHeaderContext _file_header_context;
int _tls_listen_port;
+ int _slobrok_port;
+ int _status_port;
+ int _rpc_client_port;
TransLogServer _tls;
vespalib::string _tls_spec;
matching::QueryLimiter _query_limiter;
@@ -318,24 +467,33 @@ struct PersistenceProviderFixture {
MyPersistenceEngineOwner _persistence_owner;
MyResourceWriteFilter _write_filter;
std::shared_ptr<PersistenceEngine> _persistence_engine;
- storage::spi::Context _context;
uint32_t _bucket_bits;
-
- PersistenceProviderFixture();
+ MyStorageConfig _service_layer_config;
+ MyRpcClientConfig _rpc_client_config;
+ ConfigSet _config_set;
+ std::shared_ptr<IConfigContext> _config_context;
+ std::unique_ptr<IBmFeedHandler> _feed_handler;
+ std::unique_ptr<mbus::Slobrok> _slobrok;
+ std::unique_ptr<MyServiceLayerProcess> _service_layer;
+ std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
+
+ PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
void create_document_db();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); }
- Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); }
+ document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); }
DocumentId make_document_id(uint32_t i) const;
std::unique_ptr<Document> make_document(uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
+ void start_service_layer(bool use_storage_chain);
+ void shutdown_service_layer();
};
-PersistenceProviderFixture::PersistenceProviderFixture()
+PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
: _document_types(make_document_type()),
- _repo(std::make_shared<DocumentTypeRepo>(*_document_types)),
+ _repo(DocumentTypeRepoFactory::make(*_document_types)),
_doc_type_name("test"),
_document_type(_repo->getDocumentType(_doc_type_name.getName())),
_field(_document_type->getField("int")),
@@ -343,6 +501,9 @@ PersistenceProviderFixture::PersistenceProviderFixture()
_base_dir(base_dir),
_file_header_context(),
_tls_listen_port(9017),
+ _slobrok_port(9018),
+ _status_port(9019),
+ _rpc_client_port(9020),
_tls("tls", _tls_listen_port, _base_dir, _file_header_context),
_tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
_query_limiter(),
@@ -356,13 +517,23 @@ PersistenceProviderFixture::PersistenceProviderFixture()
_persistence_owner(),
_write_filter(),
_persistence_engine(),
- _context(default_load_type, Priority(0), Trace::TraceLevel(0)),
- _bucket_bits(16)
+ _bucket_bits(16),
+ _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()),
+ _rpc_client_config("bm-rpc-client", _slobrok_port),
+ _config_set(),
+ _config_context(std::make_shared<ConfigContext>(_config_set)),
+ _feed_handler(),
+ _slobrok(),
+ _service_layer(),
+ _rpc_client_shared_rpc_resources()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
+ _service_layer_config.add_builders(_config_set);
+ _rpc_client_config.add_builders(_config_set);
+ _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine);
}
PersistenceProviderFixture::~PersistenceProviderFixture()
@@ -448,9 +619,59 @@ PersistenceProviderFixture::make_document_update(uint32_t i) const
void
PersistenceProviderFixture::create_buckets()
{
- auto &provider = *_persistence_engine;
+ SpiBmFeedHandler feed_handler(*_persistence_engine);
for (unsigned int i = 0; i < num_buckets(); ++i) {
- provider.createBucket(make_bucket(i), _context);
+ feed_handler.create_bucket(make_bucket(i));
+ }
+}
+
+void
+PersistenceProviderFixture::start_service_layer(bool use_storage_chain)
+{
+ LOG(info, "start slobrok");
+ _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
+ LOG(info, "start service layer");
+ config::ConfigUri config_uri("bm-servicelayer", _config_context);
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder;
+ std::shared_ptr<StorageApiChainBmFeedHandler::Context> context;
+ if (use_storage_chain) {
+ context = StorageApiChainBmFeedHandler::get_context();
+ chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context);
+ }
+ _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
+ *_persistence_engine,
+ std::move(chain_builder));
+ _service_layer->setupConfig(100ms);
+ _service_layer->createNode();
+ _service_layer->getNode().waitUntilInitialized();
+ LOG(info, "start rpc client shared resources");
+ config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
+ _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
+ _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
+ if (use_storage_chain) {
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context));
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+ }
+}
+
+void
+PersistenceProviderFixture::shutdown_service_layer()
+{
+ _feed_handler.reset();
+ if (_rpc_client_shared_rpc_resources) {
+ LOG(info, "stop rpc client shared resources");
+ _rpc_client_shared_rpc_resources->shutdown();
+ _rpc_client_shared_rpc_resources.reset();
+ }
+ if (_service_layer) {
+ LOG(info, "stop service layer");
+ _service_layer->getNode().requestShutdown("controlled shutdown");
+ _service_layer->shutdown();
+ }
+ if (_slobrok) {
+ LOG(info, "stop slobrok");
+ _slobrok.reset();
}
}
@@ -490,18 +711,16 @@ void
put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
auto document = std::make_unique<Document>(repo, is);
- provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -541,18 +760,16 @@ void
update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
auto document_update = DocumentUpdate::createHEAD(repo, is);
- provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -593,17 +810,15 @@ void
remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- MyPendingTracker pending_tracker(100);
- auto &provider = *f._persistence_engine;
- auto &context = f._context;
+ feedbm::PendingTracker pending_tracker(100);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
- Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ document::Bucket bucket(bucket_space, bucket_id);
DocumentId document_id(is);
- provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
+ f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -629,12 +844,15 @@ run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu
void benchmark_async_spi(const BMParams &bm_params)
{
vespalib::rmdir(base_dir, true);
- PersistenceProviderFixture f;
+ PersistenceProviderFixture f(bm_params);
auto &provider = *f._persistence_engine;
LOG(info, "start initialize");
provider.initialize();
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
+ if (bm_params.get_enable_service_layer()) {
+ f.start_service_layer(bm_params.get_use_storage_chain());
+ }
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update");
@@ -649,6 +867,7 @@ void benchmark_async_spi(const BMParams &bm_params)
for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) {
run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params);
}
+ f.shutdown_service_layer();
}
class App : public FastOS_Application
@@ -673,16 +892,19 @@ void
App::usage()
{
std::cerr <<
- "vespa-spi-feed-bm version 0.0\n"
+ "vespa-feed-bm version 0.0\n"
"\n"
"USAGE:\n";
std::cerr <<
- "vespa-spi-feed-bm\n"
+ "vespa-feed-bm\n"
"[--threads threads]\n"
"[--documents documents]\n"
"[--put-passes put-passes]\n"
"[--update-passes update-passes]\n"
- "[--remove-passes remove-passes]" << std::endl;
+ "[--remove-passes remove-passes]\n"
+ "[--rpc-network-threads threads]\n"
+ "[--enable-service-layer]\n"
+ "[--use-storage-chain]" << std::endl;
}
bool
@@ -696,14 +918,20 @@ App::get_options()
{ "documents", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
{ "update-passes", 1, nullptr, 0 },
- { "remove-passes", 1, nullptr, 0 }
+ { "remove-passes", 1, nullptr, 0 },
+ { "rpc-network-threads", 1, nullptr, 0 },
+ { "enable-service-layer", 0, nullptr, 0 },
+ { "use-storage-chain", 0, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_THREADS,
LONGOPT_DOCUMENTS,
LONGOPT_PUT_PASSES,
LONGOPT_UPDATE_PASSES,
- LONGOPT_REMOVE_PASSES
+ LONGOPT_REMOVE_PASSES,
+ LONGOPT_RPC_NETWORK_THREADS,
+ LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_USE_STORAGE_CHAIN
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -726,6 +954,15 @@ App::get_options()
case LONGOPT_REMOVE_PASSES:
_bm_params.set_remove_passes(atoi(opt_argument));
break;
+ case LONGOPT_RPC_NETWORK_THREADS:
+ _bm_params.set_rpc_network_threads(atoi(opt_argument));
+ break;
+ case LONGOPT_ENABLE_SERVICE_LAYER:
+ _bm_params.set_enable_service_layer(true);
+ break;
+ case LONGOPT_USE_STORAGE_CHAIN:
+ _bm_params.set_use_storage_chain(true);
+ break;
default:
return false;
}
@@ -751,7 +988,7 @@ App::Main()
int
main(int argc, char* argv[])
{
- DummyFileHeaderContext::setCreator("vespa-spi-feed-bm");
+ DummyFileHeaderContext::setCreator("vespa-feed-bm");
App app;
auto exit_value = app.Entry(argc, argv);
vespalib::rmdir(base_dir, true);
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/.gitignore b/searchcore/src/apps/vespa-spi-feed-bm/.gitignore
deleted file mode 100644
index 02fff2fe280..00000000000
--- a/searchcore/src/apps/vespa-spi-feed-bm/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-vespa-spi-feed-bm
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp
index 41de6827244..8711a21e5e6 100644
--- a/searchcore/src/tests/proton/attribute/attribute_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp
@@ -192,6 +192,11 @@ public:
void assertExecuteHistory(std::vector<uint32_t> expExecuteHistory) {
EXPECT_EQ(expExecuteHistory, _attributeFieldWriter->getExecuteHistory());
}
+ SerialNum test_force_commit(AttributeVector &attr, SerialNum serialNum) {
+ commit(serialNum);
+ _attributeFieldWriter->sync();
+ return attr.getStatus().getLastSyncToken();
+ }
};
AttributeWriterTest::~AttributeWriterTest() = default;
@@ -975,6 +980,15 @@ TEST_F(AttributeWriterTest, forceCommit_clears_search_cache_in_imported_attribut
EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_b")->getSearchCache()->size());
}
+TEST_F(AttributeWriterTest, ignores_force_commit_serial_not_greater_than_create_serial)
+{
+ auto a1 = addAttribute("a1");
+ a1->setCreateSerialNum(100);
+ EXPECT_EQ(0u, test_force_commit(*a1, 50u));
+ EXPECT_EQ(0u, test_force_commit(*a1, 100u));
+ EXPECT_EQ(150u, test_force_commit(*a1, 150u));
+}
+
class StructWriterTestBase : public AttributeWriterTest {
public:
DocumentType _type;
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index ad0ce0b26c4..72592cca681 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -434,6 +434,7 @@ struct MyTlsWriter : TlsWriter {
MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {}
void appendOperation(const FeedOperation &, DoneCallback) override { ++store_count; }
+ CommitResult startCommit(DoneCallback) override { return CommitResult(); }
bool erase(SerialNum) override { ++erase_count; return erase_return; }
SerialNum sync(SerialNum syncTo) override {
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
index 31882061b1c..04444647b5d 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
@@ -172,6 +172,9 @@ struct MyStorer : public IOperationStorer {
++_compactCnt;
}
}
+ CommitResult startCommit(DoneCallback) override {
+ return CommitResult();
+ }
};
struct MyFrozenBucketHandler : public IFrozenBucketHandler {
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 8df62705cb3..f033dfd50a8 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -235,6 +235,9 @@ public:
// Implements IOperationStorer
void appendOperation(const FeedOperation &op, DoneCallback) override;
+ CommitResult startCommit(DoneCallback) override {
+ return CommitResult();
+ }
uint32_t getHeartBeats() const {
return _heartBeats;
diff --git a/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp b/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp
index 508a60480d0..421ebffafa4 100644
--- a/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp
+++ b/searchcore/src/tests/proton/matching/index_environment/index_environment_test.cpp
@@ -8,6 +8,7 @@ using namespace proton::matching;
using search::fef::FieldInfo;
using search::fef::FieldType;
using search::fef::Properties;
+using search::fef::OnnxModel;
using search::index::Schema;
using search::index::schema::CollectionType;
using search::index::schema::DataType;
@@ -16,8 +17,8 @@ using SIAF = Schema::ImportedAttributeField;
OnnxModels make_models() {
OnnxModels::Vector list;
- list.emplace_back("model1", "path1");
- list.emplace_back("model2", "path2");
+ list.emplace_back(OnnxModel("model1", "path1").input_feature("input1","feature1").output_name("output1", "out1"));
+ list.emplace_back(OnnxModel("model2", "path2"));
return OnnxModels(list);
}
@@ -104,10 +105,22 @@ TEST_F("require that imported attribute fields are extracted in index environmen
EXPECT_EQUAL("[documentmetastore]", f.env.getField(2)->name());
}
-TEST_F("require that onnx model paths can be obtained", Fixture(buildEmptySchema())) {
- EXPECT_EQUAL(f1.env.getOnnxModelFullPath("model1").value(), vespalib::string("path1"));
- EXPECT_EQUAL(f1.env.getOnnxModelFullPath("model2").value(), vespalib::string("path2"));
- EXPECT_FALSE(f1.env.getOnnxModelFullPath("model3").has_value());
+TEST_F("require that onnx model config can be obtained", Fixture(buildEmptySchema())) {
+ {
+ auto model = f1.env.getOnnxModel("model1");
+ ASSERT_TRUE(model != nullptr);
+ EXPECT_EQUAL(model->file_path(), vespalib::string("path1"));
+ EXPECT_EQUAL(model->input_feature("input1").value(), vespalib::string("feature1"));
+ EXPECT_EQUAL(model->output_name("output1").value(), vespalib::string("out1"));
+ }
+ {
+ auto model = f1.env.getOnnxModel("model2");
+ ASSERT_TRUE(model != nullptr);
+ EXPECT_EQUAL(model->file_path(), vespalib::string("path2"));
+ EXPECT_FALSE(model->input_feature("input1").has_value());
+ EXPECT_FALSE(model->output_name("output1").has_value());
+ }
+ EXPECT_TRUE(f1.env.getOnnxModel("model3") == nullptr);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp b/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp
index 1cc8d0280f6..c46990732b7 100644
--- a/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp
+++ b/searchcore/src/tests/proton/verify_ranksetup/verify_ranksetup_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/searchcommon/common/schema.h>
#include <vespa/searchlib/fef/indexproperties.h>
+#include <vespa/searchlib/fef/onnx_model.h>
#include <string>
#include <vector>
#include <map>
@@ -18,6 +19,7 @@ const char *invalid_feature = "invalid_feature_name and format";
using namespace search::fef::indexproperties;
using namespace search::index;
+using search::fef::OnnxModel;
using search::index::schema::CollectionType;
using search::index::schema::DataType;
@@ -69,9 +71,12 @@ struct Setup {
std::map<std::string,std::string> properties;
std::map<std::string,std::string> constants;
std::vector<bool> extra_profiles;
- std::map<std::string,std::string> onnx_models;
+ std::map<std::string,OnnxModel> onnx_models;
Setup();
~Setup();
+ void add_onnx_model(const OnnxModel &model) {
+ onnx_models.insert_or_assign(model.name(), model);
+ }
void index(const std::string &name, schema::DataType data_type,
schema::CollectionType collection_type)
{
@@ -155,8 +160,20 @@ struct Setup {
void write_onnx_models(const Writer &out) {
size_t idx = 0;
for (const auto &entry: onnx_models) {
- out.fmt("model[%zu].name \"%s\"\n", idx, entry.first.c_str());
+ out.fmt("model[%zu].name \"%s\"\n", idx, entry.second.name().c_str());
out.fmt("model[%zu].fileref \"onnx_ref_%zu\"\n", idx, idx);
+ size_t idx2 = 0;
+ for (const auto &input: entry.second.inspect_input_features()) {
+ out.fmt("model[%zu].input[%zu].name \"%s\"\n", idx, idx2, input.first.c_str());
+ out.fmt("model[%zu].input[%zu].source \"%s\"\n", idx, idx2, input.second.c_str());
+ ++idx2;
+ }
+ idx2 = 0;
+ for (const auto &output: entry.second.inspect_output_names()) {
+ out.fmt("model[%zu].output[%zu].name \"%s\"\n", idx, idx2, output.first.c_str());
+ out.fmt("model[%zu].output[%zu].as \"%s\"\n", idx, idx2, output.second.c_str());
+ ++idx2;
+ }
++idx;
}
}
@@ -164,7 +181,7 @@ struct Setup {
size_t idx = 0;
for (const auto &entry: onnx_models) {
out.fmt("file[%zu].ref \"onnx_ref_%zu\"\n", idx, idx);
- out.fmt("file[%zu].path \"%s\"\n", idx, entry.second.c_str());
+ out.fmt("file[%zu].path \"%s\"\n", idx, entry.second.file_path().c_str());
++idx;
}
}
@@ -225,7 +242,12 @@ struct SimpleSetup : Setup {
struct OnnxSetup : Setup {
OnnxSetup() : Setup() {
- onnx_models["simple"] = TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx");
+ add_onnx_model(OnnxModel("simple", TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx")));
+ add_onnx_model(OnnxModel("mapped", TEST_PATH("../../../../../eval/src/tests/tensor/onnx_wrapper/simple.onnx"))
+ .input_feature("query_tensor", "rankingExpression(qt)")
+ .input_feature("attribute_tensor", "rankingExpression(at)")
+ .input_feature("bias_tensor", "rankingExpression(bt)")
+ .output_name("output", "result"));
}
};
@@ -350,6 +372,13 @@ TEST_F("require that input type mismatch makes onnx model fail verification", On
f.verify_invalid({"onnxModel(simple)"});
}
+TEST_F("require that onnx model can have inputs and outputs mapped", OnnxSetup()) {
+ f.rank_expr("qt", "tensor<float>(a[1],b[4]):[[1,2,3,4]]");
+ f.rank_expr("at", "tensor<float>(a[4],b[1]):[[5],[6],[7],[8]]");
+ f.rank_expr("bt", "tensor<float>(a[1],b[1]):[[9]]");
+ f.verify_valid({"onnxModel(mapped).result"});
+}
+
//-----------------------------------------------------------------------------
TEST_F("cleanup files", Setup()) {
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index a49b27caf36..13695b969bf 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -212,7 +212,9 @@ void
applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVector &attr)
{
if (attr.getStatus().getLastSyncToken() <= serialNum) {
- attr.commit(serialNum, serialNum);
+ if (serialNum > attr.getCreateSerialNum()) {
+ attr.commit(serialNum, serialNum);
+ }
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp b/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp
index 1aa89b78966..73f909c115c 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feeddebugger.cpp
@@ -41,7 +41,7 @@ FeedDebugger::FeedDebugger() :
_enableDebugging = ! (_debugLidList.empty() && _debugDocIdList.empty());
}
-FeedDebugger::~FeedDebugger() {}
+FeedDebugger::~FeedDebugger() = default;
ns_log::Logger::LogLevel
FeedDebugger::getDebugDebuggerInternal(uint32_t lid, const document::DocumentId * docid) const
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
index b04bac5ef26..1f979d1566c 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
@@ -7,6 +7,7 @@ namespace proton::documentmetastore {
LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot)
: _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()),
+ _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()),
_hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 ||
configSnapshot.getSchemaSP()->getNumAttributeFields() > 0)
{
@@ -18,6 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig()
LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in)
: _visibilityDelay(visibilityDelay),
+ _allowEarlyAck(visibilityDelay > 1ms),
_hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
index 82dab433a22..c81a2ff399f 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
@@ -15,6 +15,7 @@ class LidReuseDelayerConfig
{
private:
vespalib::duration _visibilityDelay;
+ bool _allowEarlyAck;
bool _hasIndexedOrAttributeFields;
public:
LidReuseDelayerConfig();
@@ -22,6 +23,7 @@ public:
explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot);
vespalib::duration visibilityDelay() const { return _visibilityDelay; }
bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; }
+ bool allowEarlyAck() const { return _allowEarlyAck; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
index 03dfd83a132..ed2202c830b 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
@@ -16,6 +16,7 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume
: _writeService(writeService),
_documentMetaStore(documentMetaStore),
_immediateCommit(config.visibilityDelay() == vespalib::duration::zero()),
+ _allowEarlyAck(config.allowEarlyAck()),
_config(config),
_pendingLids()
{
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
index 5f1de878b4a..ba407ab57f8 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
@@ -27,6 +27,7 @@ class LidReuseDelayer
searchcorespi::index::IThreadingService &_writeService;
IStore &_documentMetaStore;
const bool _immediateCommit;
+ const bool _allowEarlyAck;
LidReuseDelayerConfig _config;
std::vector<uint32_t> _pendingLids; // lids waiting for commit
@@ -38,7 +39,8 @@ public:
bool delayReuse(const std::vector<uint32_t> &lids);
std::vector<uint32_t> getReuseLids();
- bool getImmediateCommit() const { return _immediateCommit; }
+ bool needImmediateCommit() const { return _immediateCommit; }
+ bool allowEarlyAck() const { return _allowEarlyAck; }
const LidReuseDelayerConfig & getConfig() const { return _config; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp
index 5743a3d44d6..013f359c4f9 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.cpp
@@ -131,13 +131,10 @@ IndexEnvironment::hintFieldAccess(uint32_t ) const { }
void
IndexEnvironment::hintAttributeAccess(const string &) const { }
-std::optional<vespalib::string>
-IndexEnvironment::getOnnxModelFullPath(const vespalib::string &name) const
+const search::fef::OnnxModel *
+IndexEnvironment::getOnnxModel(const vespalib::string &name) const
{
- if (const auto model = _onnxModels.getModel(name)) {
- return model->filePath;
- }
- return std::nullopt;
+ return _onnxModels.getModel(name);
}
IndexEnvironment::~IndexEnvironment() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h
index d0e9a516cd0..ad51eb17b4d 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/indexenvironment.h
@@ -69,7 +69,7 @@ public:
return _constantValueRepo.getConstant(name);
}
- std::optional<vespalib::string> getOnnxModelFullPath(const vespalib::string &name) const override;
+ const search::fef::OnnxModel *getOnnxModel(const vespalib::string &name) const override;
~IndexEnvironment() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp
index bdcf3e21d8e..ed80ca28bd6 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.cpp
@@ -1,25 +1,10 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "onnx_models.h"
+#include <assert.h>
namespace proton::matching {
-OnnxModels::Model::Model(const vespalib::string &name_in,
- const vespalib::string &filePath_in)
- : name(name_in),
- filePath(filePath_in)
-{
-}
-
-OnnxModels::Model::~Model() = default;
-
-bool
-OnnxModels::Model::operator==(const Model &rhs) const
-{
- return (name == rhs.name) &&
- (filePath == rhs.filePath);
-}
-
OnnxModels::OnnxModels()
: _models()
{
@@ -30,15 +15,15 @@ OnnxModels::~OnnxModels() = default;
OnnxModels::OnnxModels(const Vector &models)
: _models()
{
- for (const auto &model : models) {
- _models.insert(std::make_pair(model.name, model));
+ for (const auto &model: models) {
+ _models.emplace(model.name(), model);
}
}
bool
OnnxModels::operator==(const OnnxModels &rhs) const
{
- return _models == rhs._models;
+ return (_models == rhs._models);
}
const OnnxModels::Model *
@@ -51,4 +36,16 @@ OnnxModels::getModel(const vespalib::string &name) const
return nullptr;
}
+void
+OnnxModels::configure(const ModelConfig &config, Model &model)
+{
+ assert(config.name == model.name());
+ for (const auto &input: config.input) {
+ model.input_feature(input.name, input.source);
+ }
+ for (const auto &output: config.output) {
+ model.output_name(output.name, output.as);
+ }
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h
index fdaae657711..65ba524d8fc 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/onnx_models.h
@@ -3,6 +3,8 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
+#include <vespa/searchlib/fef/onnx_model.h>
+#include <vespa/searchcore/config/config-onnx-models.h>
#include <map>
#include <vector>
@@ -14,16 +16,8 @@ namespace proton::matching {
*/
class OnnxModels {
public:
- struct Model {
- vespalib::string name;
- vespalib::string filePath;
-
- Model(const vespalib::string &name_in,
- const vespalib::string &filePath_in);
- ~Model();
- bool operator==(const Model &rhs) const;
- };
-
+ using ModelConfig = vespa::config::search::core::OnnxModelsConfig::Model;
+ using Model = search::fef::OnnxModel;
using Vector = std::vector<Model>;
private:
@@ -38,6 +32,7 @@ public:
bool operator==(const OnnxModels &rhs) const;
const Model *getModel(const vespalib::string &name) const;
size_t size() const { return _models.size(); }
+ static void configure(const ModelConfig &config, Model &model);
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
index 066e135741e..ea5d46f02ad 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp
@@ -218,6 +218,16 @@ CombiningFeedView::heartBeat(search::SerialNum serialNum)
}
}
+bool
+CombiningFeedView::allowEarlyAck() const {
+ for (const auto &view : _views) {
+ if ( ! view->allowEarlyAck() ) {
+ return false;
+ }
+ }
+ return true;
+}
+
void
CombiningFeedView::sync()
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
index d1da0408318..3a37fdc37cb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h
@@ -83,6 +83,7 @@ public:
// Called by document db executor
void setCalculator(const IBucketStateCalculator::SP &newCalc);
+ bool allowEarlyAck() const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
index 4c0485baec6..5ed0ad7492c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
@@ -135,6 +135,7 @@ public:
}
vespalib::duration getVisibilityDelay() const { return _visibilityDelay; }
bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); }
+ bool allowEarlyAck() const { return _visibilityDelay > 1ms; }
const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const {
return _lidSpaceCompaction;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 46bcb0e49bb..c63785faa35 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -911,7 +911,16 @@ DocumentDB::syncFeedView()
IFeedView::SP newFeedView(_subDBs.getFeedView());
_writeService.sync();
- _visibility.commit();
+ /*
+ * Don't call commit() on visibility handler during transaction
+ * log replay since the serial number used for the commit will be
+ * too high until the replay is complete. This check can be
+ * removed again when feed handler has improved tracking of serial
+ * numbers during replay.
+ */
+ if (_state.getAllowReconfig()) {
+ _visibility.commit();
+ }
_writeService.sync();
_feedView.set(newFeedView);
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
index a8996abc856..c8b701e82f8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
@@ -321,6 +321,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot)
LOG(info, "Got file path from file acquirer: '%s' (name='%s', ref='%s')",
filePath.c_str(), rc.name.c_str(), rc.fileref.c_str());
models.emplace_back(rc.name, filePath);
+ OnnxModels::configure(rc, models.back());
}
}
newOnnxModels = std::make_shared<OnnxModels>(models);
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 734ef01d33a..8b82478c1a4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -58,17 +58,20 @@ class TlsMgrWriter : public TlsWriter {
std::shared_ptr<search::transactionlog::Writer> _writer;
public:
TlsMgrWriter(TransactionLogManager &tls_mgr,
- const search::transactionlog::WriterFactory & factory) :
- _tls_mgr(tls_mgr),
- _writer(factory.getWriter(tls_mgr.getDomainName()))
+ const search::transactionlog::WriterFactory & factory)
+ : _tls_mgr(tls_mgr),
+ _writer(factory.getWriter(tls_mgr.getDomainName()))
{ }
void appendOperation(const FeedOperation &op, DoneCallback onDone) override;
+ [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override {
+ return _writer->startCommit(std::move(onDone));
+ }
bool erase(SerialNum oldest_to_keep) override;
SerialNum sync(SerialNum syncTo) override;
};
-
-void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) {
+void
+TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) {
using Packet = search::transactionlog::Packet;
vespalib::nbostream stream;
op.serialize(stream);
@@ -77,9 +80,11 @@ void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone)
Packet::Entry entry(op.getSerialNum(), op.getType(), vespalib::ConstBufferRef(stream.data(), stream.size()));
Packet packet(entry.serializedSize());
packet.add(entry);
- _writer->commit(packet, std::move(onDone));
+ _writer->append(packet, std::move(onDone));
}
-bool TlsMgrWriter::erase(SerialNum oldest_to_keep) {
+
+bool
+TlsMgrWriter::erase(SerialNum oldest_to_keep) {
return _tls_mgr.getSession()->erase(oldest_to_keep);
}
@@ -88,22 +93,40 @@ TlsMgrWriter::sync(SerialNum syncTo)
{
for (int retryCount = 0; retryCount < 10; ++retryCount) {
SerialNum syncedTo(0);
- LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo);
+ LOG(debug, "Trying tls sync(%" PRIu64 ")", syncTo);
bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo);
if (!res) {
- LOG(spam, "Tls sync failed, retrying");
+ LOG(debug, "Tls sync failed, retrying");
sleep(1);
continue;
}
if (syncedTo >= syncTo) {
- LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo);
+ LOG(debug, "Tls sync complete, reached %" PRIu64", returning", syncedTo);
return syncedTo;
}
- LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
+ LOG(debug, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
}
throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo));
}
+class OnCommitDone : public search::IDestructorCallback {
+public:
+ OnCommitDone(Executor & executor, std::unique_ptr<Executor::Task> task)
+ : _executor(executor),
+ _task(std::move(task))
+ {}
+ ~OnCommitDone() override { _executor.execute(std::move(_task)); }
+private:
+ Executor & _executor;
+ std::unique_ptr<Executor::Task> _task;
+};
+
+template <typename T>
+struct KeepAlive : public search::IDestructorCallback {
+ explicit KeepAlive(T toKeep) : _toKeep(std::move(toKeep)) { }
+ ~KeepAlive() override = default;
+ T _toKeep;
+};
} // namespace
void
@@ -379,6 +402,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
+ _numOperationsPendingCommit(0),
+ _numOperationsCompleted(0),
+ _numCommitsCompleted(0),
_delayedPrune(false),
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
@@ -472,17 +498,57 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
+FeedHandler::onCommitDone(size_t numPendingAtStart) {
+ assert(numPendingAtStart <= _numOperationsPendingCommit);
+ _numOperationsPendingCommit -= numPendingAtStart;
+ _numOperationsCompleted += numPendingAtStart;
+ _numCommitsCompleted++;
+ if (_numOperationsPendingCommit > 0) {
+ enqueCommitTask();
+ }
+ LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu",
+ _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit);
+}
+
+void FeedHandler::enqueCommitTask() {
+ _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); }));
+}
+
+void
+FeedHandler::initiateCommit() {
+ auto onCommitDoneContext = std::make_shared<OnCommitDone>(
+ _writeService.master(),
+ makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() {
+ onCommitDone(numPendingAtStart);
+ }));
+ auto commitResult = _tlsWriter->startCommit(onCommitDoneContext);
+ if (_activeFeedView && ! _activeFeedView->allowEarlyAck()) {
+ using KeepAlivePair = KeepAlive<std::pair<CommitResult, DoneCallback>>;
+ auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext));
+ _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlivePair>(std::move(pair)));
+ }
+}
+
+void
FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
_tlsWriter->appendOperation(op, std::move(onDone));
+ if (++_numOperationsPendingCommit == 1) {
+ enqueCommitTask();
+ }
+}
+
+FeedHandler::CommitResult
+FeedHandler::startCommit(DoneCallback onDone) {
+ return _tlsWriter->startCommit(std::move(onDone));
}
void
FeedHandler::storeOperationSync(const FeedOperation &op) {
vespalib::Gate gate;
- appendOperation(op, make_shared<search::GateCallback>(gate));
+ appendAndCommitOperation(op, make_shared<search::GateCallback>(gate));
gate.await();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 29961f4a6cc..c295b26a759 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -76,6 +76,9 @@ private:
// the serial num of the last message in the transaction log
SerialNum _serialNum;
SerialNum _prunedSerialNum;
+ size_t _numOperationsPendingCommit;
+ size_t _numOperationsCompleted;
+ size_t _numCommitsCompleted;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
FeedStateSP _feedState;
@@ -125,6 +128,9 @@ private:
FeedStateSP getFeedState() const;
void changeFeedState(FeedStateSP newState);
void doChangeFeedState(FeedStateSP newState);
+ void onCommitDone(size_t numPendingAtStart);
+ void initiateCommit();
+ void enqueCommitTask();
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;
@@ -226,6 +232,7 @@ public:
void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
void syncTls(SerialNum syncTo);
void appendOperation(const FeedOperation &op, DoneCallback onDone) override;
+ [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override;
void storeOperationSync(const FeedOperation & op);
void considerDelayedPrune();
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
index 6de1d7a4322..7d559eb4375 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
@@ -11,7 +11,7 @@ namespace vespalib {
namespace proton {
class FeedOperation;
-class PacketWrapper;
+struct PacketWrapper;
/**
* Class representing the current state of a feed handler.
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
index 47b81a9a17f..c3b76a9db75 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
@@ -14,12 +14,18 @@ class FeedOperation;
struct IOperationStorer
{
using DoneCallback = search::transactionlog::Writer::DoneCallback;
+ using CommitResult = search::transactionlog::Writer::CommitResult;
virtual ~IOperationStorer() = default;
/**
* Assign serial number to (if not set) and store the given operation.
*/
virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0;
+ [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0;
+ void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) {
+ appendOperation(op, onDone);
+ (void) startCommit(std::move(onDone));
+ }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
index 4b028a289a9..8dd7ae8474e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h
@@ -65,6 +65,7 @@ public:
virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0;
virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0;
virtual ILidCommitState & getUncommittedLidsTracker() = 0;
+ virtual bool allowEarlyAck() const = 0;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index d423e095ad9..468850b4409 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
vespalib::Gate gate;
- _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate));
+ _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate));
gate.await();
_handler.handleCompactLidSpace(op);
EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index 37497eaa998..35549f21471 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -63,13 +63,13 @@ public:
~LidSpaceCompactionJob();
// Implements IDiskMemUsageListener
- virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
// Implements IClusterStateChangedNofifier
- virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
// Implements IMaintenanceJob
- virtual bool run() override;
+ bool run() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index d94bb2e3d03..d4b542a0af8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
AttributeUsageFilter &attributeUsageFilter) {
controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
- if (config.hasVisibilityDelay()) {
+ if (config.hasVisibilityDelay() && config.allowEarlyAck()) {
controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay()));
}
const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 217a3bb24d3..90875aa8591 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c
}
std::unique_ptr<PendingLidTrackerBase>
-createUncommitedLidTracker(bool needImmediateCommit) {
- if (needImmediateCommit) {
- return std::make_unique<PendingLidTracker>();
- } else {
+createUncommitedLidTracker(bool allowEarlyAck) {
+ if (allowEarlyAck) {
return std::make_unique<TwoPhasePendingLidTracker>();
+ } else {
+ return std::make_unique<PendingLidTracker>();
}
}
@@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
_docType(nullptr),
_lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig),
_pendingLidsForDocStore(),
- _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())),
+ _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())),
_schema(ctx._schema),
_writeService(ctx._writeService),
_params(params),
@@ -263,7 +263,7 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, DoneCallback onDone)
void
StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone)
{
- (void) serialNum;
+ LOG(debug, "internalForceCommit: serial=%" PRIu64 ".", serialNum);
_writeService.summary().execute(makeLambdaTask([onDone=onCommitDone]() {(void) onDone;}));
std::vector<uint32_t> lidsToReuse;
lidsToReuse = _lidReuseDelayer.getReuseLids();
@@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp
void
StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
- if ( ! needCommit() && token) {
+ if (allowEarlyAck() && token) {
token.reset();
}
}
@@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(std::move(token), std::move(uncommitted),
@@ -345,8 +345,13 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
}
bool
-StoreOnlyFeedView::needCommit() const {
- return _lidReuseDelayer.getImmediateCommit();
+StoreOnlyFeedView::needImmediateCommit() const {
+ return _lidReuseDelayer.needImmediateCommit();
+}
+
+bool
+StoreOnlyFeedView::allowEarlyAck() const {
+ return _lidReuseDelayer.allowEarlyAck();
}
void
@@ -483,7 +488,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid());
considerEarlyAck(token);
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate());
UpdateScope updateScope(*_schema, upd);
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope);
@@ -657,7 +662,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc
std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u),
std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone);
}
@@ -770,7 +775,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp)
void
StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp)
{
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
size_t rm_count = removeDocuments(delOp, true, immediateCommit);
LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)",
_params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count);
@@ -809,7 +814,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()),
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 7d91ea86a22..20942423995 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -161,7 +161,7 @@ private:
void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone);
void heartBeatSummary(SerialNum serialNum);
- bool needCommit() const;
+ bool needImmediateCommit() const;
bool useDocumentStore(SerialNum replaySerialNum) const {
@@ -264,6 +264,7 @@ public:
void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override;
void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
ILidCommitState & getUncommittedLidsTracker() override;
+ bool allowEarlyAck() const final override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
index adfc911c8df..b96fd77409c 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h
@@ -34,6 +34,7 @@ struct DummyFeedView : public IFeedView
void handleCompactLidSpace(const CompactLidSpaceOperation &) override {}
void forceCommit(search::SerialNum, DoneCallback) override { }
ILidCommitState & getUncommittedLidsTracker() override;
+ bool allowEarlyAck() const override { return false; }
};
}