summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-12 16:39:40 +0200
committerGitHub <noreply@github.com>2017-10-12 16:39:40 +0200
commit1b70594bb416c7f11082323557f57de047bad5bd (patch)
tree4c7b2e3fe19a4078c5efa637a3bd7ef3c1d0ac01 /storage
parent1912fe8235c2f8d21017dca0ffc323b21be0bca4 (diff)
parentd8b880a01c106f84270e91ef2e16d0639d817bd9 (diff)
Merge pull request #3736 from vespa-engine/vekterli/use-priority-queue-for-external-client-operations
Use priority queue for external client operations
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp60
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp64
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h16
3 files changed, 128 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 3aedd31f574..12a4118aa08 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -48,6 +48,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
+ CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order);
+ CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -77,6 +79,8 @@ protected:
void sequencing_config_is_propagated_to_distributor_config();
void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
+ void external_client_requests_are_handled_individually_in_priority_order();
+ void internal_messages_are_started_in_fifo_order_batch();
public:
void setUp() override {
@@ -867,6 +871,62 @@ void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_mess
CPPUNIT_ASSERT(!node_info.isBusy(0));
}
+void Distributor_Test::external_client_requests_are_handled_individually_in_priority_order() {
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a");
+
+ std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 0});
+ document::DocumentId id("id:foo:testdoctype1:n=1:foo");
+ vespalib::stringref field_set = "";
+ for (auto pri : priorities) {
+ auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set);
+ cmd->setPriority(pri);
+ // onDown appends to internal message FIFO queue, awaiting hand-off.
+ _distributor->onDown(cmd);
+ }
+ // At the hand-off point we expect client requests to be prioritized.
+ // For each tick, a priority-order client request is processed and sent off.
+ for (size_t i = 1; i <= priorities.size(); ++i) {
+ tickDistributorNTimes(1);
+ CPPUNIT_ASSERT_EQUAL(size_t(i), _sender.commands.size());
+ }
+
+ std::vector<int> expected({0, 10, 40, 50, 255});
+ std::vector<int> actual;
+ for (auto& msg : _sender.commands) {
+ actual.emplace_back(static_cast<int>(msg->getPriority()));
+ }
+ CPPUNIT_ASSERT_EQUAL(expected, actual);
+}
+
+void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() {
+ // To test internal request ordering, we use NotifyBucketChangeCommand
+ // for the reason that it explicitly updates the bucket database for
+ // each individual invocation.
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ document::BucketId bucket(16, 1);
+ addNodesToBucketDB(bucket, "0=1/1/1/t");
+
+ std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 1});
+ for (auto pri : priorities) {
+ api::BucketInfo fake_info(pri, pri, pri);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, fake_info);
+ cmd->setSourceIndex(0);
+ cmd->setPriority(pri);
+ _distributor->onDown(cmd);
+ }
+
+ // Doing a single tick should process all internal requests in one batch
+ tickDistributorNTimes(1);
+ CPPUNIT_ASSERT_EQUAL(size_t(5), _sender.replies.size());
+
+ // The bucket info for priority 1 (last FIFO-order change command received, but
+ // highest priority) should be the end-state of the bucket database, _not_ that
+ // of lowest priority 255.
+ BucketDatabase::Entry e(getBucket(bucket));
+ CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
+}
+
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 0dc52650131..53df19fd10c 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -572,19 +572,61 @@ Distributor::workWasDone()
return !_tickResult.waitWanted();
}
-void
-Distributor::startExternalOperations()
-{
- for (uint32_t i=0; i<_fetchedMessages.size(); ++i) {
- MBUS_TRACE(_fetchedMessages[i]->getTrace(), 9,
- "Distributor: Grabbed from queue to be processed.");
- if (!handleMessage(_fetchedMessages[i])) {
- MBUS_TRACE(_fetchedMessages[i]->getTrace(), 9,
- "Distributor: Not handling it. Sending further down.");
- sendDown(_fetchedMessages[i]);
+namespace {
+
+bool is_client_request(const api::StorageMessage& msg) noexcept {
+ // Despite having been converted to StorageAPI messages, the following
+ // set of messages are never sent to the distributor by other processes
+ // than clients.
+ switch (msg.getType().getId()) {
+ case api::MessageType::GET_ID:
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::VISITOR_DESTROY_ID:
+ case api::MessageType::MULTIOPERATION_ID: // Deprecated
+ case api::MessageType::GETBUCKETLIST_ID:
+ case api::MessageType::STATBUCKET_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::REMOVELOCATION_ID:
+ case api::MessageType::BATCHPUTREMOVE_ID: // Deprecated
+ case api::MessageType::BATCHDOCUMENTUPDATE_ID: // Deprecated
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
+void Distributor::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) {
+ if (!handleMessage(msg)) {
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
+ sendDown(msg);
+ }
+}
+
+void Distributor::startExternalOperations() {
+ for (auto& msg : _fetchedMessages) {
+ if (is_client_request(*msg)) {
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue");
+ _client_request_priority_queue.emplace(std::move(msg));
+ } else {
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed.");
+ handle_or_propagate_message(msg);
}
}
- if (!_fetchedMessages.empty()) {
+
+ const bool start_single_client_request = !_client_request_priority_queue.empty();
+ if (start_single_client_request) {
+ auto& msg = _client_request_priority_queue.top();
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
+ "client request priority queue to be processed.");
+ handle_or_propagate_message(msg); // TODO move() once we've move-enabled our message chains
+ _client_request_priority_queue.pop();
+ }
+
+ if (!_fetchedMessages.empty() || start_single_client_request) {
signalWorkWasDone();
}
_fetchedMessages.clear();
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index caf3a13d113..438001acc40 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -23,6 +23,7 @@
#include <vespa/config/config.h>
#include <vespa/vespalib/util/sync.h>
#include <unordered_map>
+#include <queue>
namespace storage {
@@ -191,6 +192,7 @@ private:
bool isMaintenanceReply(const api::StorageReply& reply) const;
void handleStatusRequests();
+ void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
void startExternalOperations();
/**
@@ -252,8 +254,20 @@ private:
mutable std::shared_ptr<lib::Distribution> _distribution;
std::shared_ptr<lib::Distribution> _nextDistribution;
- typedef std::vector<std::shared_ptr<api::StorageMessage> > MessageQueue;
+ using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
+ struct IndirectHigherPriority {
+ template <typename Lhs, typename Rhs>
+ bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept {
+ return lhs->getPriority() > rhs->getPriority();
+ }
+ };
+ using ClientRequestPriorityQueue = std::priority_queue<
+ std::shared_ptr<api::StorageMessage>,
+ std::vector<std::shared_ptr<api::StorageMessage>>,
+ IndirectHigherPriority
+ >;
MessageQueue _messageQueue;
+ ClientRequestPriorityQueue _client_request_priority_queue;
MessageQueue _fetchedMessages;
framework::TickingThreadPool& _threadPool;
vespalib::Monitor _statusMonitor;