summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-30 15:25:46 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-30 15:25:46 +0200
commit6337d6d35aaecf788b82ff6fa9ecd190d1b55f37 (patch)
treecc3d70dc83bcb04dde5ad6723873006bcd2486af /searchcore
parentff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff)
Move out classes to separate files. Enable feeding via distributor.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt3
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp54
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h21
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h26
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp64
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link.h31
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h20
-rw-r--r--searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h4
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp20
-rw-r--r--searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h4
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp133
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h16
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp64
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h7
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp95
16 files changed, 417 insertions, 179 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index 897073397ef..b5465b1f0dd 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -2,6 +2,9 @@
vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
+ bm_cluster_controller.cpp
+ bm_storage_chain_builder.cpp
+ bm_storage_link.cpp
spi_bm_feed_handler.cpp
storage_api_rpc_bm_feed_handler.cpp
storage_api_chain_bm_feed_handler.cpp
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
new file mode 100644
index 00000000000..324f98625f3
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
@@ -0,0 +1,54 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_cluster_controller.h"
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
+
+using storage::api::StorageMessageAddress;
+using storage::rpc::SharedRpcResources;
+using storage::lib::NodeType;
+
+namespace feedbm {
+
+namespace {
+
+FRT_RPCRequest *
+make_set_cluster_state_request()
+{
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ storage::rpc::SlimeClusterStateBundleCodec codec;
+ auto encoded_bundle = codec.encode(bundle);
+ auto *req = new FRT_RPCRequest();
+ auto* params = req->GetParams();
+ params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
+ params->AddInt32(encoded_bundle._uncompressed_length);
+ params->AddData(std::move(*encoded_bundle._buffer));
+ req->SetMethodName("setdistributionstates");
+ return req;
+}
+
+}
+
+BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in)
+ : _shared_rpc_resources(shared_rpc_resources_in)
+{
+}
+
+void
+BmClusterController::set_cluster_up(bool distributor)
+{
+ StorageMessageAddress storage_address("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0);
+ auto req = make_set_cluster_state_request();
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(), _shared_rpc_resources.target_factory());
+ auto target = target_resolver->resolve_rpc_target(storage_address);
+ target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
+ assert(!req->IsError());
+ req->SubRef();
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h
new file mode 100644
index 00000000000..699036be5c9
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace storage::api { class StorageMessageAddress; }
+namespace storage::rpc { class SharedRpcResources; }
+
+namespace feedbm {
+
+/*
+ * Fake cluster controller that sets cluster state to be up.
+ */
+class BmClusterController
+{
+ storage::rpc::SharedRpcResources& _shared_rpc_resources;
+public:
+ BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in);
+ void set_cluster_up(bool distributor);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp
new file mode 100644
index 00000000000..bbe0de70ce2
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_storage_chain_builder.h"
+#include "bm_storage_link_context.h"
+#include "bm_storage_link.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bm_storage_chain_builder");
+
+namespace feedbm {
+
+BmStorageChainBuilder::BmStorageChainBuilder()
+ : storage::StorageChainBuilder(),
+ _context(std::make_shared<BmStorageLinkContext>())
+{
+}
+
+BmStorageChainBuilder::~BmStorageChainBuilder() = default;
+
+void
+BmStorageChainBuilder::add(std::unique_ptr<storage::StorageLink> link)
+{
+ vespalib::string name = link->getName();
+ storage::StorageChainBuilder::add(std::move(link));
+ LOG(info, "Added storage link '%s'", name.c_str());
+ if (name == "Communication manager") {
+ auto my_link = std::make_unique<BmStorageLink>();
+ LOG(info, "Adding extra storage link '%s'", my_link->getName().c_str());
+ _context->bm_link = my_link.get();
+ storage::StorageChainBuilder::add(std::move(my_link));
+ }
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h
new file mode 100644
index 00000000000..b1347b5e953
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/storage/common/storage_chain_builder.h>
+
+namespace feedbm {
+
+class BmStorageLinkContext;
+
+/*
+ * Storage chain builder that inserts a BmStorageLink right below the
+ * communication manager. This allows sending benchmark feed to chain.
+ */
+class BmStorageChainBuilder : public storage::StorageChainBuilder
+{
+ std::shared_ptr<BmStorageLinkContext> _context;
+public:
+ BmStorageChainBuilder();
+ ~BmStorageChainBuilder() override;
+ const std::shared_ptr<BmStorageLinkContext>& get_context() { return _context; }
+ void add(std::unique_ptr<storage::StorageLink> link) override;
+};
+
+}
+
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
new file mode 100644
index 00000000000..79517e98094
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
@@ -0,0 +1,64 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_storage_link.h"
+#include "pending_tracker.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+
+namespace feedbm {
+
+
+void
+BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker)
+{
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+}
+
+PendingTracker *
+BmStorageLink::release(uint64_t msg_id)
+{
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ if (itr == _pending.end()) {
+ return nullptr;
+ }
+ auto tracker = itr->second;
+ _pending.erase(itr);
+ return tracker;
+}
+
+BmStorageLink::BmStorageLink()
+ : storage::StorageLink("vespa-bm-feed"),
+ StorageReplyErrorChecker(),
+ _mutex(),
+ _pending()
+{
+}
+
+BmStorageLink::~BmStorageLink()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+bool
+BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ (void) msg;
+ return false;
+}
+
+bool
+BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
+{
+ auto tracker = release(msg->getMsgId());
+ if (tracker != nullptr) {
+ check_error(*msg);
+ tracker->release();
+ return true;
+ }
+ return false;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
new file mode 100644
index 00000000000..63ece355c02
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
@@ -0,0 +1,31 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "storage_reply_error_checker.h"
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Storage link used to feed storage api messages to a distributor or
+ * service layer node. A count of error replies is maintained.
+ */
+class BmStorageLink : public storage::StorageLink,
+ public StorageReplyErrorChecker
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+ PendingTracker *release(uint64_t msg_id);
+public:
+ BmStorageLink();
+ ~BmStorageLink() override;
+ bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
+ void retain(uint64_t msg_id, PendingTracker &tracker);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h
new file mode 100644
index 00000000000..adb2a13ec10
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+namespace feedbm {
+
+class BmStorageLink;
+
+/*
+ * This context is initialized by BmStorageChainBuilder.
+ */
+struct BmStorageLinkContext
+{
+ BmStorageLink* bm_link;
+ BmStorageLinkContext()
+ : bm_link(nullptr)
+ {
+ }
+ ~BmStorageLinkContext() = default;
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
index 81a08552c0c..63d0913977e 100644
--- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
@@ -3,6 +3,7 @@
#pragma once
#include <memory>
+#include <vespa/vespalib/stllike/string.h>
namespace document {
class Bucket;
@@ -26,6 +27,9 @@ public:
virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0;
virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0;
virtual uint32_t get_error_count() const = 0;
+ virtual const vespalib::string &get_name() const = 0;
+ virtual bool manages_buckets() const = 0;
+ virtual bool manages_timestamp() const = 0;
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
index 2df97841bb9..7020e52eaef 100644
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
@@ -63,6 +63,7 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result
SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider)
: IBmFeedHandler(),
+ _name("SpiBmFeedHandler"),
_provider(provider),
_errors(0u)
{
@@ -100,4 +101,23 @@ SpiBmFeedHandler::get_error_count() const
{
return _errors;
}
+
+const vespalib::string&
+SpiBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+SpiBmFeedHandler::manages_buckets() const
+{
+ return false;
+}
+
+bool
+SpiBmFeedHandler::manages_timestamp() const
+{
+ return false;
+}
+
}
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
index bda41898510..49f526ab5b7 100644
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
@@ -14,6 +14,7 @@ namespace feedbm {
*/
class SpiBmFeedHandler : public IBmFeedHandler
{
+ vespalib::string _name;
storage::spi::PersistenceProvider& _provider;
std::atomic<uint32_t> _errors;
public:
@@ -24,6 +25,9 @@ public:
void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
void create_bucket(const document::Bucket& bucket);
uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
index ba16f226ebb..2d45caff152 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
@@ -3,15 +3,13 @@
#include "storage_api_chain_bm_feed_handler.h"
#include "pending_tracker.h"
#include "storage_reply_error_checker.h"
+#include "bm_storage_link_context.h"
+#include "bm_storage_link.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storage/common/storage_chain_builder.h>
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <cassert>
@@ -19,7 +17,6 @@
using document::Document;
using document::DocumentId;
using document::DocumentUpdate;
-using storage::StorageLink;
namespace feedbm {
@@ -34,106 +31,10 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
}
-class BmStorageLink : public StorageLink,
- public StorageReplyErrorChecker
-{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
-public:
- BmStorageLink();
- ~BmStorageLink() override;
- bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- void retain(uint64_t msg_id, PendingTracker &tracker) {
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
- }
- bool release(uint64_t msg_id) {
- PendingTracker *tracker = nullptr;
- {
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- if (itr == _pending.end()) {
- return false;
- }
- tracker = itr->second;
- _pending.erase(itr);
- }
- tracker->release();
- return true;
- }
-};
-
-BmStorageLink::BmStorageLink()
- : storage::StorageLink("vespa-bm-feed"),
- StorageReplyErrorChecker(),
- _mutex(),
- _pending()
-{
-}
-
-BmStorageLink::~BmStorageLink()
-{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
-}
-
-bool
-BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
-{
- (void) msg;
- return false;
-}
-
-bool
-BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
-{
- check_error(*msg);
- return release(msg->getMsgId());
-}
-
-struct StorageApiChainBmFeedHandler::Context {
- BmStorageLink* bm_link;
- Context()
- : bm_link(nullptr)
- {
- }
- ~Context() = default;
-};
-
-class MyStorageChainBuilder : public storage::StorageChainBuilder
-{
- using Parent = storage::StorageChainBuilder;
- std::shared_ptr<StorageApiChainBmFeedHandler::Context> _context;
-public:
- MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context);
- ~MyStorageChainBuilder() override;
- void add(std::unique_ptr<StorageLink> link) override;
-};
-
-MyStorageChainBuilder::MyStorageChainBuilder(std::shared_ptr<StorageApiChainBmFeedHandler::Context> context)
- : storage::StorageChainBuilder(),
- _context(std::move(context))
-{
-}
-
-MyStorageChainBuilder::~MyStorageChainBuilder() = default;
-
-void
-MyStorageChainBuilder::add(std::unique_ptr<StorageLink> link)
-{
- vespalib::string name = link->getName();
- Parent::add(std::move(link));
- if (name == "Communication manager") {
- auto my_link = std::make_unique<BmStorageLink>();
- _context->bm_link = my_link.get();
- Parent::add(std::move(my_link));
- }
-}
-
-StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<Context> context)
+StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor)
: IBmFeedHandler(),
+ _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _distributor(distributor),
_context(std::move(context))
{
auto cmd = make_set_cluster_state_cmd();
@@ -174,22 +75,28 @@ StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const Docum
send_msg(std::move(cmd), tracker);
}
-std::shared_ptr<StorageApiChainBmFeedHandler::Context>
-StorageApiChainBmFeedHandler::get_context()
+uint32_t
+StorageApiChainBmFeedHandler::get_error_count() const
+{
+ return _context->bm_link->get_error_count();
+}
+
+const vespalib::string&
+StorageApiChainBmFeedHandler::get_name() const
{
- return std::make_shared<Context>();
+ return _name;
}
-std::unique_ptr<storage::IStorageChainBuilder>
-StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> context)
+bool
+StorageApiChainBmFeedHandler::manages_buckets() const
{
- return std::make_unique<MyStorageChainBuilder>(std::move(context));
+ return _distributor;
}
-uint32_t
-StorageApiChainBmFeedHandler::get_error_count() const
+bool
+StorageApiChainBmFeedHandler::manages_timestamp() const
{
- return _context->bm_link->get_error_count();
+ return _distributor;
}
}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
index c3fdab6cd76..089a7fd89e5 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
@@ -4,32 +4,34 @@
#include "i_bm_feed_handler.h"
-namespace storage { class IStorageChainBuilder; }
namespace storage::api { class StorageCommand; }
namespace feedbm {
+struct BmStorageLinkContext;
+
/*
* Benchmark feed handler for feed to service layer using storage api protocol
* directly on the storage chain.
*/
class StorageApiChainBmFeedHandler : public IBmFeedHandler
{
-public:
- struct Context;
private:
- std::shared_ptr<Context> _context;
+ vespalib::string _name;
+ bool _distributor;
+ std::shared_ptr<BmStorageLinkContext> _context;
void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
- StorageApiChainBmFeedHandler(std::shared_ptr<Context> context);
+ StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor);
~StorageApiChainBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- static std::shared_ptr<Context> get_context();
- static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context);
uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
index 3a974bb7d9a..e15efdf7ce6 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
@@ -9,17 +9,15 @@
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/storageserver/message_dispatcher.h>
-#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+//#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
+//#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <vespa/fnet/frt/target.h>
-#include <vespa/slobrok/sbmirror.h>
+//#include <vespa/fnet/frt/target.h>
+//#include <vespa/slobrok/sbmirror.h>
#include <cassert>
using document::Document;
@@ -28,37 +26,10 @@ using document::DocumentUpdate;
using document::DocumentTypeRepo;
using storage::api::StorageMessageAddress;
using storage::rpc::SharedRpcResources;
+using storage::lib::NodeType;
namespace feedbm {
-namespace {
-
-FRT_RPCRequest *
-make_set_cluster_state_request() {
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
- storage::rpc::SlimeClusterStateBundleCodec codec;
- auto encoded_bundle = codec.encode(bundle);
- auto *req = new FRT_RPCRequest();
- auto* params = req->GetParams();
- params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type));
- params->AddInt32(encoded_bundle._uncompressed_length);
- params->AddData(std::move(*encoded_bundle._buffer));
- req->SetMethodName("setdistributionstates");
- return req;
-}
-
-void
-set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) {
- auto req = make_set_cluster_state_request();
- auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory());
- auto target = target_resolver->resolve_rpc_target(storage_address);
- target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
- assert(!req->IsError());
- req->SubRef();
-}
-
-}
-
class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher,
public StorageReplyErrorChecker
{
@@ -105,15 +76,16 @@ StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
assert(_pending.empty());
}
-StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo)
+StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const DocumentTypeRepo> repo, bool distributor)
: IBmFeedHandler(),
- _storage_address(std::make_unique<StorageMessageAddress>("storage", storage::lib::NodeType::STORAGE, 0)),
+ _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _distributor(distributor),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
_shared_rpc_resources(shared_rpc_resources_in),
_message_dispatcher(std::make_unique<MyMessageDispatcher>()),
_message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo, std::make_shared<documentapi::LoadTypeSet>())),
_rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params()))
{
- set_cluster_up(_shared_rpc_resources, *_storage_address);
}
StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
@@ -154,4 +126,22 @@ StorageApiRpcBmFeedHandler::get_error_count() const
return _message_dispatcher->get_error_count();
}
+const vespalib::string&
+StorageApiRpcBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+StorageApiRpcBmFeedHandler::manages_buckets() const
+{
+ return _distributor;
+}
+
+bool
+StorageApiRpcBmFeedHandler::manages_timestamp() const
+{
+ return _distributor;
+}
+
}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
index bc2f62e038f..762a3c5bd63 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
@@ -25,6 +25,8 @@ namespace feedbm {
class StorageApiRpcBmFeedHandler : public IBmFeedHandler
{
class MyMessageDispatcher;
+ vespalib::string _name;
+ bool _distributor;
std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
storage::rpc::SharedRpcResources& _shared_rpc_resources;
std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
@@ -33,12 +35,15 @@ class StorageApiRpcBmFeedHandler : public IBmFeedHandler
void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
- StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo);
+ StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo, bool distributor);
~StorageApiRpcBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 5b9f7a58293..cca421657ea 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -4,6 +4,9 @@
#include "spi_bm_feed_handler.h"
#include "storage_api_rpc_bm_feed_handler.h"
#include "storage_api_chain_bm_feed_handler.h"
+#include "bm_storage_chain_builder.h"
+#include "bm_cluster_controller.h"
+#include "bm_storage_link_context.h"
#include <vespa/vespalib/testkit/testapp.h>
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-imported-fields.h>
@@ -65,6 +68,7 @@
#include <vespa/storageserver/app/distributorprocess.h>
#include <vespa/storage/config/config-stor-distributormanager.h>
#include <vespa/storage/config/config-stor-visitordispatcher.h>
+#include <vespa/slobrok/sbmirror.h>
#include <getopt.h>
#include <iostream>
@@ -126,6 +130,9 @@ using search::transactionlog::TransLogServer;
using storage::rpc::SharedRpcResources;
using storage::spi::PersistenceProvider;
using vespalib::makeLambdaTask;
+using feedbm::BmClusterController;
+using feedbm::BmStorageChainBuilder;
+using feedbm::BmStorageLinkContext;
using feedbm::IBmFeedHandler;
using feedbm::SpiBmFeedHandler;
using feedbm::StorageApiRpcBmFeedHandler;
@@ -262,7 +269,7 @@ public:
uint32_t get_remove_passes() const { return _remove_passes; }
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
bool get_enable_distributor() const { return _enable_distributor; }
- bool get_enable_service_layer() const { return _enable_service_layer; }
+ bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; }
bool get_use_storage_chain() const { return _use_storage_chain; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
@@ -299,10 +306,6 @@ BMParams::check() const
std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
return false;
}
- if (_enable_distributor && !_enable_service_layer) {
- std::cerr << "Service layer must be enabled if distributor layer is enabled" << std::endl;
- return false;
- }
return true;
}
@@ -558,9 +561,10 @@ struct PersistenceProviderFixture {
std::shared_ptr<IConfigContext> _config_context;
std::unique_ptr<IBmFeedHandler> _feed_handler;
std::unique_ptr<mbus::Slobrok> _slobrok;
- std::shared_ptr<StorageApiChainBmFeedHandler::Context> _service_layer_chain_context;
+ std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
std::unique_ptr<MyServiceLayerProcess> _service_layer;
std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
+ std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
std::unique_ptr<storage::DistributorProcess> _distributor;
PersistenceProviderFixture(const BMParams& params);
@@ -573,6 +577,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const;
void create_buckets();
+ void wait_slobrok(const vespalib::string &name);
void start_service_layer(const BMParams& params);
void start_distributor(const BMParams& params);
void create_feed_handler(const BMParams& params);
@@ -623,6 +628,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer_chain_context(),
_service_layer(),
_rpc_client_shared_rpc_resources(),
+ _distributor_chain_context(),
_distributor()
{
create_document_db();
@@ -725,16 +731,31 @@ PersistenceProviderFixture::create_buckets()
}
void
+PersistenceProviderFixture::wait_slobrok(const vespalib::string &name)
+{
+ auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror();
+ LOG(info, "Waiting for %s in slobrok", name.c_str());
+ for (;;) {
+ auto specs = mirror.lookup(name);
+ if (!specs.empty()) {
+ LOG(info, "Found %s in slobrok", name.c_str());
+ return;
+ }
+ std::this_thread::sleep_for(100ms);
+ }
+}
+
+void
PersistenceProviderFixture::start_service_layer(const BMParams& params)
{
LOG(info, "start slobrok");
_slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
LOG(info, "start service layer");
config::ConfigUri config_uri("bm-servicelayer", _config_context);
- std::unique_ptr<storage::IStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain()) {
- _service_layer_chain_context = StorageApiChainBmFeedHandler::get_context();
- chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(_service_layer_chain_context);
+ std::unique_ptr<BmStorageChainBuilder> chain_builder;
+ if (params.get_use_storage_chain() && !params.get_enable_distributor()) {
+ chain_builder = std::make_unique<BmStorageChainBuilder>();
+ _service_layer_chain_context = chain_builder->get_context();
}
_service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
*_persistence_engine,
@@ -746,27 +767,53 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params)
config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
_rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
_rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
+ wait_slobrok("storage/cluster.storage/storage/0/default");
+ wait_slobrok("storage/cluster.storage/storage/0");
+ BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ fake_controller.set_cluster_up(false);
}
void
PersistenceProviderFixture::start_distributor(const BMParams& params)
{
- if (params.get_enable_distributor()) {
- config::ConfigUri config_uri("bm-distributor", _config_context);
- _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
- _distributor->setupConfig(100ms);
- _distributor->createNode();
+ config::ConfigUri config_uri("bm-distributor", _config_context);
+ std::unique_ptr<BmStorageChainBuilder> chain_builder;
+ if (params.get_use_storage_chain()) {
+ chain_builder = std::make_unique<BmStorageChainBuilder>();
+ _distributor_chain_context = chain_builder->get_context();
+ }
+ _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
+ if (chain_builder) {
+ _distributor->set_storage_chain_builder(std::move(chain_builder));
}
+ _distributor->setupConfig(100ms);
+ _distributor->createNode();
+ wait_slobrok("storage/cluster.storage/distributor/0/default");
+ wait_slobrok("storage/cluster.storage/distributor/0");
+ BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ fake_controller.set_cluster_up(true);
+ // Wait for bucket ownership transfer safe time
+ std::this_thread::sleep_for(2s);
}
void
PersistenceProviderFixture::create_feed_handler(const BMParams& params)
{
+ if (params.get_enable_distributor()) {
+ if (params.get_use_storage_chain()) {
+ assert(_distributor_chain_context);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, true);
+ }
+ return;
+ }
if (params.get_enable_service_layer()) {
if (params.get_use_storage_chain()) {
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context);
+ assert(_service_layer_chain_context);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
} else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, false);
}
}
}
@@ -849,11 +896,12 @@ put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbo
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
+ bool use_timestamp = !f._feed_handler->manages_timestamp();
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
document::Bucket bucket(bucket_space, bucket_id);
auto document = std::make_unique<Document>(repo, is);
- f._feed_handler->put(bucket, std::move(document), time_bias + i, pending_tracker);
+ f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -901,11 +949,12 @@ update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
+ bool use_timestamp = !f._feed_handler->manages_timestamp();
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
document::Bucket bucket(bucket_space, bucket_id);
auto document_update = DocumentUpdate::createHEAD(repo, is);
- f._feed_handler->update(bucket, std::move(document_update), time_bias + i, pending_tracker);
+ f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -953,11 +1002,12 @@ remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
+ bool use_timestamp = !f._feed_handler->manages_timestamp();
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
is >> bucket_id;
document::Bucket bucket(bucket_space, bucket_id);
DocumentId document_id(is);
- f._feed_handler->remove(bucket, document_id, time_bias + i, pending_tracker);
+ f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
@@ -990,7 +1040,9 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "start initialize");
provider.initialize();
LOG(info, "create %u buckets", f.num_buckets());
- f.create_buckets();
+ if (!f._feed_handler->manages_buckets()) {
+ f.create_buckets();
+ }
if (bm_params.get_enable_service_layer()) {
f.start_service_layer(bm_params);
}
@@ -1003,6 +1055,7 @@ void benchmark_async_spi(const BMParams &bm_params)
auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update");
auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove");
int64_t time_bias = 1;
+ LOG(info, "Feed handler is %s", f._feed_handler->get_name().c_str());
for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) {
run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params);
}