aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-12-05 00:02:56 +0100
committerGitHub <noreply@github.com>2020-12-05 00:02:56 +0100
commit4b5cb9770f115b73f03fe59d6d1a4f30097cf739 (patch)
treece529f1e299b7107ce1c3405891b2334d5c052bc
parente94865ec953f076d7dd75370f5d4260fc522a81c (diff)
parentfe309bdc1b24e64ca9de24af663e8dc04fd52f61 (diff)
Merge pull request #15664 from vespa-engine/geirst/decouple-external-operation-handler
Decouple external operation handler
-rw-r--r--storage/src/tests/common/teststorageapp.cpp1
-rw-r--r--storage/src/tests/common/teststorageapp.h11
-rw-r--r--storage/src/tests/distributor/distributortest.cpp16
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp17
-rw-r--r--storage/src/tests/distributor/distributortestutil.h8
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp12
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp4
-rw-r--r--storage/src/tests/distributor/operationtargetresolvertest.cpp4
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp26
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp2
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp6
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp6
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp24
-rw-r--r--storage/src/tests/distributor/statoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp6
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp6
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp4
-rw-r--r--storage/src/vespa/storage/common/CMakeLists.txt3
-rw-r--r--storage/src/vespa/storage/common/messagesender.h9
-rw-r--r--storage/src/vespa/storage/common/node_identity.cpp16
-rw-r--r--storage/src/vespa/storage/common/node_identity.h28
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp32
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h20
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h12
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h24
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp117
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h22
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp13
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h14
30 files changed, 292 insertions, 175 deletions
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 81ef5a2e800..04dc1a03dd3 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -41,6 +41,7 @@ TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg,
_docMan(),
_nodeStateUpdater(type),
_configId(configId),
+ _node_identity("test_cluster", type, index),
_initialized(false)
{
// Use config to adjust values
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 376b1afd9c6..ce76a9f98c9 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -18,17 +18,18 @@
#pragma once
#include "testnodestateupdater.h"
+#include <vespa/document/base/testdocman.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storage/common/doneinitializehandler.h>
+#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/storageserver/framework.h>
#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h>
#include <vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h>
+#include <vespa/storage/storageserver/framework.h>
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/document/base/testdocman.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <atomic>
@@ -50,6 +51,7 @@ protected:
document::TestDocMan _docMan;
TestNodeStateUpdater _nodeStateUpdater;
vespalib::string _configId;
+ NodeIdentity _node_identity;
std::atomic<bool> _initialized;
public:
@@ -85,6 +87,7 @@ public:
{ return _compReg.getDistribution(); }
TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; }
uint16_t getIndex() const { return _compReg.getIndex(); }
+ const NodeIdentity& node_identity() const noexcept { return _node_identity; }
// The storage app also implements the done initializer interface, so it can
// be sent to components needing this.
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 93f503c27e4..25221b9cc7d 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -120,14 +120,14 @@ struct DistributorTest : Test, DistributorTestUtil {
}
}
- getExternalOperationHandler().removeNodesFromDB(makeDocumentBucket(document::BucketId(16, 1)), removedNodes);
+ distributor_component().removeNodesFromDB(makeDocumentBucket(document::BucketId(16, 1)), removedNodes);
uint32_t flags(DatabaseUpdate::CREATE_IF_NONEXISTING
| (resetTrusted ? DatabaseUpdate::RESET_TRUSTED : 0));
- getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)),
- changedNodes,
- flags);
+ distributor_component().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)),
+ changedNodes,
+ flags);
}
std::string retVal = dumpBucket(document::BucketId(16, 1));
@@ -562,8 +562,8 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
- getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies,
- DatabaseUpdate::CREATE_IF_NONEXISTING);
+ distributor_component().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies,
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket));
}
@@ -575,8 +575,8 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
- getExternalOperationHandler().updateBucketDatabase(makeDocumentBucket(bucket), copies,
- DatabaseUpdate::CREATE_IF_NONEXISTING);
+ distributor_component().updateBucketDatabase(makeDocumentBucket(bucket), copies,
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
BucketDatabase::Entry e(getBucket(bucket));
EXPECT_EQ(101234, e->getLastGarbageCollectionTime());
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 17932ccffd7..2d2038b7cd0 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -1,11 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "distributortestutil.h"
+#include <vespa/config-stor-distribution.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
-#include <vespa/config-stor-distribution.h>
+#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/vespalib/text/stringtokenizer.h>
-#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/document/test/make_bucket_space.h>
using document::test::makeBucketSpace;
using document::test::makeDocumentBucket;
@@ -26,6 +27,7 @@ DistributorTestUtil::createLinks()
_threadPool = framework::TickingThreadPool::createDefault("distributor");
_distributor.reset(new Distributor(
_node->getComponentRegister(),
+ _node->node_identity(),
*_threadPool,
*this,
true,
@@ -244,7 +246,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id)
void
DistributorTestUtil::addIdealNodes(const document::BucketId& id)
{
- addIdealNodes(*getExternalOperationHandler().getClusterStateBundle().getBaselineClusterState(), id);
+ addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id);
}
void
@@ -276,7 +278,7 @@ DistributorTestUtil::insertBucketInfo(document::BucketId id,
if (active) {
info2.setActive();
}
- BucketCopy copy(getExternalOperationHandler().getUniqueTimestamp(), node, info2);
+ BucketCopy copy(distributor_component().getUniqueTimestamp(), node, info2);
entry->addNode(copy.setTrusted(trusted), toVector<uint16_t>(0));
@@ -336,6 +338,11 @@ DistributorTestUtil::getExternalOperationHandler() {
return _distributor->_externalOperationHandler;
}
+storage::distributor::DistributorComponent&
+DistributorTestUtil::distributor_component() {
+ return _distributor->_component;
+}
+
bool
DistributorTestUtil::tick() {
framework::ThreadWaitInfo res(
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index a6bd9d5d84c..1bdb6e33512 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -2,14 +2,14 @@
#pragma once
#include "distributor_message_sender_stub.h"
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h>
#include <vespa/storage/storageutil/utils.h>
-#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <vespa/storageapi/message/state.h>
+#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
namespace storage {
@@ -21,6 +21,7 @@ class BucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
+class DistributorComponent;
class IdealStateManager;
class ExternalOperationHandler;
class Operation;
@@ -111,6 +112,7 @@ public:
BucketDBUpdater& getBucketDBUpdater();
IdealStateManager& getIdealStateManager();
ExternalOperationHandler& getExternalOperationHandler();
+ storage::distributor::DistributorComponent& distributor_component();
Distributor& getDistributor() {
return *_distributor;
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 40516622b65..b230781a90c 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -95,19 +95,19 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16");
EXPECT_EQ(document::BucketId(16, 0xffff),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0xffff))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x10000))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0xffff),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0xffff))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0x100),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x100))
).stripUnused());
close();
@@ -116,11 +116,11 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20");
createLinks();
EXPECT_EQ(document::BucketId(20, 0x11111),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x111111))
).stripUnused());
EXPECT_EQ(document::BucketId(20, 0x22222),
- getExternalOperationHandler().getBucketId(document::DocumentId(
+ distributor_component().getBucketId(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x222222))
).stripUnused());
}
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 410705bb1a0..fe87de5f18a 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -45,7 +45,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
createLinks();
docId = document::DocumentId("id:ns:text/html::uri");
- bucketId = getExternalOperationHandler().getBucketId(docId);
+ bucketId = distributor_component().getBucketId(docId);
};
void TearDown() override {
@@ -56,7 +56,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME);
op = std::make_unique<GetOperation>(
- getExternalOperationHandler(), getDistributorBucketSpace(),
+ distributor_component(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
msg, getDistributor().getMetrics().gets,
consistency);
diff --git a/storage/src/tests/distributor/operationtargetresolvertest.cpp b/storage/src/tests/distributor/operationtargetresolvertest.cpp
index da0206cf0a4..3098d8382c8 100644
--- a/storage/src/tests/distributor/operationtargetresolvertest.cpp
+++ b/storage/src/tests/distributor/operationtargetresolvertest.cpp
@@ -115,7 +115,7 @@ OperationTargetResolverTest::getInstances(const BucketId& id,
bool stripToRedundancy)
{
lib::IdealNodeCalculatorImpl idealNodeCalc;
- auto &bucketSpaceRepo(getExternalOperationHandler().getBucketSpaceRepo());
+ auto &bucketSpaceRepo(distributor_component().getBucketSpaceRepo());
auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace()));
idealNodeCalc.setDistribution(distributorBucketSpace.getDistribution());
idealNodeCalc.setClusterState(distributorBucketSpace.getClusterState());
@@ -144,7 +144,7 @@ TEST_F(OperationTargetResolverTest, simple) {
TEST_F(OperationTargetResolverTest, multiple_nodes) {
setupDistributor(1, 2, "storage:2 distributor:1");
- auto &bucketSpaceRepo(getExternalOperationHandler().getBucketSpaceRepo());
+ auto &bucketSpaceRepo(distributor_component().getBucketSpaceRepo());
auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace()));
for (int i = 0; i < 100; ++i) {
addNodesToBucketDB(BucketId(16, i), "0=0,1=0");
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index e86eded7392..2a220cb9ef8 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -73,8 +73,8 @@ public:
}
void sendPut(std::shared_ptr<api::PutCommand> msg) {
- op = std::make_unique<PutOperation>(getExternalOperationHandler(),
- getExternalOperationHandler(),
+ op = std::make_unique<PutOperation>(distributor_component(),
+ distributor_component(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
@@ -103,7 +103,7 @@ document::BucketId
PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) {
auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::"));
- document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId());
+ document::BucketId id = distributor_component().getBucketId(doc->getId());
addIdealNodes(id);
auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), doc, 0);
@@ -149,7 +149,7 @@ TEST_F(PutOperationTest, bucket_database_gets_special_entry_when_CreateBucket_se
// Database updated before CreateBucket is sent
ASSERT_EQ("BucketId(0x4000000000008f09) : "
"node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)",
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+ dumpBucket(distributor_component().getBucketId(doc->getId())));
ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true));
}
@@ -196,7 +196,7 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
"id:test:testdoctype1::, timestamp 100, size 45) => 1",
_sender.getCommands(true, true));
- getExternalOperationHandler().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0);
+ distributor_component().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0);
// If we get an ACK from the backend nodes, the operation has been persisted OK.
// Even if the bucket has been removed from the DB in the meantime (usually would
@@ -248,7 +248,7 @@ TEST_F(PutOperationTest, multiple_copies) {
"node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
"node(idx=2,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
"node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+ dumpBucket(distributor_component().getBucketId(doc->getId())));
}
TEST_F(PutOperationTest, multiple_copies_early_return_primary_required) {
@@ -401,7 +401,7 @@ TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) {
// Manually shove sent messages into pending message tracker, since
// this isn't done automatically.
for (size_t i = 0; i < _sender.commands().size(); ++i) {
- getExternalOperationHandler().getDistributor().getPendingMessageTracker()
+ distributor_component().getDistributor().getPendingMessageTracker()
.insert(_sender.command(i));
}
@@ -476,7 +476,7 @@ parseBucketInfoString(const std::string& nodeList) {
std::string
PutOperationTest::getNodes(const std::string& infoString) {
Document::SP doc(createDummyDocument("test", "uri"));
- document::BucketId bid(getExternalOperationHandler().getBucketId(doc->getId()));
+ document::BucketId bid(distributor_component().getBucketId(doc->getId()));
BucketInfo entry = parseBucketInfoString(infoString);
@@ -518,7 +518,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
setupDistributor(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
Document::SP doc(createDummyDocument("test", "uri"));
- document::BucketId bId = getExternalOperationHandler().getBucketId(doc->getId());
+ document::BucketId bId = distributor_component().getBucketId(doc->getId());
addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
@@ -535,14 +535,14 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
ASSERT_EQ("BucketId(0x4000000000000593) : "
"node(idx=0,crc=0x7,docs=8/8,bytes=9/9,trusted=true,active=false,ready=false)",
- dumpBucket(getExternalOperationHandler().getBucketId(doc->getId())));
+ dumpBucket(distributor_component().getBucketId(doc->getId())));
}
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "uri");
- auto bucket = getExternalOperationHandler().getBucketId(doc->getId());
+ auto bucket = distributor_component().getBucketId(doc->getId());
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
sendPut(createPut(doc));
@@ -576,7 +576,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
- auto bucket = getExternalOperationHandler().getBucketId(doc->getId());
+ auto bucket = distributor_component().getBucketId(doc->getId());
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
getBucketDBUpdater().onSetSystemState(
std::make_shared<api::SetSystemStateCommand>(
@@ -596,7 +596,7 @@ TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
"distributor:1 storage:2 .0.s:r .1.s:r");
Document::SP doc(createDummyDocument("test", "uri"));
document::BucketId bucket(
- getExternalOperationHandler().getBucketId(doc->getId()));
+ distributor_component().getBucketId(doc->getId()));
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t");
sendPut(createPut(doc));
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index daa2ca94bb3..e8401914abf 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -62,7 +62,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
cmd->addBucketToBeVisited(BucketId()); // Will be inferred to first sub-bucket in DB
}
return std::make_shared<VisitorOperation>(
- getExternalOperationHandler(), getExternalOperationHandler(),
+ distributor_component(), distributor_component(),
getDistributorBucketSpace(), cmd, _default_config,
getDistributor().getMetrics().visits);
}
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index 47b6e436fb1..779d6f60be0 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -27,9 +27,9 @@ struct RemoveLocationOperationTest : Test, DistributorTestUtil {
auto msg = std::make_shared<api::RemoveLocationCommand>(selection, makeDocumentBucket(document::BucketId(0)));
op = std::make_unique<RemoveLocationOperation>(
- getExternalOperationHandler(),
- getExternalOperationHandler(),
- getExternalOperationHandler(),
+ distributor_component(),
+ distributor_component(),
+ distributor_component(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index 49d60292326..cc5452e2bfe 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -23,7 +23,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil {
createLinks();
docId = document::DocumentId("id:test:test::uri");
- bucketId = getExternalOperationHandler().getBucketId(docId);
+ bucketId = distributor_component().getBucketId(docId);
enableDistributorClusterState("distributor:1 storage:4");
};
@@ -35,8 +35,8 @@ struct RemoveOperationTest : Test, DistributorTestUtil {
auto msg = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(0)), dId, 100);
op = std::make_unique<RemoveOperation>(
- getExternalOperationHandler(),
- getExternalOperationHandler(),
+ distributor_component(),
+ distributor_component(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index f66aab26dc9..00c1c7bb403 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -238,7 +238,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
tick(); // Trigger command processing and pending state setup.
}
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(),
+ StateChecker::Context c(distributor_component(),
getBucketSpaceRepo().get(params._bucket_space),
statsTracker,
bucket);
@@ -290,7 +290,7 @@ std::string StateCheckersTest::testSplit(uint32_t splitCount,
SplitBucketStateChecker checker;
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
getConfig().setSplitSize(splitSize);
getConfig().setSplitCount(splitCount);
getConfig().setMinimalBucketSplit(minSplitBits);
@@ -375,7 +375,7 @@ StateCheckersTest::testInconsistentSplit(const document::BucketId& bid,
{
SplitInconsistentStateChecker checker;
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, true,
PendingMessage(), includePriority);
}
@@ -433,7 +433,7 @@ StateCheckersTest::testJoin(uint32_t joinCount,
getConfig().setMinimalBucketSplit(minSplitBits);
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, true, blocker, includePriority);
}
@@ -586,7 +586,7 @@ StateCheckersTest::testSynchronizeAndMove(const std::string& bucketInfo,
enableDistributorClusterState(clusterState);
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, false, blocker, includePriority);
}
@@ -820,7 +820,7 @@ StateCheckersTest::testDeleteExtraCopies(
}
DeleteExtraCopiesStateChecker checker;
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, false, blocker, includePriority);
}
@@ -937,7 +937,7 @@ std::string StateCheckersTest::testBucketState(
BucketStateStateChecker checker;
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, false, PendingMessage(),
includePriority);
}
@@ -1104,7 +1104,7 @@ std::string StateCheckersTest::testBucketStatePerGroup(
BucketStateStateChecker checker;
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
return testStateChecker(checker, c, false, PendingMessage(),
includePriority);
}
@@ -1231,7 +1231,7 @@ std::string StateCheckersTest::testGarbageCollection(
getConfig().setGarbageCollection("music", std::chrono::seconds(checkInterval));
getConfig().setLastGarbageCollectionChangeTime(vespalib::steady_time(std::chrono::seconds(lastChangeTime)));
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker,
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker,
makeDocumentBucket(e.getBucketId()));
getClock().setAbsoluteTimeInSeconds(nowTimestamp);
return testStateChecker(checker, c, false, PendingMessage(),
@@ -1304,7 +1304,7 @@ TEST_F(StateCheckersTest, gc_inhibited_when_ideal_node_in_maintenance) {
getConfig().setGarbageCollection("music", 3600s);
getConfig().setLastGarbageCollectionChangeTime(vespalib::steady_time(vespalib::duration::zero()));
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker,
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker,
makeDocumentBucket(bucket));
getClock().setAbsoluteTimeInSeconds(4000);
// Would normally (in a non-maintenance case) trigger GC due to having
@@ -1448,7 +1448,7 @@ TEST_F(StateCheckersTest, context_populates_ideal_state_containers) {
setupDistributor(2, 100, "distributor:1 storage:4");
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket({17, 0}));
+ StateChecker::Context c(distributor_component(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket({17, 0}));
ASSERT_THAT(c.idealState, ElementsAre(1, 3));
// TODO replace with UnorderedElementsAre once we can build gmock without issues
@@ -1491,7 +1491,7 @@ public:
// NOTE: resets the bucket database!
void runFor(const document::BucketId& bid) {
Checker checker;
- StateChecker::Context c(_fixture.getExternalOperationHandler(), _fixture.getDistributorBucketSpace(), _statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(_fixture.distributor_component(), _fixture.getDistributorBucketSpace(), _statsTracker, makeDocumentBucket(bid));
_result = _fixture.testStateChecker(
checker, c, false, StateCheckersTest::PendingMessage(), false);
}
diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp
index 981247c0753..a80eb9533bb 100644
--- a/storage/src/tests/distributor/statoperationtest.cpp
+++ b/storage/src/tests/distributor/statoperationtest.cpp
@@ -73,7 +73,7 @@ TEST_F(StatOperationTest, bucket_list) {
StatBucketListOperation op(
getDistributorBucketSpace().getBucketDatabase(),
getIdealStateManager(),
- getExternalOperationHandler().getIndex(),
+ distributor_component().getIndex(),
msg);
op.start(_sender, framework::MilliSecTime(0));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 6e3837afa43..1518c8594aa 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -302,7 +302,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
}
update->setCreateIfNonExistent(options._createIfNonExistent);
- document::BucketId id = getExternalOperationHandler().getBucketId(update->getId());
+ document::BucketId id = distributor_component().getBucketId(update->getId());
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
@@ -325,9 +325,9 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
msg->setCondition(options._condition);
msg->setTransportContext(std::make_unique<DummyTransportContext>());
- ExternalOperationHandler& handler = getExternalOperationHandler();
+ auto& comp = distributor_component();
return std::make_shared<TwoPhaseUpdateOperation>(
- handler, handler, handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics());
+ comp, comp, comp, getDistributorBucketSpace(), msg, getDistributor().getMetrics());
}
TEST_F(TwoPhaseUpdateOperationTest, simple) {
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 8c5fb6db378..00e1e832b3c 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -59,15 +59,15 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m
document::DocumentId("id:ns:" + _html_type->getName() + "::1"));
update->setCreateIfNonExistent(create_if_missing);
- _bId = getExternalOperationHandler().getBucketId(update->getId());
+ _bId = distributor_component().getBucketId(update->getId());
addNodesToBucketDB(_bId, bucketState);
auto msg = std::make_shared<api::UpdateCommand>(makeDocumentBucket(document::BucketId(0)), update, 100);
- ExternalOperationHandler& handler = getExternalOperationHandler();
+ auto& comp = distributor_component();
return std::make_shared<UpdateOperation>(
- handler, handler, getDistributorBucketSpace(), msg,
+ comp, comp, getDistributorBucketSpace(), msg,
getDistributor().getMetrics().updates);
}
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index 0af4aff02d5..f37ded7aacb 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -102,8 +102,8 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
const VisitorOperation::Config& config)
{
return std::make_unique<VisitorOperation>(
- getExternalOperationHandler(),
- getExternalOperationHandler(),
+ distributor_component(),
+ distributor_component(),
getDistributorBucketSpace(),
msg,
config,
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 29130515d2a..efbd62a45a0 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -9,13 +9,14 @@ vespa_add_library(storage_common OBJECT
global_bucket_space_distribution_converter.cpp
messagebucket.cpp
messagesender.cpp
+ node_identity.cpp
reindexing_constants.cpp
servicelayercomponent.cpp
statusmessages.cpp
statusmetricconsumer.cpp
+ storage_chain_builder.cpp
storagecomponent.cpp
storagelink.cpp
storagelinkqueued.cpp
- storage_chain_builder.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/common/messagesender.h b/storage/src/vespa/storage/common/messagesender.h
index 48020bf053e..839dbcb91dc 100644
--- a/storage/src/vespa/storage/common/messagesender.h
+++ b/storage/src/vespa/storage/common/messagesender.h
@@ -43,4 +43,13 @@ struct ChainedMessageSender {
virtual void sendDown(const std::shared_ptr<api::StorageMessage>&) = 0;
};
+/**
+ * Interface to send messages "up" that bypasses message tracking.
+ */
+class NonTrackingMessageSender {
+public:
+ virtual ~NonTrackingMessageSender() = default;
+ virtual void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) = 0;
+};
+
} // storage
diff --git a/storage/src/vespa/storage/common/node_identity.cpp b/storage/src/vespa/storage/common/node_identity.cpp
new file mode 100644
index 00000000000..2ad940b7b20
--- /dev/null
+++ b/storage/src/vespa/storage/common/node_identity.cpp
@@ -0,0 +1,16 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "node_identity.h"
+
+namespace storage {
+
+NodeIdentity::NodeIdentity(vespalib::stringref cluster_name_in,
+ const lib::NodeType& node_type_in,
+ uint16_t node_index_in)
+ : _cluster_name(cluster_name_in),
+ _node_type(node_type_in),
+ _node_index(node_index_in)
+{
+}
+
+}
diff --git a/storage/src/vespa/storage/common/node_identity.h b/storage/src/vespa/storage/common/node_identity.h
new file mode 100644
index 00000000000..ea2edb98cdd
--- /dev/null
+++ b/storage/src/vespa/storage/common/node_identity.h
@@ -0,0 +1,28 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vdslib/state/nodetype.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace storage {
+
+/**
+ * Class that represents the identity of a storage or distributor node.
+ */
+class NodeIdentity {
+private:
+ vespalib::string _cluster_name;
+ const lib::NodeType& _node_type;
+ uint16_t _node_index;
+
+public:
+ NodeIdentity(vespalib::stringref cluster_name_in,
+ const lib::NodeType& node_type_in,
+ uint16_t node_index_in);
+ const vespalib::string& cluster_name() const { return _cluster_name; }
+ const lib::NodeType& node_type() const { return _node_type; }
+ uint16_t node_index() const { return _node_index; }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 0ad8118d925..235384c9473 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -1,18 +1,19 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
//
-#include "distributor.h"
#include "blockingoperationstarter.h"
-#include "throttlingoperationstarter.h"
-#include "idealstatemetricsset.h"
-#include "ownership_transfer_safe_time_point_calculator.h"
+#include "distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
-#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
-#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include "idealstatemetricsset.h"
+#include "ownership_transfer_safe_time_point_calculator.h"
+#include "throttlingoperationstarter.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/common/node_identity.h>
+#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/util/memoryusage.h>
#include <vespa/log/log.h>
@@ -62,6 +63,7 @@ public:
};
Distributor::Distributor(DistributorComponentRegister& compReg,
+ const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
DoneInitializeHandler& doneInitHandler,
bool manageActiveBucketCopies,
@@ -71,10 +73,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
DistributorInterface(),
framework::StatusReporter("distributor", "Distributor"),
_clusterStateBundle(lib::ClusterState()),
- _compReg(compReg),
- _component(compReg, "distributor"),
- _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())),
- _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())),
+ _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
+ _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
+ _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"),
_metrics(std::make_shared<DistributorMetricSet>()),
_operationOwner(*this, _component.getClock()),
_maintenanceOperationOwner(*this, _component.getClock()),
@@ -83,13 +84,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
_idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
- _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo,
- _idealStateManager, _operationOwner, compReg),
+ _messageSender(messageSender),
+ _externalOperationHandler(_component, _component,
+ getMetrics(), getMessageSender(), *this, _component,
+ _idealStateManager, _operationOwner),
_threadPool(threadPool),
_initializingIsUp(true),
_doneInitializeHandler(doneInitHandler),
_doneInitializing(false),
- _messageSender(messageSender),
_bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
_scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 6bda3e12ac7..e95df863210 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -12,10 +12,10 @@
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include <vespa/config/config.h>
-#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
+#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
@@ -26,16 +26,17 @@
namespace storage {
struct DoneInitializeHandler;
class HostInfo;
+ class NodeIdentity;
}
namespace storage::distributor {
-class DistributorBucketSpaceRepo;
-class SimpleMaintenanceScanner;
class BlockingOperationStarter;
-class ThrottlingOperationStarter;
class BucketPriorityDatabase;
+class DistributorBucketSpaceRepo;
class OwnershipTransferSafeTimePointCalculator;
+class SimpleMaintenanceScanner;
+class ThrottlingOperationStarter;
class Distributor : public StorageLink,
public DistributorInterface,
@@ -43,10 +44,12 @@ class Distributor : public StorageLink,
public framework::StatusReporter,
public framework::TickingThread,
public MinReplicaProvider,
- public BucketSpacesStatsProvider
+ public BucketSpacesStatsProvider,
+ public NonTrackingMessageSender
{
public:
Distributor(DistributorComponentRegister&,
+ const NodeIdentity& node_identity,
framework::TickingThreadPool&,
DoneInitializeHandler&,
bool manageActiveBucketCopies,
@@ -61,7 +64,7 @@ public:
void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
// Bypasses message tracker component. Thread safe.
- void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&);
+ void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override;
ChainedMessageSender& getMessageSender() override {
return (_messageSender == 0 ? *this : *_messageSender);
@@ -260,13 +263,12 @@ private:
lib::ClusterStateBundle _clusterStateBundle;
- DistributorComponentRegister& _compReg;
- storage::DistributorComponent _component;
std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
// Read-only bucket space repo with DBs that only contain buckets transiently
// during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo
// and the DBs are empty during non-transition phases.
std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
+ storage::distributor::DistributorComponent _component;
std::shared_ptr<DistributorMetricSet> _metrics;
OperationOwner _operationOwner;
@@ -277,6 +279,7 @@ private:
StatusReporterDelegate _distributorStatusDelegate;
StatusReporterDelegate _bucketDBStatusDelegate;
IdealStateManager _idealStateManager;
+ ChainedMessageSender* _messageSender;
ExternalOperationHandler _externalOperationHandler;
std::shared_ptr<lib::Distribution> _distribution;
@@ -307,7 +310,6 @@ private:
DoneInitializeHandler& _doneInitializeHandler;
bool _doneInitializing;
- ChainedMessageSender* _messageSender;
std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
std::unique_ptr<SimpleMaintenanceScanner> _scanner;
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 9b2c46f0071..083ffcdacf4 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -3,6 +3,7 @@
#pragma once
#include "bucketownership.h"
+#include "operation_routing_snapshot.h"
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storage/common/distributorcomponent.h>
@@ -15,6 +16,7 @@ namespace storage { class DistributorConfiguration; }
namespace storage::distributor {
class DistributorBucketSpaceRepo;
+class PendingMessageTracker;
/**
* Interface with functionality that is used when handling distributor operations.
@@ -30,12 +32,18 @@ public:
const std::vector<BucketCopy>& changed_nodes,
uint32_t update_flags = 0) = 0;
virtual void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) = 0;
- virtual const DistributorBucketSpaceRepo& bucket_space_repo() const = 0;
+ virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept= 0;
+ virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
+ virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
+ virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
+ virtual document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const = 0;
+ virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
virtual void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space,
const BucketDatabase::Entry& entry,
uint8_t pri) = 0;
- virtual const DistributorConfiguration& distributor_config() const = 0;
+ virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const = 0;
+ virtual PendingMessageTracker& pending_message_tracker() noexcept = 0;
virtual bool has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
uint32_t message_type) const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index d7fffaaf271..18923c44b1b 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -166,16 +166,34 @@ public:
void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) override {
removeNodeFromDB(bucket, node_index);
}
- const DistributorBucketSpaceRepo& bucket_space_repo() const override {
+ const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
return getBucketSpaceRepo();
}
+ DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
+ return getBucketSpaceRepo();
+ }
+ const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
+ return getReadOnlyBucketSpaceRepo();
+ }
+ DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
+ return getReadOnlyBucketSpaceRepo();
+ }
+ document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const override {
+ return getBucketId(docId);
+ }
+ const DistributorConfiguration& distributor_config() const noexcept override {
+ return getDistributor().getConfig();
+ }
void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space,
const BucketDatabase::Entry& entry,
uint8_t pri) override {
getDistributor().checkBucketForSplit(bucket_space, entry, pri);
}
- const DistributorConfiguration& distributor_config() const override {
- return getDistributor().getConfig();
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const override {
+ return getDistributor().read_snapshot_for_bucket(bucket);
+ }
+ PendingMessageTracker& pending_message_tracker() noexcept override {
+ return getDistributor().getPendingMessageTracker();
}
bool has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index b9eaebb0bd1..5fe5e51202a 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -29,43 +29,52 @@ LOG_SETUP(".distributor.manager");
namespace storage::distributor {
class DirectDispatchSender : public DistributorMessageSender {
- Distributor& _distributor;
+ DistributorNodeContext& _node_ctx;
+ NonTrackingMessageSender& _msg_sender;
public:
- explicit DirectDispatchSender(Distributor& distributor)
- : _distributor(distributor)
+ DirectDispatchSender(DistributorNodeContext& node_ctx,
+ NonTrackingMessageSender& msg_sender)
+ : _node_ctx(node_ctx),
+ _msg_sender(msg_sender)
{}
~DirectDispatchSender() override = default;
void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
- _distributor.send_up_without_tracking(cmd);
+ _msg_sender.send_up_without_tracking(cmd);
}
void sendReply(const std::shared_ptr<api::StorageReply>& reply) override {
- _distributor.send_up_without_tracking(reply);
+ _msg_sender.send_up_without_tracking(reply);
}
int getDistributorIndex() const override {
- return _distributor.getDistributorIndex(); // Thread safe
+ return _node_ctx.node_index();
}
const vespalib::string& getClusterName() const override {
- return _distributor.getClusterName(); // Thread safe
+ return _node_ctx.cluster_name();
}
const PendingMessageTracker& getPendingMessageTracker() const override {
abort(); // Never called by the messages using this component.
}
};
-ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorMetricSet& metrics,
+ ChainedMessageSender& msg_sender,
+ NonTrackingMessageSender& non_tracking_sender,
+ DocumentSelectionParser& parser,
const MaintenanceOperationGenerator& gen,
- OperationOwner& operation_owner,
- DistributorComponentRegister& compReg)
- : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
- _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(owner)),
+ OperationOwner& operation_owner)
+ : _node_ctx(node_ctx),
+ _op_ctx(op_ctx),
+ _metrics(metrics),
+ _msg_sender(msg_sender),
+ _parser(parser),
+ _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(node_ctx, non_tracking_sender)),
_operationGenerator(gen),
_rejectFeedBeforeTimeReached(), // At epoch
_distributor_operation_owner(operation_owner),
_non_main_thread_ops_mutex(),
- _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()),
+ _non_main_thread_ops_owner(*_direct_dispatch_sender, _node_ctx.clock()),
_concurrent_gets_enabled(false),
_use_weak_internal_read_consistency_for_gets(false)
{
@@ -103,11 +112,11 @@ ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime)
bool
ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
{
- const auto now = TimePoint(std::chrono::seconds(getClock().getTimeInSeconds().getTime()));
+ const auto now = TimePoint(std::chrono::seconds(_node_ctx.clock().getTimeInSeconds().getTime()));
if (now < _rejectFeedBeforeTimeReached) {
api::StorageReply::UP reply(cmd.makeReply());
reply->setResult(makeSafeTimeRejectionResult(now));
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ _msg_sender.sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
return false;
}
return true;
@@ -116,7 +125,7 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) {
api::StorageReply::UP reply(cmd.makeReply());
reply->setResult(result);
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ _msg_sender.sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
}
void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd,
@@ -141,7 +150,7 @@ void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageComman
}
void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
- const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ const auto& cluster_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState();
bounce_with_wrong_distribution(cmd, cluster_state);
}
@@ -157,7 +166,7 @@ void ExternalOperationHandler::bounce_with_busy_during_state_transition(
api::StorageReply::UP reply(cmd.makeReply());
api::ReturnCode ret(api::ReturnCode::BUSY, status_str);
reply->setResult(ret);
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ _msg_sender.sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
}
bool
@@ -165,7 +174,7 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageComman
const document::BucketId &bucketId,
PersistenceOperationMetricSet& persistenceMetrics)
{
- auto &bucket_space(_bucketSpaceRepo.get(cmd.getBucket().getBucketSpace()));
+ auto &bucket_space(_op_ctx.bucket_space_repo().get(cmd.getBucket().getBucketSpace()));
if (!bucket_space.owns_bucket_in_current_state(bucketId)) {
document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId);
LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
@@ -179,7 +188,7 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageComman
if (!pending.isOwned()) {
// We return BUSY here instead of WrongDistributionReply to avoid clients potentially
// ping-ponging between cluster state versions during a state transition.
- auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& current_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState();
auto& pending_state = pending.getNonOwnedState();
bounce_with_busy_during_state_transition(cmd, current_state, pending_state);
return false;
@@ -207,7 +216,7 @@ ExternalOperationHandler::makeConcurrentMutationRejectionReply(api::StorageComma
}
bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) const {
- const auto& config(getDistributor().getConfig());
+ const auto& config(_op_ctx.distributor_config());
if (!config.getSequenceMutatingOperations()) {
// Sequencing explicitly disabled, so always allow.
return true;
@@ -222,7 +231,7 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op(
PersistenceOperationMetricSet& metrics,
Func func)
{
- auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ auto &bucket_space(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
if (!bucket_space.owns_bucket_in_current_state(bucket.getBucketId())) {
LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
cmd.toString().c_str(), bucket.toString().c_str());
@@ -233,12 +242,12 @@ void ExternalOperationHandler::bounce_or_invoke_read_only_op(
auto pending = bucket_space.check_ownership_in_pending_state(bucket.getBucketId());
if (pending.isOwned()) {
- func(_bucketSpaceRepo);
+ func(_op_ctx.bucket_space_repo());
} else {
- if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) {
- func(_readOnlyBucketSpaceRepo);
+ if (_op_ctx.distributor_config().allowStaleReadsDuringClusterStateTransitions()) {
+ func(_op_ctx.read_only_bucket_space_repo());
} else {
- auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& current_state = _op_ctx.bucket_space_repo().get(document::FixedBucketSpaces::default_space()).getClusterState();
auto& pending_state = pending.getNonOwnedState();
bounce_with_busy_during_state_transition(cmd, current_state, pending_state);
}
@@ -256,12 +265,12 @@ bool put_is_allowed_through_bucket_lock(const api::PutCommand& cmd) {
bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) {
auto& metrics = getMetrics().puts;
- if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
+ if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;
}
if (cmd->getTimestamp() == 0) {
- cmd->setTimestamp(getUniqueTimestamp());
+ cmd->setTimestamp(_op_ctx.generate_unique_timestamp());
}
const auto bucket_space = cmd->getBucket().getBucketSpace();
@@ -279,11 +288,11 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
}
}
if (allow) {
- _op = std::make_shared<PutOperation>(*this, *this,
- _bucketSpaceRepo.get(bucket_space),
+ _op = std::make_shared<PutOperation>(_node_ctx, _op_ctx,
+ _op_ctx.bucket_space_repo().get(bucket_space),
std::move(cmd), getMetrics().puts, std::move(handle));
} else {
- sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
+ _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
return true;
@@ -292,21 +301,21 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand>& cmd) {
auto& metrics = getMetrics().updates;
- if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
+ if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;
}
if (cmd->getTimestamp() == 0) {
- cmd->setTimestamp(getUniqueTimestamp());
+ cmd->setTimestamp(_op_ctx.generate_unique_timestamp());
}
const auto bucket_space = cmd->getBucket().getBucketSpace();
auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
if (allowMutation(handle)) {
- _op = std::make_shared<TwoPhaseUpdateOperation>(*this, *this, *this,
- _bucketSpaceRepo.get(bucket_space),
+ _op = std::make_shared<TwoPhaseUpdateOperation>(_node_ctx, _op_ctx, _parser,
+ _op_ctx.bucket_space_repo().get(bucket_space),
std::move(cmd), getMetrics(), std::move(handle));
} else {
- sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
+ _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
return true;
@@ -315,22 +324,22 @@ bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand
bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand>& cmd) {
auto& metrics = getMetrics().removes;
- if (!checkTimestampMutationPreconditions(*cmd, getBucketId(cmd->getDocumentId()), metrics)) {
+ if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;
}
if (cmd->getTimestamp() == 0) {
- cmd->setTimestamp(getUniqueTimestamp());
+ cmd->setTimestamp(_op_ctx.generate_unique_timestamp());
}
const auto bucket_space = cmd->getBucket().getBucketSpace();
auto handle = _operation_sequencer.try_acquire(bucket_space, cmd->getDocumentId());
if (allowMutation(handle)) {
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space));
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket_space));
- _op = std::make_shared<RemoveOperation>(*this, *this, distributorBucketSpace, std::move(cmd),
+ _op = std::make_shared<RemoveOperation>(_node_ctx, _op_ctx, distributorBucketSpace, std::move(cmd),
getMetrics().removes, std::move(handle));
} else {
- sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
+ _msg_sender.sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
return true;
@@ -338,7 +347,7 @@ bool ExternalOperationHandler::onRemove(const std::shared_ptr<api::RemoveCommand
bool ExternalOperationHandler::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd) {
document::BucketId bid;
- RemoveLocationOperation::getBucketId(*this, *this, *cmd, bid);
+ RemoveLocationOperation::getBucketId(_node_ctx, _parser, *cmd, bid);
document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid);
auto& metrics = getMetrics().removelocations;
@@ -346,8 +355,8 @@ bool ExternalOperationHandler::onRemoveLocation(const std::shared_ptr<api::Remov
return true;
}
- _op = std::make_shared<RemoveLocationOperation>(*this, *this, *this,
- _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ _op = std::make_shared<RemoveLocationOperation>(_node_ctx, _op_ctx, _parser,
+ _op_ctx.bucket_space_repo().get(cmd->getBucket().getBucketSpace()),
std::move(cmd), getMetrics().removelocations);
return true;
}
@@ -359,9 +368,9 @@ api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consiste
}
std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr<api::GetCommand>& cmd) {
- document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
+ document::Bucket bucket(cmd->getBucket().getBucketSpace(), _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()));
auto& metrics = getMetrics().gets;
- auto snapshot = getDistributor().read_snapshot_for_bucket(bucket);
+ auto snapshot = _op_ctx.read_snapshot_for_bucket(bucket);
if (!snapshot.is_routable()) {
const auto& ctx = snapshot.context();
if (ctx.has_pending_state_transition()) {
@@ -376,7 +385,7 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(
// The snapshot is aware of whether stale reads are enabled, so we don't have to check that here.
const auto* space_repo = snapshot.bucket_space_repo();
assert(space_repo != nullptr);
- return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
+ return std::make_shared<GetOperation>(_node_ctx, space_repo->get(bucket.getBucketSpace()),
snapshot.steal_read_guard(), cmd, metrics,
desired_get_read_consistency());
}
@@ -400,21 +409,21 @@ bool ExternalOperationHandler::onGetBucketList(const std::shared_ptr<api::GetBuc
bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
auto& bucket_database = bucket_space.getBucketDatabase();
- _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, getIndex(), cmd);
+ _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, _node_ctx.node_index(), cmd);
});
return true;
}
bool ExternalOperationHandler::onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>& cmd) {
// TODO same handling as Gets (VisitorOperation needs to change)
- const DistributorConfiguration& config(getDistributor().getConfig());
+ const auto& config(_op_ctx.distributor_config());
VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
- auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- auto visit_op = std::make_shared<VisitorOperation>(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits);
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(cmd->getBucket().getBucketSpace()));
+ auto visit_op = std::make_shared<VisitorOperation>(_node_ctx, _op_ctx, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits);
if (visit_op->is_read_for_write()) {
_op = std::make_shared<ReadForWriteVisitorOperationStarter>(std::move(visit_op), _operation_sequencer,
_distributor_operation_owner,
- getDistributor().getPendingMessageTracker());
+ _op_ctx.pending_message_tracker());
} else {
_op = std::move(visit_op);
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 00dc9a49d74..0b6e9d970aa 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -25,8 +25,7 @@ class MaintenanceOperationGenerator;
class DirectDispatchSender;
class OperationOwner;
-class ExternalOperationHandler : public DistributorComponent,
- public api::MessageHandler
+class ExternalOperationHandler : public api::MessageHandler
{
public:
using Clock = std::chrono::system_clock;
@@ -41,12 +40,14 @@ public:
bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;
bool onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>&) override;
- ExternalOperationHandler(Distributor& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
+ ExternalOperationHandler(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorMetricSet& metrics,
+ ChainedMessageSender& msg_sender,
+ NonTrackingMessageSender& non_tracking_sender,
+ DocumentSelectionParser& parser,
const MaintenanceOperationGenerator& gen,
- OperationOwner& operation_owner,
- DistributorComponentRegister& compReg);
+ OperationOwner& operation_owner);
~ExternalOperationHandler() override;
@@ -84,6 +85,11 @@ public:
}
private:
+ DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
+ DistributorMetricSet& _metrics;
+ ChainedMessageSender& _msg_sender;
+ DocumentSelectionParser& _parser;
std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
OperationSequencer _operation_sequencer;
@@ -124,7 +130,7 @@ private:
api::InternalReadConsistency desired_get_read_consistency() const noexcept;
- DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); }
+ DistributorMetricSet& getMetrics() { return _metrics; }
};
}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index e6bee3248d4..37174903697 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -108,7 +108,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
// manager, which is safe since the lifetime of said state manager
// extends to the end of the process.
builder.add(std::make_unique<storage::distributor::Distributor>
- (dcr, *_threadPool, getDoneInitializeHandler(),
+ (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(),
_manageActiveBucketCopies,
stateManager->getHostInfo()));
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 120965cc4fe..74052932853 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -1,21 +1,22 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "config_logging.h"
-#include "storagenode.h"
#include "communicationmanager.h"
+#include "config_logging.h"
#include "statemanager.h"
#include "statereporter.h"
#include "storagemetricsset.h"
+#include "storagenode.h"
#include "storagenodecontext.h"
-#include <vespa/storage/frameworkimpl/status/statuswebserver.h>
-#include <vespa/storage/frameworkimpl/thread/deadlockdetector.h>
+#include <vespa/metrics/metricmanager.h>
+#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/statusmetricconsumer.h>
#include <vespa/storage/common/storage_chain_builder.h>
+#include <vespa/storage/frameworkimpl/status/statuswebserver.h>
+#include <vespa/storage/frameworkimpl/thread/deadlockdetector.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/time.h>
-#include <vespa/metrics/metricmanager.h>
#include <fcntl.h>
#include <vespa/log/log.h>
@@ -96,6 +97,7 @@ StorageNode::StorageNode(
_newDoctypesConfig(),
_newBucketSpacesConfig(),
_component(),
+ _node_identity(),
_configUri(configUri),
_communicationManager(nullptr),
_chain_builder(std::make_unique<StorageChainBuilder>())
@@ -143,6 +145,7 @@ StorageNode::initialize()
_context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory());
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig));
_context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
+ _node_identity = std::make_unique<NodeIdentity>(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
_metrics = std::make_shared<StorageMetricSet>();
_component = std::make_unique<StorageComponent>(_context.getComponentRegister(), "storagenode");
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 259f48f3492..44b87f4b61c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -30,21 +30,22 @@ namespace document { class DocumentTypeRepo; }
namespace storage {
-class StatusMetricConsumer;
-class StateReporter;
+class ApplicationGenerationFetcher;
class CommunicationManager;
class FileStorManager;
class HostInfo;
-class StateManager;
+class IStorageChainBuilder;
class MemoryStatusViewer;
+class NodeIdentity;
+class StateManager;
+class StateReporter;
+class StatusMetricConsumer;
class StatusWebServer;
+class StorageComponent;
class StorageLink;
struct DeadLockDetector;
struct StorageMetricSet;
struct StorageNodeContext;
-class ApplicationGenerationFetcher;
-class IStorageChainBuilder;
-class StorageComponent;
namespace lib { class NodeType; }
@@ -156,6 +157,7 @@ protected:
std::unique_ptr<document::DocumenttypesConfig> _newDoctypesConfig;
std::unique_ptr<BucketspacesConfig> _newBucketSpacesConfig;
std::unique_ptr<StorageComponent> _component;
+ std::unique_ptr<NodeIdentity> _node_identity;
config::ConfigUri _configUri;
CommunicationManager* _communicationManager;
private: