summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-10-13 11:28:16 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-10-13 11:28:16 +0000
commite65d3e92093c9f7c1e3211c990608e040d22148c (patch)
treefeb7a652a20e1550b5b30a561deae28ee0f8e00e /storage
parenteea8c949825803bc6413c42c99b2d34178a2ba6b (diff)
Abort priority-queued requests on distributor shutdown
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp23
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
3 files changed, 41 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 12a4118aa08..d0941571a6a 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -50,6 +50,7 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order);
CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch);
+ CPPUNIT_TEST(closing_aborts_priority_queued_client_requests);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -81,6 +82,7 @@ protected:
void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
void external_client_requests_are_handled_individually_in_priority_order();
void internal_messages_are_started_in_fifo_order_batch();
+ void closing_aborts_priority_queued_client_requests();
public:
void setUp() override {
@@ -927,6 +929,27 @@ void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() {
CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
}
+void Distributor_Test::closing_aborts_priority_queued_client_requests() {
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ document::BucketId bucket(16, 1);
+ addNodesToBucketDB(bucket, "0=1/1/1/t");
+
+ document::DocumentId id("id:foo:testdoctype1:n=1:foo");
+ vespalib::stringref field_set = "";
+ for (int i = 0; i < 10; ++i) {
+ auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set);
+ _distributor->onDown(cmd);
+ }
+ tickDistributorNTimes(1);
+ // Closing should trigger 1 abort via startet GetOperation and 9 aborts from pri queue
+ _distributor->close();
+ CPPUNIT_ASSERT_EQUAL(size_t(10), _sender.replies.size());
+ for (auto& msg : _sender.replies) {
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ dynamic_cast<api::StorageReply&>(*msg).getResult().getResult());
+ }
+}
+
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 53df19fd10c..3dfe5103654 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -203,23 +203,26 @@ Distributor::onOpen()
}
}
-void
-Distributor::onClose()
-{
- for (uint32_t i=0; i<_messageQueue.size(); ++i) {
- std::shared_ptr<api::StorageMessage> msg = _messageQueue[i];
+void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) {
+ api::StorageReply::UP reply(
+ std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down"));
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+}
+
+void Distributor::onClose() {
+ for (auto& msg : _messageQueue) {
if (!msg->getType().isReply()) {
- api::StorageReply::UP reply(
- std::dynamic_pointer_cast<api::StorageCommand>(msg)
- ->makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
- "Distributor is shutting down"));
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ send_shutdown_abort_reply(msg);
}
}
_messageQueue.clear();
+ while (!_client_request_priority_queue.empty()) {
+ send_shutdown_abort_reply(_client_request_priority_queue.top());
+ _client_request_priority_queue.pop();
+ }
- LOG(debug, "Distributor::onFlush invoked");
+ LOG(debug, "Distributor::onClose invoked");
_bucketDBUpdater.flush();
_operationOwner.onClose();
_maintenanceOperationOwner.onClose();
@@ -619,10 +622,10 @@ void Distributor::startExternalOperations() {
const bool start_single_client_request = !_client_request_priority_queue.empty();
if (start_single_client_request) {
- auto& msg = _client_request_priority_queue.top();
+ const auto& msg = _client_request_priority_queue.top();
MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
"client request priority queue to be processed.");
- handle_or_propagate_message(msg); // TODO move() once we've move-enabled our message chains
+ handle_or_propagate_message(msg);
_client_request_priority_queue.pop();
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 438001acc40..5856aca71a1 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -187,11 +187,11 @@ private:
};
void setNodeStateUp();
-
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
void handleStatusRequests();
+ void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&);
void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
void startExternalOperations();