diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-10-13 11:28:16 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-10-13 11:28:16 +0000 |
commit | e65d3e92093c9f7c1e3211c990608e040d22148c (patch) | |
tree | feb7a652a20e1550b5b30a561deae28ee0f8e00e /storage | |
parent | eea8c949825803bc6413c42c99b2d34178a2ba6b (diff) |
Abort priority-queued requests on distributor shutdown
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/distributor/distributortest.cpp | 23 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.cpp | 31 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.h | 2 |
3 files changed, 41 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 12a4118aa08..d0941571a6a 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -50,6 +50,7 @@ class Distributor_Test : public CppUnit::TestFixture, CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker); CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order); CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch); + CPPUNIT_TEST(closing_aborts_priority_queued_client_requests); CPPUNIT_TEST_SUITE_END(); protected: @@ -81,6 +82,7 @@ protected: void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker(); void external_client_requests_are_handled_individually_in_priority_order(); void internal_messages_are_started_in_fifo_order_batch(); + void closing_aborts_priority_queued_client_requests(); public: void setUp() override { @@ -927,6 +929,27 @@ void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() { CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo()); } +void Distributor_Test::closing_aborts_priority_queued_client_requests() { + setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + document::BucketId bucket(16, 1); + addNodesToBucketDB(bucket, "0=1/1/1/t"); + + document::DocumentId id("id:foo:testdoctype1:n=1:foo"); + vespalib::stringref field_set = ""; + for (int i = 0; i < 10; ++i) { + auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set); + _distributor->onDown(cmd); + } + tickDistributorNTimes(1); + // Closing should trigger 1 abort via startet GetOperation and 9 aborts from pri queue + _distributor->close(); + CPPUNIT_ASSERT_EQUAL(size_t(10), _sender.replies.size()); + for (auto& msg : _sender.replies) { + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + dynamic_cast<api::StorageReply&>(*msg).getResult().getResult()); + } +} + } } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 53df19fd10c..3dfe5103654 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -203,23 +203,26 @@ Distributor::onOpen() } } -void -Distributor::onClose() -{ - for (uint32_t i=0; i<_messageQueue.size(); ++i) { - std::shared_ptr<api::StorageMessage> msg = _messageQueue[i]; +void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) { + api::StorageReply::UP reply( + std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down")); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + +void Distributor::onClose() { + for (auto& msg : _messageQueue) { if (!msg->getType().isReply()) { - api::StorageReply::UP reply( - std::dynamic_pointer_cast<api::StorageCommand>(msg) - ->makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, - "Distributor is shutting down")); - sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); + send_shutdown_abort_reply(msg); } } _messageQueue.clear(); + while (!_client_request_priority_queue.empty()) { + send_shutdown_abort_reply(_client_request_priority_queue.top()); + _client_request_priority_queue.pop(); + } - LOG(debug, "Distributor::onFlush invoked"); + LOG(debug, "Distributor::onClose invoked"); _bucketDBUpdater.flush(); _operationOwner.onClose(); _maintenanceOperationOwner.onClose(); @@ -619,10 +622,10 @@ void Distributor::startExternalOperations() { const bool start_single_client_request = !_client_request_priority_queue.empty(); if (start_single_client_request) { - auto& msg = _client_request_priority_queue.top(); + const auto& msg = _client_request_priority_queue.top(); MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from " "client request priority queue to be processed."); - handle_or_propagate_message(msg); // TODO move() once we've move-enabled our message chains + handle_or_propagate_message(msg); _client_request_priority_queue.pop(); } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 438001acc40..5856aca71a1 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -187,11 +187,11 @@ private: }; void setNodeStateUp(); - bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); bool isMaintenanceReply(const api::StorageReply& reply) const; void handleStatusRequests(); + void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&); void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg); void startExternalOperations(); |