summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-03-22 14:15:16 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-03-22 15:05:38 +0000
commitbe8d412fedaa421b4433e4262bd5c6b5cf36233d (patch)
tree410983ce311a14b62080b2ef8ba140e045ce9ec0 /storage/src
parentad0c199d5907794aae0e5b11068efcf338ee0c6d (diff)
Remove functions from DistributorStripeComponent that are part of DistributorOperationContext interface.
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp14
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp7
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp12
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp20
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.cpp68
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.h100
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp2
17 files changed, 116 insertions, 134 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 61c74a263cf..6abe594388f 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -127,9 +127,9 @@ struct DistributorTest : Test, DistributorTestUtil {
uint32_t flags(DatabaseUpdate::CREATE_IF_NONEXISTING
| (resetTrusted ? DatabaseUpdate::RESET_TRUSTED : 0));
- distributor_component().updateBucketDatabase(makeDocumentBucket(document::BucketId(16, 1)),
- changedNodes,
- flags);
+ operation_context().update_bucket_database(makeDocumentBucket(document::BucketId(16, 1)),
+ changedNodes,
+ flags);
}
std::string retVal = dumpBucket(document::BucketId(16, 1));
@@ -572,8 +572,8 @@ TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
- distributor_component().updateBucketDatabase(makeDocumentBucket(nonOwnedBucket), copies,
- DatabaseUpdate::CREATE_IF_NONEXISTING);
+ operation_context().update_bucket_database(makeDocumentBucket(nonOwnedBucket), copies,
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket));
}
@@ -585,8 +585,8 @@ TEST_F(DistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_cur
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
- distributor_component().updateBucketDatabase(makeDocumentBucket(bucket), copies,
- DatabaseUpdate::CREATE_IF_NONEXISTING);
+ operation_context().update_bucket_database(makeDocumentBucket(bucket), copies,
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
BucketDatabase::Entry e(getBucket(bucket));
EXPECT_EQ(101234, e->getLastGarbageCollectionTime());
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5d7a61692f6..7929cc1c906 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -259,7 +259,7 @@ void
DistributorTestUtil::addIdealNodes(const document::BucketId& id)
{
// TODO STRIPE roundabout way of getting state bundle..!
- addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id);
+ addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id);
}
void
@@ -357,6 +357,11 @@ DistributorTestUtil::distributor_component() {
return _distributor->distributor_component();
}
+storage::distributor::DistributorOperationContext&
+DistributorTestUtil::operation_context() {
+ return _distributor->distributor_component();
+}
+
bool
DistributorTestUtil::tick() {
framework::ThreadWaitInfo res(
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 809a87d3bb7..d3c0445d5b5 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -21,10 +21,11 @@ class BucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
-class DistributorStripeComponent;
+class DistributorOperationContext;
class DistributorStripe;
-class IdealStateManager;
+class DistributorStripeComponent;
class ExternalOperationHandler;
+class IdealStateManager;
class Operation;
// TODO STRIPE rename to DistributorStripeTestUtil?
@@ -115,6 +116,7 @@ public:
IdealStateManager& getIdealStateManager();
ExternalOperationHandler& getExternalOperationHandler();
storage::distributor::DistributorStripeComponent& distributor_component();
+ storage::distributor::DistributorOperationContext& operation_context();
Distributor& getDistributor() {
return *_distributor;
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 1829808990a..6d8086696b8 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -99,19 +99,19 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16");
EXPECT_EQ(document::BucketId(16, 0xffff),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0xffff))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x10000))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0xffff),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0xffff))
).stripUnused());
EXPECT_EQ(document::BucketId(16, 0x100),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x100))
).stripUnused());
close();
@@ -120,11 +120,11 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20");
createLinks();
EXPECT_EQ(document::BucketId(20, 0x11111),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x111111))
).stripUnused());
EXPECT_EQ(document::BucketId(20, 0x22222),
- distributor_component().getBucketId(document::DocumentId(
+ operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
vespalib::make_string("id:ns:test:n=%d::", 0x222222))
).stripUnused());
}
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 1123c354ef4..cb671bb07f5 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -46,7 +46,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
createLinks();
docId = document::DocumentId("id:ns:text/html::uri");
- bucketId = distributor_component().getBucketId(docId);
+ bucketId = operation_context().make_split_bit_constrained_bucket_id(docId);
};
void TearDown() override {
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index c510e08ab2a..ffd07ad9d60 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -104,7 +104,7 @@ document::BucketId
PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) {
auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::"));
- document::BucketId id = distributor_component().getBucketId(doc->getId());
+ document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
addIdealNodes(id);
auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), doc, 0);
@@ -150,7 +150,7 @@ TEST_F(PutOperationTest, bucket_database_gets_special_entry_when_CreateBucket_se
// Database updated before CreateBucket is sent
ASSERT_EQ("BucketId(0x4000000000008f09) : "
"node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false)",
- dumpBucket(distributor_component().getBucketId(doc->getId())));
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
ASSERT_EQ("Create bucket => 0,Put => 0", _sender.getCommands(true));
}
@@ -197,7 +197,7 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
"id:test:testdoctype1::, timestamp 100, size 45) => 1",
_sender.getCommands(true, true));
- distributor_component().removeNodeFromDB(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0);
+ operation_context().remove_node_from_bucket_database(makeDocumentBucket(document::BucketId(16, 0x1dd4)), 0);
// If we get an ACK from the backend nodes, the operation has been persisted OK.
// Even if the bucket has been removed from the DB in the meantime (usually would
@@ -249,7 +249,7 @@ TEST_F(PutOperationTest, multiple_copies) {
"node(idx=3,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
"node(idx=2,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false), "
"node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
- dumpBucket(distributor_component().getBucketId(doc->getId())));
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
TEST_F(PutOperationTest, multiple_copies_early_return_primary_required) {
@@ -477,7 +477,7 @@ parseBucketInfoString(const std::string& nodeList) {
std::string
PutOperationTest::getNodes(const std::string& infoString) {
Document::SP doc(createDummyDocument("test", "uri"));
- document::BucketId bid(distributor_component().getBucketId(doc->getId()));
+ document::BucketId bid(operation_context().make_split_bit_constrained_bucket_id(doc->getId()));
BucketInfo entry = parseBucketInfoString(infoString);
@@ -519,7 +519,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
setupDistributor(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
Document::SP doc(createDummyDocument("test", "uri"));
- document::BucketId bId = distributor_component().getBucketId(doc->getId());
+ document::BucketId bId = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
addNodesToBucketDB(bId, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
@@ -536,14 +536,14 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
ASSERT_EQ("BucketId(0x4000000000000593) : "
"node(idx=0,crc=0x7,docs=8/8,bytes=9/9,trusted=true,active=false,ready=false)",
- dumpBucket(distributor_component().getBucketId(doc->getId())));
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "uri");
- auto bucket = distributor_component().getBucketId(doc->getId());
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
sendPut(createPut(doc));
@@ -577,7 +577,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setupDistributor(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
- auto bucket = distributor_component().getBucketId(doc->getId());
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
getBucketDBUpdater().onSetSystemState(
std::make_shared<api::SetSystemStateCommand>(
@@ -597,7 +597,7 @@ TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
"distributor:1 storage:2 .0.s:r .1.s:r");
Document::SP doc(createDummyDocument("test", "uri"));
document::BucketId bucket(
- distributor_component().getBucketId(doc->getId()));
+ operation_context().make_split_bit_constrained_bucket_id(doc->getId()));
addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t");
sendPut(createPut(doc));
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index de76379e854..c4892f342e7 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -24,7 +24,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil {
createLinks();
docId = document::DocumentId("id:test:test::uri");
- bucketId = distributor_component().getBucketId(docId);
+ bucketId = operation_context().make_split_bit_constrained_bucket_id(docId);
enableDistributorClusterState("distributor:1 storage:4");
};
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 58556832f2d..dae94e41b46 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -309,7 +309,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
}
update->setCreateIfNonExistent(options._createIfNonExistent);
- document::BucketId id = distributor_component().getBucketId(update->getId());
+ document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId());
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index ea9f0d86ac4..6620cf58571 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -60,7 +60,7 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m
document::DocumentId("id:ns:" + _html_type->getName() + "::1"));
update->setCreateIfNonExistent(create_if_missing);
- _bId = distributor_component().getBucketId(update->getId());
+ _bId = operation_context().make_split_bit_constrained_bucket_id(update->getId());
addNodesToBucketDB(_bId, bucketState);
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index 5e9f717fc7f..4aab82ee6f8 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -41,12 +41,6 @@ DistributorStripeComponent::sendUp(const api::StorageMessage::SP& msg)
_distributor.getMessageSender().sendUp(msg);
}
-const lib::ClusterStateBundle&
-DistributorStripeComponent::getClusterStateBundle() const
-{
- return _distributor.getClusterStateBundle();
-}
-
bool
DistributorStripeComponent::checkDistribution(api::StorageCommand &cmd, const document::Bucket &bucket)
{
@@ -178,10 +172,10 @@ UpdateBucketDatabaseProcessor::process_entry(BucketDatabase::Entry &entry) const
}
void
-DistributorStripeComponent::updateBucketDatabase(
- const document::Bucket &bucket,
- const std::vector<BucketCopy>& changedNodes,
- uint32_t updateFlags)
+DistributorStripeComponent::update_bucket_database(
+ const document::Bucket& bucket,
+ const std::vector<BucketCopy>& changed_nodes,
+ uint32_t update_flags)
{
auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
assert(!(bucket.getBucketId() == document::BucketId()));
@@ -200,7 +194,7 @@ DistributorStripeComponent::updateBucketDatabase(
// bucket database (i.e. copies on nodes that are actually unavailable).
const auto& available_nodes = bucketSpace.get_available_nodes();
bool found_down_node = false;
- for (const auto& copy : changedNodes) {
+ for (const auto& copy : changed_nodes) {
if (copy.getNode() >= available_nodes.size() || !available_nodes[copy.getNode()]) {
found_down_node = true;
break;
@@ -210,18 +204,18 @@ DistributorStripeComponent::updateBucketDatabase(
// bucket copy vector
std::vector<BucketCopy> up_nodes;
if (found_down_node) {
- up_nodes.reserve(changedNodes.size());
- for (uint32_t i = 0; i < changedNodes.size(); ++i) {
- const BucketCopy& copy(changedNodes[i]);
+ up_nodes.reserve(changed_nodes.size());
+ for (uint32_t i = 0; i < changed_nodes.size(); ++i) {
+ const BucketCopy& copy(changed_nodes[i]);
if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) {
up_nodes.emplace_back(copy);
}
}
}
- UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changedNodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (updateFlags & DatabaseUpdate::RESET_TRUSTED) != 0);
+ UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changed_nodes, bucketSpace.get_ideal_service_layer_nodes_bundle(bucket.getBucketId()).get_available_nodes(), (update_flags & DatabaseUpdate::RESET_TRUSTED) != 0);
- bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0);
+ bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (update_flags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0);
}
void
@@ -231,24 +225,6 @@ DistributorStripeComponent::recheckBucketInfo(uint16_t nodeIdx, const document::
}
document::BucketId
-DistributorStripeComponent::getBucketId(const document::DocumentId& docId) const
-{
- document::BucketId id(getBucketIdFactory().getBucketId(docId));
-
- id.setUsedBits(_distributor.getConfig().getMinimalBucketSplit());
- return id.stripUnused();
-}
-
-bool
-DistributorStripeComponent::storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const
-{
- const lib::NodeState& ns = getClusterStateBundle().getDerivedClusterState(bucketSpace)->getNodeState(
- lib::Node(lib::NodeType::STORAGE, nodeIndex));
-
- return ns.getState().oneOf(_distributor.getStorageNodeUpStates());
-}
-
-document::BucketId
DistributorStripeComponent::getSibling(const document::BucketId& bid) const {
document::BucketId zeroBucket;
document::BucketId oneBucket;
@@ -293,6 +269,15 @@ DistributorStripeComponent::node_address(uint16_t node_index) const noexcept
// Implements DistributorOperationContext
+document::BucketId
+DistributorStripeComponent::make_split_bit_constrained_bucket_id(const document::DocumentId& doc_id) const
+{
+ document::BucketId id(getBucketIdFactory().getBucketId(doc_id));
+
+ id.setUsedBits(_distributor.getConfig().getMinimalBucketSplit());
+ return id.stripUnused();
+}
+
bool
DistributorStripeComponent::has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
@@ -302,6 +287,21 @@ DistributorStripeComponent::has_pending_message(uint16_t node_index,
return sender.getPendingMessageTracker().hasPendingMessage(node_index, bucket, message_type);
}
+const lib::ClusterStateBundle&
+DistributorStripeComponent::cluster_state_bundle() const
+{
+ return _distributor.getClusterStateBundle();
+}
+
+bool
+DistributorStripeComponent::storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const
+{
+ const lib::NodeState& ns = cluster_state_bundle().getDerivedClusterState(bucket_space)->getNodeState(
+ lib::Node(lib::NodeType::STORAGE, node_index));
+
+ return ns.getState().oneOf(_distributor.getStorageNodeUpStates());
+}
+
std::unique_ptr<document::select::Node>
DistributorStripeComponent::parse_selection(const vespalib::string& selection) const
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
index 892d3c8cb62..ae54eacf9fd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
@@ -43,17 +43,6 @@ public:
~DistributorStripeComponent() override;
/**
- * Returns a reference to the current cluster state bundle. Valid until the
- * next time the distributor main thread processes its message queue.
- */
- const lib::ClusterStateBundle& getClusterStateBundle() const;
-
- /**
- * Returns true if the given storage node is in an "up state".
- */
- bool storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const;
-
- /**
* Verifies that the given command has been received at the
* correct distributor based on the current system state.
*/
@@ -68,46 +57,11 @@ public:
const std::vector<uint16_t>& nodes);
/**
- * Removes a copy from the given bucket from the bucket database.
- * If the resulting bucket is empty afterwards, removes the entire
- * bucket entry from the bucket database.
- */
- void removeNodeFromDB(const document::Bucket &bucket, uint16_t node) {
- removeNodesFromDB(bucket, toVector<uint16_t>(node));
- }
-
- /**
- * Adds the given copies to the bucket database.
- */
- void updateBucketDatabase(
- const document::Bucket &bucket,
- const std::vector<BucketCopy>& changedNodes,
- uint32_t updateFlags = 0);
-
- /**
- * Simple API for the common case of modifying a single node.
- */
- void updateBucketDatabase(
- const document::Bucket &bucket,
- const BucketCopy& changedNode,
- uint32_t updateFlags = 0)
- {
- updateBucketDatabase(bucket,
- toVector<BucketCopy>(changedNode),
- updateFlags);
- }
-
- /**
* Fetch bucket info about the given bucket from the given node.
* Used when we get BUCKET_NOT_FOUND.
*/
void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket);
- /**
- * Returns the bucket id corresponding to the given document id.
- */
- document::BucketId getBucketId(const document::DocumentId& docId) const;
-
void sendDown(const api::StorageMessage::SP&);
void sendUp(const api::StorageMessage::SP&);
@@ -148,34 +102,46 @@ public:
// Implements DistributorOperationContext
api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); }
+
+ /**
+ * Simple API for the common case of modifying a single node.
+ */
void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
uint32_t update_flags = 0) override {
- updateBucketDatabase(bucket, changed_node, update_flags);
+ update_bucket_database(bucket,
+ toVector<BucketCopy>(changed_node),
+ update_flags);
}
+
+ /**
+ * Adds the given copies to the bucket database.
+ */
virtual void update_bucket_database(const document::Bucket& bucket,
const std::vector<BucketCopy>& changed_nodes,
- uint32_t update_flags = 0) override {
- updateBucketDatabase(bucket, changed_nodes, update_flags);
- }
+ uint32_t update_flags = 0) override;
+
+ /**
+ * Removes a copy from the given bucket from the bucket database.
+ * If the resulting bucket is empty afterwards, removes the entire
+ * bucket entry from the bucket database.
+ */
void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) override {
- removeNodeFromDB(bucket, node_index);
+ removeNodesFromDB(bucket, toVector<uint16_t>(node_index));
}
const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
- return getBucketSpaceRepo();
+ return _bucketSpaceRepo;
}
DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
- return getBucketSpaceRepo();
+ return _bucketSpaceRepo;
}
const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
- return getReadOnlyBucketSpaceRepo();
+ return _readOnlyBucketSpaceRepo;
}
DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
- return getReadOnlyBucketSpaceRepo();
- }
- document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& docId) const override {
- return getBucketId(docId);
+ return _readOnlyBucketSpaceRepo;
}
+ document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId& doc_id) const override;
const DistributorConfiguration& distributor_config() const noexcept override {
return getDistributor().getConfig();
}
@@ -196,12 +162,18 @@ public:
const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override {
return getDistributor().pendingClusterStateOrNull(bucket_space);
}
- const lib::ClusterStateBundle& cluster_state_bundle() const override {
- return getClusterStateBundle();
- }
- bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override {
- return storageNodeIsUp(bucket_space, node_index);
- }
+
+ /**
+ * Returns a reference to the current cluster state bundle. Valid until the
+ * next time the distributor main thread processes its message queue.
+ */
+ const lib::ClusterStateBundle& cluster_state_bundle() const override;
+
+ /**
+ * Returns true if the given storage node is in an "up state".
+ */
+ bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override;
+
const char* storage_node_up_states() const override {
return getDistributor().getStorageNodeUpStates();
}
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index a7ea1f9400c..84fef955feb 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -69,7 +69,7 @@ IdealStateManager::iAmUp() const
{
Node node(NodeType::DISTRIBUTOR, _distributorComponent.getIndex());
// Assume that derived cluster states agree on distributor node being up
- const auto &state = *_distributorComponent.getClusterStateBundle().getBaselineClusterState();
+ const auto &state = *operation_context().cluster_state_bundle().getBaselineClusterState();
const lib::State &nodeState = state.getNodeState(node).getState();
const lib::State &clusterState = state.getClusterState();
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 2665d7938ed..e5a38ab5161 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -77,7 +77,10 @@ public:
getBucketStatus(out);
}
+ // TODO STRIPE stop exposing this
DistributorStripeComponent& getDistributorComponent() { return _distributorComponent; }
+ DistributorOperationContext& operation_context() { return _distributorComponent; }
+ const DistributorOperationContext& operation_context() const { return _distributorComponent; }
DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _bucketSpaceRepo; }
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _bucketSpaceRepo; }
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 71f5693329a..c2f48a9c3d9 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -69,7 +69,7 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&,
void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
// TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info);
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (dbentry.valid()) {
dbentry->setLastGarbageCollectionTime(
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 44f7b5d3b49..4509e61746a 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -110,7 +110,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
rep.getSourceBuckets());
for (uint32_t i = 0; i < sourceBuckets.size(); i++) {
document::Bucket sourceBucket(msg->getBucket().getBucketSpace(), sourceBuckets[i]);
- _manager->getDistributorComponent().removeNodeFromDB(sourceBucket, node);
+ _manager->operation_context().remove_node_from_bucket_database(sourceBucket, node);
}
// Add new buckets.
@@ -118,7 +118,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
LOG(debug, "Invalid bucketinfo for bucket %s returned in join",
getBucketId().toString().c_str());
} else {
- _manager->getDistributorComponent().updateBucketDatabase(
+ _manager->operation_context().update_bucket_database(
getBucket(),
BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(),
node,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 3bd2406cd85..a8eee5caf49 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -81,7 +81,7 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply
vespalib::string(rep->getResult().getMessage()).c_str(),
rep->getBucketInfo().toString().c_str());
- _manager->getDistributorComponent().updateBucketDatabase(
+ _manager->operation_context().update_bucket_database(
getBucket(),
BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(),
node,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 2b3c80f9401..1e66cc174e0 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -101,7 +101,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
// Must reset trusted since otherwise trustedness of inconsistent
// copies would be arbitrarily determined by which copy managed
// to finish its split first.
- _manager->getDistributorComponent().updateBucketDatabase(
+ _manager->operation_context().update_bucket_database(
document::Bucket(msg->getBucket().getBucketSpace(), sinfo.first), copy,
(DatabaseUpdate::CREATE_IF_NONEXISTING
| DatabaseUpdate::RESET_TRUSTED));