diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-05-04 21:30:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-04 21:30:38 +0200 |
commit | 24dc7f48d253f9cb6b69ba9a58bf53cf789803c4 (patch) | |
tree | 7bfcb6eb321f8d6a5fe171f729ce3e5867d84e40 | |
parent | 6e07be846490ce9579f29d4a5de47ca86cea430a (diff) |
Revert "- Implement async put"
25 files changed, 166 insertions, 349 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 80ddc0931f4..7816ee74fde 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -26,11 +26,6 @@ disk_operation_timeout int default=0 restart ## Number of threads to use for each mountpoint. num_threads int default=8 restart -## Number of threads for response processing and delivery -## 0 will give legacy sync behavior. -## Negative number will choose a good number based on # cores. -num_response_threads int default=0 - ## When merging, if we find more than this number of documents that exist on all ## of the same copies, send a separate apply bucket diff with these entries ## to an optimized merge chain that guarantuees minimum data transfer. diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h index 18a3c250e24..fa386e274f2 100644 --- a/persistence/src/vespa/persistence/spi/operationcomplete.h +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -8,12 +8,6 @@ namespace storage::spi { class Result; -class ResultHandler { -public: - virtual ~ResultHandler() = default; - virtual void handle(const Result &) const = 0; -}; - /** * This is the callback interface when using the async operations * in the persistence provider. @@ -24,7 +18,6 @@ public: using UP = std::unique_ptr<OperationComplete>; virtual ~OperationComplete() = default; virtual void onComplete(std::unique_ptr<Result> result) = 0; - virtual void addResultHandler(const ResultHandler * resultHandler) = 0; }; }
\ No newline at end of file diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index b6569d9561c..c60ac615644 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -2,7 +2,6 @@ #include "persistenceprovider.h" #include <future> -#include <cassert> namespace storage::spi { @@ -10,20 +9,14 @@ PersistenceProvider::~PersistenceProvider() = default; class CatchResult : public OperationComplete { public: - CatchResult() : _promisedResult(), _resulthandler(nullptr) {} std::future<Result::UP> future_result() { - return _promisedResult.get_future(); + return promisedResult.get_future(); } void onComplete(Result::UP result) override { - _promisedResult.set_value(std::move(result)); - } - void addResultHandler(const ResultHandler * resultHandler) override { - assert(_resulthandler == nullptr); - _resulthandler = resultHandler; + promisedResult.set_value(std::move(result)); } private: - std::promise<Result::UP> _promisedResult; - const ResultHandler *_resulthandler; + std::promise<Result::UP> promisedResult; }; Result diff --git a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp index 371aa88e692..c10681405e7 100644 --- a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp +++ b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp @@ -31,7 +31,7 @@ TEST_F(DiskMoveOperationHandlerTest, simple) { document::Bucket bucket = makeDocumentBucket(document::BucketId(16, 4)); auto move = std::make_shared<BucketDiskMoveCommand>(bucket, 3, 4); spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - diskMoveHandler.handleBucketDiskMove(*move, createTracker(move, bucket)); + diskMoveHandler.handleBucketDiskMove(*move, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), move)); EXPECT_EQ("BucketId(0x4000000000000004): 10,4", getBucketStatus(document::BucketId(16,4))); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 811b4d60e15..6497f8bb698 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -201,8 +201,12 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, uint16_t deviceIndex) { (void) config; - return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(), - provider, filestorHandler, metrics, deviceIndex); + std::unique_ptr<DiskThread> disk; + disk.reset(new PersistenceThread( + node.getComponentRegister(), config.getConfigId(), provider, + filestorHandler, metrics, + deviceIndex)); + return disk; } namespace { diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 262906a4baf..035da326d48 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -149,6 +149,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; }; + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index f2c7b317be2..4ac9dfd7765 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -12,7 +12,6 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/sequencedtaskexecutor.h> using document::DocumentType; using storage::spi::test::makeSpiBucket; @@ -55,9 +54,9 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const { _node.setupDummyPersistence(); _metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); - _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, + _handler.reset(new FileStorHandler(_messageKeeper, _metrics, _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister()); + _node.getComponentRegister())); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, @@ -65,10 +64,6 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const } } -PersistenceTestEnvironment::~PersistenceTestEnvironment() { - _handler->close(); -} - PersistenceTestUtils::PersistenceTestUtils() = default; PersistenceTestUtils::~PersistenceTestUtils() = default; @@ -79,21 +74,15 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { void PersistenceTestUtils::setupDisks(uint32_t numDisks) { - _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils"); - setupExecutor(numDisks); -} - -void -PersistenceTestUtils::setupExecutor(uint32_t numThreads) { - _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); + _env.reset(new PersistenceTestEnvironment(DiskCount(numDisks), "todo-make-unique-persistencetestutils")); } std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(), - _env->_config.getConfigId(),getPersistenceProvider(), - getEnv()._fileStorHandler, getEnv()._metrics, disk); + return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), + getPersistenceProvider(), getEnv()._fileStorHandler, + getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index cdd08d42565..3121bef61e5 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -10,7 +10,6 @@ #include <vespa/storage/common/storagecomponent.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> -#include <vespa/storage/storageserver/communicationmanager.h> #include <vespa/document/base/testdocman.h> #include <vespa/vespalib/gtest/gtest.h> @@ -25,7 +24,6 @@ struct MessageKeeper : public MessageSender { struct PersistenceTestEnvironment { PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot); - ~PersistenceTestEnvironment(); document::TestDocMan _testDocMan; vdstestlib::DirConfig _config; @@ -56,22 +54,7 @@ public: document::Bucket _bucket; }; - struct ReplySender : public MessageSender { - void sendCommand(const std::shared_ptr<api::StorageCommand> &) override { - abort(); - } - - void sendReply(const std::shared_ptr<api::StorageReply> & ptr) override { - queue.enqueue(std::move(ptr)); - } - - Queue queue; - }; - std::unique_ptr<PersistenceTestEnvironment> _env; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; - ReplySender _replySender; - PersistenceTestUtils(); virtual ~PersistenceTestUtils(); @@ -84,13 +67,8 @@ public: uint32_t maxSize = 128); void setupDisks(uint32_t disks); - void setupExecutor(uint32_t numThreads); void TearDown() override { - if (_sequenceTaskExecutor) { - _sequenceTaskExecutor->sync(); - _sequenceTaskExecutor.reset(); - } _env.reset(); } @@ -112,21 +90,6 @@ public: spi::PersistenceProvider& getPersistenceProvider(); - MessageTracker::UP - createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { - return MessageTracker::createForTesting(getEnv(), _replySender, NoBucketLock::make(bucket), std::move(cmd)); - } - - api::ReturnCode - fetchResult(const MessageTracker::UP & tracker) { - if (tracker) { - return tracker->getResult(); - } - std::shared_ptr<api::StorageMessage> msg; - _replySender.queue.getNext(msg, 60000); - return dynamic_cast<api::StorageReply &>(*msg).getResult(); - } - /** Performs a put to the given disk. Returns the document that was inserted. diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 3d7fc70db6a..1ec6a35fb1d 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket)); + MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 5174b733334..0d482ebe5b7 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -23,7 +23,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", @@ -47,7 +47,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); - auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -74,7 +74,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -88,7 +88,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -107,7 +107,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -141,7 +141,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -187,7 +187,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index a5bde43f7ce..864ab320527 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -40,6 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { : context(spi::LoadType(0, "default"), 0, 0) {} + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + void SetUp() override { SingleDiskPersistenceTestUtils::SetUp(); @@ -54,9 +59,8 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { } void TearDown() override { - thread->flush(); - SingleDiskPersistenceTestUtils::TearDown(); thread.reset(nullptr); + SingleDiskPersistenceTestUtils::TearDown(); } std::shared_ptr<api::UpdateCommand> conditional_update_test( @@ -87,7 +91,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -107,7 +111,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -219,7 +223,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -231,7 +235,7 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { setTestCondition(*put); thread->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -275,7 +279,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))); + thread->handlePut(*put, createTracker(put, BUCKET)); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h index 2a3d9aa8f91..c5257d59a3e 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h @@ -4,7 +4,8 @@ #include <vespa/storageapi/buckets/bucketinfo.h> -namespace storage::bucketdb { +namespace storage { +namespace bucketdb { struct StorageBucketInfo { api::BucketInfo info; @@ -33,3 +34,4 @@ struct StorageBucketInfo { std::ostream& operator<<(std::ostream& out, const StorageBucketInfo& info); } +} diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index d4615ee2df5..431c90b27f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -240,43 +240,4 @@ std::ostream& operator<<(std::ostream& out, StorageLink& link) { return out; } -Queue::Queue() = default; -Queue::~Queue() = default; - -bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { - vespalib::MonitorGuard sync(_queueMonitor); - bool first = true; - while (true) { // Max twice - if (!_queue.empty()) { - LOG(spam, "Picking message from queue"); - msg = std::move(_queue.front()); - _queue.pop(); - return true; - } - if (timeout == 0 || !first) { - return false; - } - sync.wait(timeout); - first = false; - } - - return false; -} - -void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { - vespalib::MonitorGuard sync(_queueMonitor); - _queue.emplace(std::move(msg)); - sync.unsafeSignalUnlock(); -} - -void Queue::signal() { - vespalib::MonitorGuard sync(_queueMonitor); - sync.unsafeSignalUnlock(); -} - -size_t Queue::size() const { - vespalib::MonitorGuard sync(_queueMonitor); - return _queue.size(); -} - } diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index d6df7bf8afb..f1f679a1e26 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,9 +23,7 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> -#include <vespa/vespalib/util/sync.h> #include <atomic> -#include <queue> namespace storage { @@ -184,36 +182,6 @@ private: friend struct StorageLinkTest; }; -class Queue { -private: - using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>; - QueueType _queue; - vespalib::Monitor _queueMonitor; - -public: - Queue(); - ~Queue(); - - /** - * Returns the next event from the event queue - * @param msg The next event - * @param timeout Millisecs to wait if the queue is empty - * (0 = don't wait, -1 = forever) - * @return true or false if the queue was empty. - */ - bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); - - /** - * Enqueue msg in FIFO order. - */ - void enqueue(std::shared_ptr<api::StorageMessage> msg); - - /** Signal queue monitor. */ - void signal(); - - size_t size() const; -}; - std::ostream& operator<<(std::ostream& out, StorageLink& link); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index cc4768cb91d..59ae6b83f56 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -15,7 +15,6 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.filestor.manager"); @@ -89,13 +88,6 @@ FileStorManager::print(std::ostream& out, bool verbose, const std::string& inden out << "FileStorManager"; } -namespace { - -uint32_t computeNumResponseThreads(int configured) { - return (configured < 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured; -} - -} /** * If live configuration, assuming storageserver makes sure no messages are * incoming during reconfiguration @@ -117,16 +109,12 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _partitions, _compReg); - uint32_t numResposeThreads = computeNumResponseThreads(_config->numResponseThreads); - if (numResposeThreads > 0) { - _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResposeThreads, 10000, vespalib::Executor::OptimizeFor::ADAPTIVE); - } for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { LOG(spam, "Setting up disk %u", i); for (uint32_t j = 0; j < numThreads; j++) { - _disks[i].push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider, - *_filestorHandler, *_metrics->disks[i]->threads[j], i)); + _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i)); } } else { _filestorHandler->disable(i); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 452b83bb794..433b9ddbd39 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -12,7 +12,6 @@ #include "filestormetrics.h" #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/document/bucket/bucketid.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/bucketdb/storbucketdb.h> @@ -68,7 +67,6 @@ class FileStorManager : public StorageLinkQueued, bool _failDiskOnError; std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor; mutable vespalib::Monitor _threadMonitor; // Notify to stop sleeping bool _closed; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 4a99655c775..1368b14077a 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -10,87 +10,13 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.thread"); namespace storage { -namespace { - -class ResultTask : public vespalib::Executor::Task { -public: - ResultTask() : _result(), _resultHandler(nullptr) { } - void setResult(spi::Result::UP result) { - _result = std::move(result); - } - void addResultHandler(const spi::ResultHandler * resultHandler) { - // Only handles a signal handler now, - // Can be extended if necessary later on - assert(_resultHandler == nullptr); - _resultHandler = resultHandler; - } - void handle(const spi::Result &result ) { - if (_resultHandler != nullptr) { - _resultHandler->handle(result); - } - } -protected: - spi::Result::UP _result; -private: - const spi::ResultHandler * _resultHandler; -}; - -template<class FunctionType> -class LambdaResultTask : public ResultTask { -public: - LambdaResultTask(FunctionType && func) - : _func(std::move(func)) - {} - ~LambdaResultTask() override {} - void run() override { - handle(*_result); - _func(std::move(_result)); - } -private: - FunctionType _func; -}; - -template <class FunctionType> -std::unique_ptr<ResultTask> -makeResultTask(FunctionType &&function) -{ - return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> - (std::forward<FunctionType>(function)); -} - -class ResultTaskOperationDone : public spi::OperationComplete { -public: - ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId, - std::unique_ptr<ResultTask> task) - : _executor(executor), - _task(std::move(task)), - _executorId(executor.getExecutorId(bucketId.getId())) - { - } - void onComplete(spi::Result::UP result) override { - _task->setResult(std::move(result)); - _executor.executeTask(_executorId, std::move(_task)); - } - void addResultHandler(const spi::ResultHandler * resultHandler) override { - _task->addResultHandler(resultHandler); - } -private: - vespalib::ISequencedTaskExecutor & _executor; - std::unique_ptr<ResultTask> _task; - vespalib::ISequencedTaskExecutor::ExecutorId _executorId; -}; - -} - -PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequencedExecutor, - ServiceLayerComponentRegister& compReg, +PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, @@ -98,7 +24,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence uint16_t deviceIndex) : _stripeId(filestorHandler.getNextStripeId(deviceIndex)), _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), - _sequencedExecutor(sequencedExecutor), + _warnOnSlowOperations(5000), _spi(provider), _processAllHandler(_env, provider), _mergeHandler(_spi, _env), @@ -166,32 +92,20 @@ PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, Messa } MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) +PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker) { - MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.put[cmd.getLoadType()]; - tracker.setMetric(metrics); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { - return trackerUP; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { + return tracker; } - if (_sequencedExecutor == nullptr) { - spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), - tracker.context()); - tracker.checkForError(response); - } else { - _spi.putAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), spi::Timestamp(cmd.getTimestamp()), - std::move(cmd.getDocument()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), - makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); - tracker->sendReply(); - }))); - } - return trackerUP; + spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context()); + tracker->checkForError(response); + return tracker; } MessageTracker::UP @@ -859,12 +773,37 @@ PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP t } else { auto & initiatingCommand = static_cast<api::StorageCommand&>(msg); try { + int64_t startTime(_component->getClock().getTimeInMillis().getTime()); + LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - return handleCommandSplitByType(initiatingCommand, std::move(tracker)); + tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker)); + if (!tracker) { + LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); + } else { + tracker->generateReply(initiatingCommand); + if ((tracker->hasReply() + && tracker->getReply().getResult().failed()) + || tracker->getResult().failed()) + { + _env._metrics.failedOperations.inc(); + } + } + + int64_t stopTime(_component->getClock().getTimeInMillis().getTime()); + if (stopTime - startTime >= _warnOnSlowOperations) { + LOGBT(warning, msg.getType().toString(), + "Slow processing of message %s on disk %u. Processing time: %" PRId64 " ms (>=%d ms)", + msg.toString().c_str(), _env._partition, stopTime - startTime, _warnOnSlowOperations); + } else { + LOGBT(spam, msg.getType().toString(), "Processing time of message %s on disk %u: %" PRId64 " ms", + msg.toString(true).c_str(), _env._partition, stopTime - startTime); + } + + return tracker; } catch (std::exception& e) { LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); - api::StorageReply::SP reply(initiatingCommand.makeReply()); + api::StorageReply::SP reply(initiatingCommand.makeReply().release()); reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); _env._fileStorHandler.sendReply(reply); } @@ -878,7 +817,7 @@ PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); api::StorageMessage & msg(*lock.second); - auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), std::move(lock.second)); + auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second)); tracker = processMessage(msg, std::move(tracker)); if (tracker) { tracker->sendReply(); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 8eae0d8b34f..773732b5ef1 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -10,7 +10,6 @@ #include "provider_error_wrapper.h" #include <vespa/storage/common/storagecomponent.h> #include <vespa/storage/common/statusmessages.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> namespace storage { @@ -20,9 +19,9 @@ class TestAndSetHelper; class PersistenceThread final : public DiskThread, public Types { public: - PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&, - const config::ConfigUri & configUri, spi::PersistenceProvider& provider, - FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex); + PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, + spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, + FileStorThreadMetrics& metrics, uint16_t deviceIndex); ~PersistenceThread() override; /** Waits for current operation to be finished. */ @@ -49,7 +48,7 @@ public: private: uint32_t _stripeId; PersistenceUtil _env; - vespalib::ISequencedTaskExecutor * _sequencedExecutor; + uint32_t _warnOnSlowOperations; spi::PersistenceProvider& _spi; ProcessAllHandler _processAllHandler; MergeHandler _mergeHandler; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c8dd564e390..9095b351f64 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -29,37 +29,23 @@ namespace { (id == api::MessageType::REMOVELOCATION_ID || id == api::MessageType::JOINBUCKETS_ID)); } - constexpr double WARN_ON_SLOW_OPERATIONS = 5.0; + } MessageTracker::MessageTracker(PersistenceUtil & env, - MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg) - : MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg)) -{} -MessageTracker::MessageTracker(PersistenceUtil & env, - MessageSender & replySender, - bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) : _sendReply(true), - _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), + _updateBucketInfo(hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), _msg(std::move(msg)), _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()), _env(env), - _replySender(replySender), _metric(nullptr), _result(api::ReturnCode::OK), _timer(_env._component.getClock()) { } -MessageTracker::UP -MessageTracker::createForTesting(PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { - return MessageTracker::UP(new MessageTracker(env, replySender, false, std::move(bucketLock), std::move(msg))); -} - void MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) { metric.count.inc(); @@ -70,19 +56,6 @@ MessageTracker::~MessageTracker() = default; void MessageTracker::sendReply() { - generateReply(static_cast<api::StorageCommand &>(*_msg)); - if ((hasReply() && getReply().getResult().failed()) || getResult().failed()) { - _env._metrics.failedOperations.inc(); - } - double duration = _timer.getElapsedTimeAsDouble(); - if (duration >= WARN_ON_SLOW_OPERATIONS) { - LOGBT(warning, _msg->getType().toString(), - "Slow processing of message %s on disk %u. Processing time: %4.0f ms (>=%4.0f ms)", - _msg->toString().c_str(), _env._partition, duration, WARN_ON_SLOW_OPERATIONS); - } else { - LOGBT(spam, _msg->getType().toString(), "Processing time of message %s on disk %u: %4.0f ms", - _msg->toString(true).c_str(), _env._partition, duration); - } if (hasReply()) { if ( ! _context.getTrace().getRoot().isEmpty()) { getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot()); @@ -97,7 +70,7 @@ MessageTracker::sendReply() { } LOG(spam, "Sending reply up: %s %" PRIu64, getReply().toString().c_str(), getReply().getMsgId()); - _replySender.sendReply(std::move(_reply)); + _env._fileStorHandler.sendReply(std::move(_reply)); } else { if ( ! _context.getTrace().getRoot().isEmpty()) { _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot()); @@ -132,8 +105,8 @@ MessageTracker::generateReply(api::StorageCommand& cmd) return; } - if (!_reply) { - _reply = cmd.makeReply(); + if (!_reply.get()) { + _reply.reset(cmd.makeReply().release()); _reply->setResult(_result); } @@ -165,7 +138,7 @@ PersistenceUtil::PersistenceUtil( { } -PersistenceUtil::~PersistenceUtil() = default; +PersistenceUtil::~PersistenceUtil() { } void PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index a57ef186b46..126fa6ca17a 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -19,8 +19,7 @@ class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); ~MessageTracker(); @@ -63,28 +62,18 @@ public: api::ReturnCode getResult() const { return _result; } spi::Context & context() { return _context; } - document::BucketId getBucketId() const { - return _bucketLock->getBucket().getBucketId(); - } void sendReply(); bool checkForError(const spi::Result& response); - static MessageTracker::UP - createForTesting(PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); - private: - MessageTracker(PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); bool _sendReply; bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; api::StorageMessage::SP _msg; spi::Context _context; PersistenceUtil &_env; - MessageSender &_replySender; FileStorThreadMetrics::Op *_metric; api::StorageReply::SP _reply; api::ReturnCode _result; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index a9e6cbd3ea1..a86edd359fc 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -9,17 +9,12 @@ template <typename ResultType> ResultType ProviderErrorWrapper::checkResult(ResultType&& result) const { - handle(result); - return std::forward<ResultType>(result); -} - -void -ProviderErrorWrapper::handle(const spi::Result & result) const { if (result.getErrorCode() == spi::Result::ErrorType::FATAL_ERROR) { trigger_shutdown_listeners(result.getErrorMessage()); } else if (result.getErrorCode() == spi::Result::ErrorType::RESOURCE_EXHAUSTED) { trigger_resource_exhaustion_listeners(result.getErrorMessage()); } + return std::forward<ResultType>(result); } void ProviderErrorWrapper::trigger_shutdown_listeners(vespalib::stringref reason) const { @@ -196,10 +191,4 @@ ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, return checkResult(_impl.removeEntry(bucket, ts, context)); } -void ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc, - spi::Context &context, spi::OperationComplete::UP onComplete) { - onComplete->addResultHandler(this); - _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete)); -} - } // ns storage diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 5e7bc288b03..23da566afee 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -33,7 +33,7 @@ public: } }; -class ProviderErrorWrapper : public spi::PersistenceProvider, public spi::ResultHandler { +class ProviderErrorWrapper : public spi::PersistenceProvider { public: explicit ProviderErrorWrapper(spi::PersistenceProvider& impl) : _impl(impl), @@ -72,13 +72,9 @@ public: } void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); - - void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; - private: template <typename ResultType> ResultType checkResult(ResultType&& result) const; - void handle(const spi::Result &) const override; void trigger_shutdown_listeners(vespalib::stringref reason) const; void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index a0b75b7602d..fa2b0cda018 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -28,6 +28,45 @@ using document::FixedBucketSpaces; namespace storage { +Queue::Queue() = default; +Queue::~Queue() = default; + +bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { + vespalib::MonitorGuard sync(_queueMonitor); + bool first = true; + while (true) { // Max twice + if (!_queue.empty()) { + LOG(spam, "Picking message from queue"); + msg = std::move(_queue.front()); + _queue.pop(); + return true; + } + if (timeout == 0 || !first) { + return false; + } + sync.wait(timeout); + first = false; + } + + return false; +} + +void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { + vespalib::MonitorGuard sync(_queueMonitor); + _queue.emplace(std::move(msg)); + sync.unsafeSignalUnlock(); +} + +void Queue::signal() { + vespalib::MonitorGuard sync(_queueMonitor); + sync.unsafeSignalUnlock(); +} + +size_t Queue::size() const { + vespalib::MonitorGuard sync(_queueMonitor); + return _queue.size(); +} + StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg) : _docAPIMsg(std::move(msg)) { } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 1d64c8a8911..c08ad214768 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -44,6 +44,36 @@ class VisitorThread; class FNetListener; class RPCRequestWrapper; +class Queue { +private: + using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>; + QueueType _queue; + vespalib::Monitor _queueMonitor; + +public: + Queue(); + ~Queue(); + + /** + * Returns the next event from the event queue + * @param msg The next event + * @param timeout Millisecs to wait if the queue is empty + * (0 = don't wait, -1 = forever) + * @return true or false if the queue was empty. + */ + bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); + + /** + * Enqueue msg in FIFO order. + */ + void enqueue(std::shared_ptr<api::StorageMessage> msg); + + /** Signal queue monitor. */ + void signal(); + + size_t size() const; +}; + class StorageTransportContext : public api::TransportContext { public: StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg); diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp index 2e90e1ae3ee..5efd638ec26 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp @@ -23,6 +23,6 @@ TestComponentRegister::TestComponentRegister(ComponentRegisterImpl::UP compReg) // register status pages without a server } -TestComponentRegister::~TestComponentRegister() = default; +TestComponentRegister::~TestComponentRegister() {} } |