aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-29 13:53:35 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-04-29 13:53:35 +0000
commit94fd404a04ce2fec36f184daa12aadacc28d645e (patch)
treecac5635fc75e481217d41d8e96045997bfa3ba87 /storage
parentb5ffe5a1927c93c0c3eccc66c76be87f6361841e (diff)
Split DistributorMessageSender into two parts.
DistributorStripeMessageSender is used for all stripe related operations.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h2
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.h6
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h10
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp46
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h38
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp22
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h22
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h10
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.h10
43 files changed, 176 insertions, 172 deletions
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index 3791839f3fe..59a5a82b7df 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -10,7 +10,7 @@
namespace storage {
-class DistributorMessageSenderStub : public distributor::DistributorMessageSender {
+class DistributorMessageSenderStub : public distributor::DistributorStripeMessageSender {
MessageSenderStub _stub_impl;
distributor::PendingMessageTracker* _pending_message_tracker;
distributor::OperationSequencer* _operation_sequencer;
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index 2bfb4ebb40f..fff798d4413 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -44,13 +44,13 @@ public:
return _bucket.toString();
}
- void onClose(DistributorMessageSender&) override {}
+ void onClose(DistributorStripeMessageSender&) override {}
const char* getName() const override { return "MockOperation"; }
const std::string& getDetailedReason() const override {
return _reason;
}
- void onStart(DistributorMessageSender&) override {}
- void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
+ void onStart(DistributorStripeMessageSender&) override {}
+ void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override {
return _shouldBlock;
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index aef126b8318..8302e0d9684 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -257,7 +257,7 @@ DistributorStripeComponent::has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
uint32_t message_type) const
{
- const auto& sender = static_cast<const DistributorMessageSender&>(getDistributor());
+ const auto& sender = static_cast<const DistributorStripeMessageSender&>(getDistributor());
return sender.getPendingMessageTracker().hasPendingMessage(node_index, bucket, message_type);
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index d83acfabffc..2f0b74c8a30 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -21,7 +21,7 @@ class PendingMessageTracker;
/**
* TODO STRIPE add class comment.
*/
-class DistributorStripeInterface : public DistributorMessageSender
+class DistributorStripeInterface : public DistributorStripeMessageSender
{
public:
virtual PendingMessageTracker& getPendingMessageTracker() = 0;
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h
index 54be92dc99a..c39e3e8fe8a 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.h
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.h
@@ -21,7 +21,11 @@ public:
const std::shared_ptr<api::StorageCommand>& cmd, bool useDocumentAPI = false);
virtual int getDistributorIndex() const = 0;
- virtual const ClusterContext & cluster_context() const = 0;
+ virtual const ClusterContext& cluster_context() const = 0;
+};
+
+class DistributorStripeMessageSender : public DistributorMessageSender {
+public:
virtual const PendingMessageTracker& getPendingMessageTracker() const = 0;
virtual const OperationSequencer& operation_sequencer() const noexcept = 0;
};
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 2acd04f7eef..e703c5bfdb8 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -31,7 +31,7 @@ LOG_SETUP(".distributor.manager");
namespace storage::distributor {
-class DirectDispatchSender : public DistributorMessageSender {
+class DirectDispatchSender : public DistributorStripeMessageSender {
DistributorNodeContext& _node_ctx;
NonTrackingMessageSender& _msg_sender;
public:
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index 56a5f28f2b6..d3f46343ebc 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -18,10 +18,10 @@ class Operation;
class OperationOwner : public OperationStarter {
public:
- class Sender : public DistributorMessageSender {
+ class Sender : public DistributorStripeMessageSender {
public:
Sender(OperationOwner& owner,
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
const std::shared_ptr<Operation>& cb)
: _owner(owner),
_sender(sender),
@@ -53,11 +53,11 @@ public:
private:
OperationOwner& _owner;
- DistributorMessageSender& _sender;
+ DistributorStripeMessageSender& _sender;
std::shared_ptr<Operation> _cb;
};
- OperationOwner(DistributorMessageSender& sender,
+ OperationOwner(DistributorStripeMessageSender& sender,
const framework::Clock& clock)
: _sender(sender),
_clock(clock) {
@@ -85,7 +85,7 @@ public:
*/
void erase(api::StorageMessage::Id msgId);
- [[nodiscard]] DistributorMessageSender& sender() noexcept { return _sender; }
+ [[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; }
void onClose();
uint32_t size() const { return _sentMessageMap.size(); }
@@ -93,7 +93,7 @@ public:
private:
SentMessageMap _sentMessageMap;
- DistributorMessageSender& _sender;
+ DistributorStripeMessageSender& _sender;
const framework::Clock& _clock;
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 61bdcd4444d..94bddc4d0c4 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -68,7 +68,7 @@ GetOperation::GetOperation(DistributorNodeContext& node_ctx,
}
void
-GetOperation::onClose(DistributorMessageSender& sender)
+GetOperation::onClose(DistributorStripeMessageSender& sender)
{
_returnCode = api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down");
sendReply(sender);
@@ -99,7 +99,7 @@ GetOperation::findBestUnsentTarget(const GroupVector& candidates) const
}
bool
-GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res)
+GetOperation::sendForChecksum(DistributorStripeMessageSender& sender, const document::BucketId& id, GroupVector& res)
{
const int best = findBestUnsentTarget(res);
@@ -122,7 +122,7 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::
}
void
-GetOperation::onStart(DistributorMessageSender& sender)
+GetOperation::onStart(DistributorStripeMessageSender& sender)
{
// Send one request for each unique group (BucketId/checksum)
bool sent = false;
@@ -138,7 +138,7 @@ GetOperation::onStart(DistributorMessageSender& sender)
};
void
-GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
+GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
auto* getreply = dynamic_cast<api::GetReply*>(msg.get());
assert(getreply != nullptr);
@@ -225,7 +225,7 @@ void GetOperation::update_internal_metrics() {
}
void
-GetOperation::sendReply(DistributorMessageSender& sender)
+GetOperation::sendReply(DistributorStripeMessageSender& sender)
{
if (_msg.get()) {
const auto newest = _newest_replica.value_or(NewestReplica::make_empty());
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index cc6fd6680e0..18309643754 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -33,9 +33,9 @@ public:
PersistenceOperationMetricSet& metric,
api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong);
- void onClose(DistributorMessageSender& sender) override;
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
const char* getName() const override { return "get"; }
std::string getStatus() const override { return ""; }
@@ -114,8 +114,8 @@ private:
bool _has_replica_inconsistency;
bool _any_replicas_failed;
- void sendReply(DistributorMessageSender& sender);
- bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
+ void sendReply(DistributorStripeMessageSender& sender);
+ bool sendForChecksum(DistributorStripeMessageSender& sender, const document::BucketId& id, GroupVector& res);
void assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard);
bool copyIsOnLocalNode(const BucketCopy&) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index de36bb60b2c..1bf81278918 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -163,7 +163,7 @@ bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTarge
}
void
-PutOperation::onStart(DistributorMessageSender& sender)
+PutOperation::onStart(DistributorStripeMessageSender& sender)
{
document::BucketIdFactory bucketIdFactory;
document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId());
@@ -268,14 +268,14 @@ PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets
}
void
-PutOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
+PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
{
LOG(debug, "Received %s", msg->toString(true).c_str());
_tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg));
}
void
-PutOperation::onClose(DistributorMessageSender& sender)
+PutOperation::onClose(DistributorStripeMessageSender& sender)
{
const char* error = "Process is shutting down";
LOG(debug, "%s", error);
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index c9cfc08d63d..fa793e578d2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -30,11 +30,11 @@ public:
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "put"; };
std::string getStatus() const override { return ""; };
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
- void onClose(DistributorMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
static void getTargetNodes(const std::vector<uint16_t>& idealNodes, std::vector<uint16_t>& targetNodes,
std::vector<uint16_t>& createNodes, const BucketInfo& bucketInfo, uint32_t redundancy);
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
index 04e64703c19..1d6b0fed6f9 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp
@@ -29,11 +29,11 @@ ReadForWriteVisitorOperationStarter::ReadForWriteVisitorOperationStarter(
ReadForWriteVisitorOperationStarter::~ReadForWriteVisitorOperationStarter() = default;
-void ReadForWriteVisitorOperationStarter::onClose(DistributorMessageSender& sender) {
+void ReadForWriteVisitorOperationStarter::onClose(DistributorStripeMessageSender& sender) {
_visitor_op->onClose(sender);
}
-void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& sender) {
+void ReadForWriteVisitorOperationStarter::onStart(DistributorStripeMessageSender& sender) {
if (_visitor_op->verify_command_and_expand_buckets(sender)) {
assert(!_visitor_op->has_sent_reply());
auto maybe_bucket = _visitor_op->first_bucket_to_visit();
@@ -78,7 +78,7 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send
}
}
-void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& sender,
+void ReadForWriteVisitorOperationStarter::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) {
_visitor_op->onReceive(sender, msg);
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
index e9391f9f133..28474bd52f1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h
@@ -42,9 +42,9 @@ public:
~ReadForWriteVisitorOperationStarter() override;
const char* getName() const override { return "ReadForWriteVisitorOperationStarter"; }
- void onClose(DistributorMessageSender& sender) override;
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender,
+ void onClose(DistributorStripeMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) override;
private:
bool bucket_has_pending_merge(const document::Bucket&, const PendingMessageTracker& tracker) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index f8609dedde4..26aca41eaa2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -60,7 +60,7 @@ RemoveLocationOperation::getBucketId(
}
void
-RemoveLocationOperation::onStart(DistributorMessageSender& sender)
+RemoveLocationOperation::onStart(DistributorStripeMessageSender& sender)
{
document::BucketId bid;
int count = getBucketId(_node_ctx, _parser, *_msg, bid);
@@ -108,14 +108,14 @@ RemoveLocationOperation::onStart(DistributorMessageSender& sender)
void
RemoveLocationOperation::onReceive(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
_tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg));
}
void
-RemoveLocationOperation::onClose(DistributorMessageSender& sender)
+RemoveLocationOperation::onClose(DistributorStripeMessageSender& sender)
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED,
"Process is shutting down"));
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
index bf09a95933f..a635f7b697c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
@@ -28,11 +28,11 @@ public:
DocumentSelectionParser& parser,
const api::RemoveLocationCommand& cmd,
document::BucketId& id);
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "removelocation"; };
std::string getStatus() const override { return ""; };
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
- void onClose(DistributorMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
private:
PersistenceMessageTrackerImpl _trackerInstance;
PersistenceMessageTracker& _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 25c73d88e37..07f37680ad2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -32,7 +32,7 @@ RemoveOperation::RemoveOperation(DistributorNodeContext& node_ctx,
RemoveOperation::~RemoveOperation() = default;
void
-RemoveOperation::onStart(DistributorMessageSender& sender)
+RemoveOperation::onStart(DistributorStripeMessageSender& sender)
{
LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str());
@@ -79,7 +79,7 @@ RemoveOperation::onStart(DistributorMessageSender& sender)
void
-RemoveOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
+RemoveOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
{
api::RemoveReply& reply(static_cast<api::RemoveReply&>(*msg));
@@ -96,7 +96,7 @@ RemoveOperation::onReceive(DistributorMessageSender& sender, const std::shared_p
}
void
-RemoveOperation::onClose(DistributorMessageSender& sender)
+RemoveOperation::onClose(DistributorStripeMessageSender& sender)
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 32eb5bd3d70..2d3bea157fa 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -23,12 +23,12 @@ public:
SequencingHandle sequencingHandle = SequencingHandle());
~RemoveOperation() override;
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "remove"; };
std::string getStatus() const override { return ""; };
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
- void onClose(DistributorMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
private:
PersistenceMessageTrackerImpl _trackerInstance;
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
index 4b7cff41ad1..12d1cc3f216 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
@@ -40,7 +40,7 @@ StatBucketListOperation::getBucketStatus(const BucketDatabase::Entry& entry,
}
void
-StatBucketListOperation::onStart(DistributorMessageSender& sender)
+StatBucketListOperation::onStart(DistributorStripeMessageSender& sender)
{
api::GetBucketListReply::SP reply(new api::GetBucketListReply(*_command));
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.h b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.h
index aa38a0d2319..831ebe0f9ce 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.h
@@ -26,13 +26,13 @@ public:
const char* getName() const override { return "statBucketList"; }
std::string getStatus() const override { return ""; }
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override
{
// Never called.
HDR_ABORT("should not be reached");
}
- void onClose(DistributorMessageSender&) override {}
+ void onClose(DistributorStripeMessageSender&) override {}
private:
void getBucketStatus(const BucketDatabase::Entry& entry, std::ostream& os) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
index d0fdd539b72..9c97f12b89a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
@@ -22,7 +22,7 @@ StatBucketOperation::StatBucketOperation(
StatBucketOperation::~StatBucketOperation() = default;
void
-StatBucketOperation::onClose(DistributorMessageSender& sender)
+StatBucketOperation::onClose(DistributorStripeMessageSender& sender)
{
api::StatBucketReply* rep = (api::StatBucketReply*)_command->makeReply().release();
rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
@@ -30,7 +30,7 @@ StatBucketOperation::onClose(DistributorMessageSender& sender)
}
void
-StatBucketOperation::onStart(DistributorMessageSender& sender)
+StatBucketOperation::onStart(DistributorStripeMessageSender& sender)
{
std::vector<uint16_t> nodes;
@@ -68,7 +68,7 @@ StatBucketOperation::onStart(DistributorMessageSender& sender)
};
void
-StatBucketOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
+StatBucketOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
{
assert(msg->getType() == api::MessageType::STATBUCKET_REPLY);
api::StatBucketReply& myreply(dynamic_cast<api::StatBucketReply&>(*msg));
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
index beb9e9c3445..d0c299d88bc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
@@ -25,9 +25,9 @@ public:
const char* getName() const override { return "statBucket"; }
std::string getStatus() const override { return ""; }
- void onClose(DistributorMessageSender& sender) override;
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
private:
DistributorBucketSpace &_bucketSpace;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 1f8da7a0589..f0c15935b81 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -60,13 +60,13 @@ TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default;
namespace {
-struct IntermediateMessageSender : DistributorMessageSender {
+struct IntermediateMessageSender : DistributorStripeMessageSender {
SentMessageMap& msgMap;
std::shared_ptr<Operation> callback;
- DistributorMessageSender& forward;
+ DistributorStripeMessageSender& forward;
std::shared_ptr<api::StorageReply> _reply;
- IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorMessageSender & fwd);
+ IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorStripeMessageSender & fwd);
~IntermediateMessageSender() override;
void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
@@ -97,7 +97,7 @@ struct IntermediateMessageSender : DistributorMessageSender {
IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
std::shared_ptr<Operation> cb,
- DistributorMessageSender & fwd)
+ DistributorStripeMessageSender & fwd)
: msgMap(mm),
callback(std::move(cb)),
forward(fwd)
@@ -141,7 +141,7 @@ TwoPhaseUpdateOperation::ensureUpdateReplyCreated()
void
TwoPhaseUpdateOperation::sendReply(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
std::shared_ptr<api::StorageReply>& reply)
{
assert(!_replySent);
@@ -152,7 +152,7 @@ TwoPhaseUpdateOperation::sendReply(
void
TwoPhaseUpdateOperation::sendReplyWithResult(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
const api::ReturnCode& result)
{
ensureUpdateReplyCreated();
@@ -179,7 +179,7 @@ TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::En
}
void
-TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender, std::vector<BucketDatabase::Entry> entries)
+TwoPhaseUpdateOperation::startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries)
{
_mode = Mode::FAST_PATH;
LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str());
@@ -196,7 +196,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender, s
}
void
-TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
+TwoPhaseUpdateOperation::startSafePathUpdate(DistributorStripeMessageSender& sender)
{
if (_op_ctx.cluster_state_bundle().block_feed_in_cluster()) {
send_feed_blocked_error_reply(sender);
@@ -248,7 +248,7 @@ TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() {
}
void
-TwoPhaseUpdateOperation::onStart(DistributorMessageSender& sender) {
+TwoPhaseUpdateOperation::onStart(DistributorStripeMessageSender& sender) {
auto entries = get_bucket_database_entries();
if (isFastPathPossible(entries)) {
startFastPathUpdate(sender, std::move(entries));
@@ -274,7 +274,7 @@ TwoPhaseUpdateOperation::lostBucketOwnershipBetweenPhases() const
}
void
-TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorMessageSender& sender)
+TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender& sender)
{
sendReplyWithResult(sender,
api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
@@ -284,7 +284,7 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorMessage
}
void
-TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorMessageSender& sender)
+TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender)
{
sendReplyWithResult(sender,
api::ReturnCode(api::ReturnCode::NO_SPACE,
@@ -294,7 +294,7 @@ TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorMessageSender&
void
TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc,
- api::Timestamp putTimestamp, DistributorMessageSender& sender)
+ api::Timestamp putTimestamp, DistributorStripeMessageSender& sender)
{
if (lostBucketOwnershipBetweenPhases()) {
sendLostOwnershipTransientErrorReply(sender);
@@ -318,7 +318,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
}
void
-TwoPhaseUpdateOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
+TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
if (_mode == Mode::FAST_PATH) {
handleFastPathReceive(sender, msg);
@@ -328,7 +328,7 @@ TwoPhaseUpdateOperation::onReceive(DistributorMessageSender& sender, const std::
}
void
-TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
+TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
if (msg->getType() == api::MessageType::GET_REPLY) {
@@ -396,7 +396,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
}
void
-TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
+TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
// No explicit operation is associated with the direct replica Get operation,
@@ -434,7 +434,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
}
void TwoPhaseUpdateOperation::handle_safe_path_received_single_full_get(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
api::GetReply& reply)
{
LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str());
@@ -453,7 +453,7 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_single_full_get(
}
void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
- DistributorMessageSender& sender, api::GetReply& reply,
+ DistributorStripeMessageSender& sender, api::GetReply& reply,
const std::optional<NewestReplica>& newest_replica,
bool any_replicas_failed)
{
@@ -511,7 +511,7 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
}
void
-TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply)
+TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSender& sender, api::GetReply& reply)
{
LOG(debug, "Update(%s): got Get reply with code %s",
_updateCmd->getDocumentId().toString().c_str(),
@@ -585,7 +585,7 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
return (replicas_in_db_now == _replicas_at_get_send_time);
}
-void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
+void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
update_doc_id().c_str());
if (lostBucketOwnershipBetweenPhases()) {
@@ -600,7 +600,7 @@ void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_times
}
bool
-TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender,
+TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSender& sender,
const document::Document& candidateDoc)
{
if (!hasTasCondition()) {
@@ -631,7 +631,7 @@ TwoPhaseUpdateOperation::hasTasCondition() const noexcept
}
void
-TwoPhaseUpdateOperation::replyWithTasFailure(DistributorMessageSender& sender, vespalib::stringref message)
+TwoPhaseUpdateOperation::replyWithTasFailure(DistributorStripeMessageSender& sender, vespalib::stringref message)
{
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, message));
}
@@ -651,7 +651,7 @@ TwoPhaseUpdateOperation::createBlankDocument() const
}
void
-TwoPhaseUpdateOperation::handleSafePathReceivedPut(DistributorMessageSender& sender, const api::PutReply& reply)
+TwoPhaseUpdateOperation::handleSafePathReceivedPut(DistributorStripeMessageSender& sender, const api::PutReply& reply)
{
sendReplyWithResult(sender, reply.getResult());
}
@@ -681,7 +681,7 @@ TwoPhaseUpdateOperation::addTraceFromReply(api::StorageReply & reply)
}
void
-TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
+TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) {
while (true) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 7817eb7bffd..32b15cc7edf 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -64,16 +64,16 @@ public:
SequencingHandle sequencingHandle = SequencingHandle());
~TwoPhaseUpdateOperation() override;
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "twophaseupdate"; }
std::string getStatus() const override { return ""; }
- void onReceive(DistributorMessageSender&,
+ void onReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&) override;
- void onClose(DistributorMessageSender& sender) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
private:
enum class SendState {
@@ -93,49 +93,49 @@ private:
void transitionTo(SendState newState);
static const char* stateToString(SendState);
- void sendReply(DistributorMessageSender&,
+ void sendReply(DistributorStripeMessageSender&,
std::shared_ptr<api::StorageReply>&);
- void sendReplyWithResult(DistributorMessageSender&, const api::ReturnCode&);
+ void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&);
void ensureUpdateReplyCreated();
std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const;
- void startFastPathUpdate(DistributorMessageSender& sender, std::vector<BucketDatabase::Entry> entries);
- void startSafePathUpdate(DistributorMessageSender&);
+ void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries);
+ void startSafePathUpdate(DistributorStripeMessageSender&);
bool lostBucketOwnershipBetweenPhases() const;
- void sendLostOwnershipTransientErrorReply(DistributorMessageSender&);
- void send_feed_blocked_error_reply(DistributorMessageSender& sender);
+ void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&);
+ void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender);
void schedulePutsWithUpdatedDocument(
std::shared_ptr<document::Document>,
api::Timestamp,
- DistributorMessageSender&);
+ DistributorStripeMessageSender&);
void applyUpdateToDocument(document::Document&) const;
std::shared_ptr<document::Document> createBlankDocument() const;
void setUpdatedForTimestamp(api::Timestamp);
- void handleFastPathReceive(DistributorMessageSender&,
+ void handleFastPathReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&);
- void handleSafePathReceive(DistributorMessageSender&,
+ void handleSafePathReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&);
std::shared_ptr<GetOperation> create_initial_safe_path_get_operation();
- void handle_safe_path_received_metadata_get(DistributorMessageSender&,
+ void handle_safe_path_received_metadata_get(DistributorStripeMessageSender&,
api::GetReply&,
const std::optional<NewestReplica>&,
bool any_replicas_failed);
- void handle_safe_path_received_single_full_get(DistributorMessageSender&, api::GetReply&);
- void handleSafePathReceivedGet(DistributorMessageSender&, api::GetReply&);
- void handleSafePathReceivedPut(DistributorMessageSender&, const api::PutReply&);
+ void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&);
+ void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&);
+ void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&);
bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
const document::Document& candidateDoc);
bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
void addTraceFromReply(api::StorageReply& reply);
bool hasTasCondition() const noexcept;
- void replyWithTasFailure(DistributorMessageSender& sender,
+ void replyWithTasFailure(DistributorStripeMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
bool replica_set_unchanged_after_get_operation() const;
- void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
+ void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender);
// Precondition: reply has not yet been sent.
vespalib::string update_doc_id() const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 4530d7b2864..69baf9df452 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -60,7 +60,7 @@ UpdateOperation::anyStorageNodesAvailable() const
}
void
-UpdateOperation::onStart(DistributorMessageSender& sender)
+UpdateOperation::onStart(DistributorStripeMessageSender& sender)
{
LOG(debug, "Received UPDATE %s for bucket %" PRIx64,
_msg->getDocumentId().toString().c_str(),
@@ -123,7 +123,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender)
};
void
-UpdateOperation::onReceive(DistributorMessageSender& sender,
+UpdateOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
auto& reply = static_cast<api::UpdateReply&>(*msg);
@@ -186,7 +186,7 @@ UpdateOperation::onReceive(DistributorMessageSender& sender,
}
void
-UpdateOperation::onClose(DistributorMessageSender& sender)
+UpdateOperation::onClose(DistributorStripeMessageSender& sender)
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 4e875b27133..7da6f48c333 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -30,11 +30,11 @@ public:
std::vector<BucketDatabase::Entry> entries,
UpdateMetricSet& metric);
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
const char* getName() const override { return "update"; };
std::string getStatus() const override { return ""; };
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
- void onClose(DistributorMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
+ void onClose(DistributorStripeMessageSender& sender) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index b4ad16f3323..06f43b8759e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -190,7 +190,7 @@ VisitorOperation::markOperationAsFailedDueToNodeError(
void
VisitorOperation::onReceive(
- DistributorMessageSender& sender,
+ DistributorStripeMessageSender& sender,
const api::StorageReply::SP& r)
{
api::CreateVisitorReply& reply = static_cast<api::CreateVisitorReply&>(*r);
@@ -348,7 +348,7 @@ VisitorOperation::verifyOperationSentToCorrectDistributor()
}
bool
-VisitorOperation::verifyCreateVisitorCommand(DistributorMessageSender& sender)
+VisitorOperation::verifyCreateVisitorCommand(DistributorStripeMessageSender& sender)
{
try {
verifyOperationContainsBuckets();
@@ -587,7 +587,7 @@ VisitorOperation::pickTargetNode(
}
void
-VisitorOperation::onStart(DistributorMessageSender& sender)
+VisitorOperation::onStart(DistributorStripeMessageSender& sender)
{
if (!_verified_and_expanded) {
if (!verify_command_and_expand_buckets(sender)) {
@@ -598,7 +598,7 @@ VisitorOperation::onStart(DistributorMessageSender& sender)
}
bool
-VisitorOperation::verify_command_and_expand_buckets(DistributorMessageSender& sender)
+VisitorOperation::verify_command_and_expand_buckets(DistributorStripeMessageSender& sender)
{
assert(!_verified_and_expanded);
_verified_and_expanded = true;
@@ -636,7 +636,7 @@ VisitorOperation::maySendNewStorageVisitors() const noexcept
}
void
-VisitorOperation::startNewVisitors(DistributorMessageSender& sender)
+VisitorOperation::startNewVisitors(DistributorStripeMessageSender& sender)
{
LOG(spam,
"Starting new visitors: Superbucket: %s, last subbucket: %s",
@@ -764,7 +764,7 @@ VisitorOperation::getNumVisitorsToSendForNode(uint16_t node, uint32_t totalBucke
bool
VisitorOperation::sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
- DistributorMessageSender& sender)
+ DistributorStripeMessageSender& sender)
{
bool visitorsSent = false;
for (const auto & entry : nodeToBucketsMap ) {
@@ -800,7 +800,7 @@ void
VisitorOperation::sendStorageVisitor(uint16_t node,
const std::vector<document::BucketId>& buckets,
uint32_t pending,
- DistributorMessageSender& sender)
+ DistributorStripeMessageSender& sender)
{
// TODO rewrite to not use copy ctor and remove wonky StorageCommand copy ctor impl
auto cmd = std::make_shared<api::CreateVisitorCommand>(*_msg);
@@ -839,7 +839,7 @@ VisitorOperation::sendStorageVisitor(uint16_t node,
}
void
-VisitorOperation::sendReply(const api::ReturnCode& code, DistributorMessageSender& sender)
+VisitorOperation::sendReply(const api::ReturnCode& code, DistributorStripeMessageSender& sender)
{
if (!_sentReply) {
// Send create visitor reply
@@ -880,20 +880,20 @@ VisitorOperation::updateReplyMetrics(const api::ReturnCode& result)
}
void
-VisitorOperation::onClose(DistributorMessageSender& sender)
+VisitorOperation::onClose(DistributorStripeMessageSender& sender)
{
sendReply(api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"), sender);
}
void
-VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& sender)
+VisitorOperation::fail_with_bucket_already_locked(DistributorStripeMessageSender& sender)
{
assert(is_read_for_write());
sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender);
}
void
-VisitorOperation::fail_with_merge_pending(DistributorMessageSender& sender)
+VisitorOperation::fail_with_merge_pending(DistributorStripeMessageSender& sender)
{
assert(is_read_for_write());
sendReply(api::ReturnCode(api::ReturnCode::BUSY, "A merge operation is pending for this bucket"), sender);
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index 794436a28e6..6fdee031549 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -44,16 +44,16 @@ public:
~VisitorOperation() override;
- void onClose(DistributorMessageSender& sender) override;
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender,
+ void onClose(DistributorStripeMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) override;
// Only valid to call if is_read_for_write() == true
- void fail_with_bucket_already_locked(DistributorMessageSender& sender);
- void fail_with_merge_pending(DistributorMessageSender& sender);
+ void fail_with_bucket_already_locked(DistributorStripeMessageSender& sender);
+ void fail_with_merge_pending(DistributorStripeMessageSender& sender);
- [[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender);
+ [[nodiscard]] bool verify_command_and_expand_buckets(DistributorStripeMessageSender& sender);
const char* getName() const override { return "visit"; }
std::string getStatus() const override { return ""; }
@@ -97,7 +97,7 @@ private:
using NodeToBucketsMap = std::map<uint16_t, std::vector<document::BucketId>>;
using SentMessagesMap = std::map<uint64_t, api::CreateVisitorCommand::SP>;
- void sendReply(const api::ReturnCode& code, DistributorMessageSender& sender);
+ void sendReply(const api::ReturnCode& code, DistributorStripeMessageSender& sender);
void updateReplyMetrics(const api::ReturnCode& result);
void verifyDistributorsAreAvailable();
void verifyVisitorDistributionBitCount(const document::BucketId&);
@@ -106,7 +106,7 @@ private:
void verifyOperationContainsBuckets();
void verifyOperationHasSuperbucketAndProgress();
void verifyOperationSentToCorrectDistributor();
- bool verifyCreateVisitorCommand(DistributorMessageSender& sender);
+ bool verifyCreateVisitorCommand(DistributorStripeMessageSender& sender);
bool pickBucketsToVisit(const std::vector<BucketDatabase::Entry>& buckets);
bool expandBucketContaining();
bool expandBucketContained();
@@ -115,7 +115,7 @@ private:
const BucketDatabase::Entry& entry,
const std::vector<uint16_t>& triedNodes);
bool maySendNewStorageVisitors() const noexcept;
- void startNewVisitors(DistributorMessageSender& sender);
+ void startNewVisitors(DistributorStripeMessageSender& sender);
void initializeActiveNodes();
bool shouldSkipBucket(const BucketInfo& bucketInfo) const;
bool bucketIsValidAndConsistent(const BucketDatabase::Entry& entry) const;
@@ -125,11 +125,11 @@ private:
int getNumVisitorsToSendForNode(uint16_t node, uint32_t totalBucketsOnNode) const;
vespalib::duration computeVisitorQueueTimeoutMs() const noexcept;
bool sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
- DistributorMessageSender& sender);
+ DistributorStripeMessageSender& sender);
void sendStorageVisitor(uint16_t node,
const std::vector<document::BucketId>& buckets,
uint32_t pending,
- DistributorMessageSender& sender);
+ DistributorStripeMessageSender& sender);
void markCompleted(const document::BucketId& bid, const api::ReturnCode& code);
/**
* Operation failed and we can pin the blame on a specific node. Updates
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index c9e983d4284..f611968b481 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -21,7 +21,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
GarbageCollectionOperation::~GarbageCollectionOperation() = default;
-void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) {
+void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
@@ -42,7 +42,7 @@ void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) {
}
void
-GarbageCollectionOperation::onReceive(DistributorMessageSender&,
+GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>& reply)
{
auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 545dd10b539..01ef607912c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -17,8 +17,8 @@ public:
const BucketAndNodes& nodes);
~GarbageCollectionOperation() override;
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index dcdc2f32374..7906150d0cb 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -114,7 +114,7 @@ public:
virtual ~IdealStateOperation();
- void onClose(DistributorMessageSender&) override {}
+ void onClose(DistributorStripeMessageSender&) override {}
/**
Returns true if the operation was performed successfully.
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index c2c43f86c42..d9e411bc44e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -21,7 +21,7 @@ JoinOperation::JoinOperation(const ClusterContext &clusterName,
JoinOperation::~JoinOperation() = default;
void
-JoinOperation::onStart(DistributorMessageSender& sender)
+JoinOperation::onStart(DistributorStripeMessageSender& sender)
{
_ok = false;
@@ -96,7 +96,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
}
void
-JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& msg)
+JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
index ad133a937e4..5796b8d3fa1 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
@@ -22,9 +22,9 @@ public:
~JoinOperation() override;
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender,
+ void onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>&) override;
const char* getName() const override {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index afb806e903a..481506096eb 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -106,7 +106,7 @@ struct NodeIndexComparator
}
void
-MergeOperation::onStart(DistributorMessageSender& sender)
+MergeOperation::onStart(DistributorStripeMessageSender& sender)
{
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (!entry.valid()) {
@@ -209,7 +209,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
void
MergeOperation::deleteSourceOnlyNodes(
const BucketDatabase::Entry& currentState,
- DistributorMessageSender& sender)
+ DistributorStripeMessageSender& sender)
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
@@ -253,7 +253,7 @@ MergeOperation::deleteSourceOnlyNodes(
}
void
-MergeOperation::onReceive(DistributorMessageSender& sender,
+MergeOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
if (_removeOperation.get()) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index a5f7d352eea..5df9421e815 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -34,8 +34,8 @@ public:
~MergeOperation();
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender, const api::StorageReply::SP&) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const api::StorageReply::SP&) override;
const char* getName() const override { return "merge"; };
std::string getStatus() const override;
Type getType() const override { return MERGE_BUCKET; }
@@ -60,7 +60,7 @@ private:
std::vector<MergeMetaData>& result);
void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
- DistributorMessageSender& sender);
+ DistributorStripeMessageSender& sender);
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 6b06657d713..f6458bc0522 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -12,7 +12,7 @@ LOG_SETUP(".distributor.operation.idealstate.remove");
using namespace storage::distributor;
bool
-RemoveBucketOperation::onStartInternal(DistributorMessageSender& sender)
+RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender)
{
std::vector<std::pair<uint16_t, std::shared_ptr<api::DeleteBucketCommand> > > msgs;
@@ -51,7 +51,7 @@ RemoveBucketOperation::onStartInternal(DistributorMessageSender& sender)
void
-RemoveBucketOperation::onStart(DistributorMessageSender& sender)
+RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
{
if (onStartInternal(sender)) {
done();
@@ -104,7 +104,7 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply
void
-RemoveBucketOperation::onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply> &msg)
+RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply> &msg)
{
if (onReceiveInternal(msg)) {
done();
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
index 5b79a465f4e..a9d7f4ebf04 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
@@ -18,16 +18,16 @@ public:
/**
Sends messages, returns true if we are done (sent nothing).
*/
- bool onStartInternal(DistributorMessageSender& sender);
+ bool onStartInternal(DistributorStripeMessageSender& sender);
/**
Sends messages, calls done() if we are done (sent nothing).
*/
- void onStart(DistributorMessageSender& sender) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
bool onReceiveInternal(const std::shared_ptr<api::StorageReply> &);
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "remove"; };
Type getType() const override { return DELETE_BUCKET; }
bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
index d244521140a..88e53f7da06 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
@@ -43,7 +43,7 @@ SetBucketStateOperation::shouldBeActive(uint16_t node) const
}
void
-SetBucketStateOperation::activateNode(DistributorMessageSender& sender) {
+SetBucketStateOperation::activateNode(DistributorStripeMessageSender& sender) {
for (uint32_t i=0; i<_wantedActiveNodes.size(); ++i) {
enqueueSetBucketStateCommand(_wantedActiveNodes[i], true);
}
@@ -53,7 +53,7 @@ SetBucketStateOperation::activateNode(DistributorMessageSender& sender) {
void
-SetBucketStateOperation::deactivateNodes(DistributorMessageSender& sender) {
+SetBucketStateOperation::deactivateNodes(DistributorStripeMessageSender& sender) {
const std::vector<uint16_t>& nodes(getNodes());
for (size_t i = 0; i < nodes.size(); ++i) {
if (!shouldBeActive(nodes[i])) {
@@ -64,13 +64,13 @@ SetBucketStateOperation::deactivateNodes(DistributorMessageSender& sender) {
}
void
-SetBucketStateOperation::onStart(DistributorMessageSender& sender)
+SetBucketStateOperation::onStart(DistributorStripeMessageSender& sender)
{
activateNode(sender);
}
void
-SetBucketStateOperation::onReceive(DistributorMessageSender& sender,
+SetBucketStateOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply)
{
api::SetBucketStateReply& rep(
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h
index 1c818f9198d..5c13aaf5c05 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h
@@ -14,8 +14,8 @@ public:
const std::vector<uint16_t>& wantedActiveNodes);
~SetBucketStateOperation() override;
- void onStart(DistributorMessageSender&) override;
- void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override;
+ void onStart(DistributorStripeMessageSender&) override;
+ void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override;
const char* getName() const override { return "setbucketstate"; }
Type getType() const override { return SET_BUCKET_STATE; }
protected:
@@ -24,8 +24,8 @@ protected:
private:
void enqueueSetBucketStateCommand(uint16_t node, bool active);
- void activateNode(DistributorMessageSender& sender);
- void deactivateNodes(DistributorMessageSender& sender);
+ void activateNode(DistributorStripeMessageSender& sender);
+ void deactivateNodes(DistributorStripeMessageSender& sender);
bool shouldBeActive(uint16_t node) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index a75e954c118..437c4ed6033 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -22,7 +22,7 @@ SplitOperation::SplitOperation(const ClusterContext &cluster_ctx, const BucketAn
SplitOperation::~SplitOperation() = default;
void
-SplitOperation::onStart(DistributorMessageSender& sender)
+SplitOperation::onStart(DistributorStripeMessageSender& sender)
{
_ok = false;
@@ -49,7 +49,7 @@ SplitOperation::onStart(DistributorMessageSender& sender)
}
void
-SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& msg)
+SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
api::SplitBucketReply& rep = static_cast<api::SplitBucketReply&>(*msg);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index 1bb82c2a39e..eccbdc69869 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -16,8 +16,8 @@ public:
SplitOperation & operator = (const SplitOperation &) = delete;
~SplitOperation();
- void onStart(DistributorMessageSender& sender) override;
- void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void onStart(DistributorStripeMessageSender& sender) override;
+ void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override;
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index ee695dae606..bcc9a36b010 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -26,7 +26,7 @@ Operation::getStatus() const
}
void
-Operation::start(DistributorMessageSender& sender,
+Operation::start(DistributorStripeMessageSender& sender,
framework::MilliSecTime startTime)
{
_startTime = startTime;
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index e9320817a8e..75d72a2b5c9 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -33,13 +33,13 @@ public:
Tell the callback that storage is shutting down. Reply to any pending
stuff.
*/
- virtual void onClose(DistributorMessageSender&) = 0;
+ virtual void onClose(DistributorStripeMessageSender&) = 0;
/**
When a reply has been received, the storagelink will call receive()
on the owner of the message that was replied to.
*/
- virtual void receive(DistributorMessageSender& sender,
+ virtual void receive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
onReceive(sender, msg);
@@ -56,7 +56,7 @@ public:
/**
Starts the callback, sending any messages etc. Sets _startTime to current time
*/
- virtual void start(DistributorMessageSender& sender, framework::MilliSecTime startTime);
+ virtual void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime);
/**
* Returns true if we are blocked to start this operation given
@@ -81,9 +81,9 @@ private:
/**
Implementation of start for the callback
*/
- virtual void onStart(DistributorMessageSender& sender) = 0;
+ virtual void onStart(DistributorStripeMessageSender& sender) = 0;
- virtual void onReceive(DistributorMessageSender& sender,
+ virtual void onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
index 0b6d89e0570..e25141be214 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.h
@@ -26,7 +26,7 @@ class ThrottlingOperationStarter : public OperationStarter
ThrottlingOperation(const ThrottlingOperation&);
ThrottlingOperation& operator=(const ThrottlingOperation&);
- void onClose(DistributorMessageSender& sender) override {
+ void onClose(DistributorStripeMessageSender& sender) override {
_operation->onClose(sender);
}
const char* getName() const override {
@@ -38,21 +38,21 @@ class ThrottlingOperationStarter : public OperationStarter
std::string toString() const override {
return _operation->toString();
}
- void start(DistributorMessageSender& sender, framework::MilliSecTime startTime) override {
+ void start(DistributorStripeMessageSender& sender, framework::MilliSecTime startTime) override {
_operation->start(sender, startTime);
}
- void receive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override {
+ void receive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override {
_operation->receive(sender, msg);
}
framework::MilliSecTime getStartTime() const {
return _operation->getStartTime();
}
- void onStart(DistributorMessageSender&) override {
+ void onStart(DistributorStripeMessageSender&) override {
// Should never be called directly on the throttled operation
// instance, but rather on its wrapped implementation.
HDR_ABORT("should not be reached");
}
- void onReceive(DistributorMessageSender&,
+ void onReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&) override {
HDR_ABORT("should not be reached");
}