diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-27 15:24:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-27 15:24:26 +0200 |
commit | e2356a51c4c64745d232c02ed142afd42efe5a93 (patch) | |
tree | c2e4d0945448464daed83c6d13ce1ade316ff13f | |
parent | a4be723aa4f79616a985a28868c510a1d17970e1 (diff) | |
parent | 2e1602d453dc4272e7e1040a21f12e3438c8a5aa (diff) |
Merge pull request #13080 from vespa-engine/balder/no-more-batching-of-operations
Remove batching of messages that has no effect in favor of making asy…
9 files changed, 13 insertions, 269 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 4576f8a08f8..2cb390fca86 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -698,19 +698,6 @@ TEST_F(FileStorManagerTest, handler_pause) { ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); } -namespace { - -uint64_t getPutTime(api::StorageMessage::SP& msg) -{ - if (!msg.get()) { - return (uint64_t)-1; - } - - return static_cast<api::PutCommand*>(msg.get())->getTimestamp(); -}; - -} - TEST_F(FileStorManagerTest, remap_split) { // Setup a filestorthread to test DummyStorageLink top; @@ -768,63 +755,6 @@ TEST_F(FileStorManagerTest, remap_split) { filestorHandler.dumpQueue(0)); } -TEST_F(FileStorManagerTest, handler_multi) { - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); - filestorHandler.setGetNextMessageTimeout(50); - uint32_t stripeId = filestorHandler.getNextStripeId(0); - - std::string content("Here is some content which is in all documents"); - - Document::SP doc1(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release()); - - Document::SP doc2(createDocument(content, "id:footype:testdoctype1:n=4567:bar").release()); - - document::BucketIdFactory factory; - document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId()); - document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId()); - - // Populate bucket with the given data - for (uint32_t i = 1; i < 10; i++) { - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0); - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0); - } - - { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId); - ASSERT_EQ(1, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(2, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(3, getPutTime(lock.second)); - } - - { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId); - ASSERT_EQ(11, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(12, getPutTime(lock.second)); - } -} - TEST_F(FileStorManagerTest, handler_timeout) { // Setup a filestorthread to test DummyStorageLink top; diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index be276dd7f9d..3754a82e7ae 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -168,19 +168,4 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive ASSERT_FALSE(lock1.first.get()); } -TEST_F(PersistenceQueueTest, operation_batching_not_allowed_across_different_lock_modes) { - Fixture f(*this); - - f.filestorHandler->schedule(createPut(1234, 0), _disk); - f.filestorHandler->schedule(createGet(1234), _disk); - - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); - ASSERT_TRUE(lock0.first); - ASSERT_TRUE(lock0.second); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); - - f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0); - ASSERT_FALSE(lock0.second); -} - } // namespace storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 543accc80ae..44cd8f0ab0c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -60,12 +60,6 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId) return _impl->getNextMessage(disk, stripeId); } -FileStorHandler::LockedMessage & -FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lck) -{ - return _impl->getNextMessage(disk, stripeId, lck); -} - FileStorHandler::BucketLockInterface::SP FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index db294d7c39f..af0a52b2fa0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -121,11 +121,6 @@ public: LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); /** - * Returns the next message for the same bucket. - */ - LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock); - - /** * Lock a bucket. By default, each file stor thread has the locks of all * buckets in their area of responsibility. If they need to access buckets * outside of their area, they can call this to make sure the thread diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index bfd5233d017..5ad37d585ef 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -319,25 +319,6 @@ FileStorHandlerImpl::getNextStripeId(uint32_t disk) { return _diskInfo[disk].getNextStripeId(); } - -FileStorHandler::LockedMessage & -FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck) -{ - document::Bucket bucket(lck.first->getBucket()); - - LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str()); - - assert(diskId < _diskInfo.size()); - Disk& disk(_diskInfo[diskId]); - - if (disk.isClosed()) { - lck.second.reset(); - return lck; - } - - return disk.getNextMessage(stripeId, lck); -} - bool FileStorHandlerImpl::tryHandlePause(uint16_t disk) const { @@ -977,49 +958,6 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) return {}; // No message fetched. } -FileStorHandler::LockedMessage & -FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) -{ - const document::Bucket & bucket = lck.second->getBucket(); - vespalib::MonitorGuard guard(_lock); - BucketIdx& idx = bmi::get<2>(_queue); - std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket); - - // No more for this bucket. - if (range.first == range.second) { - lck.second.reset(); - return lck; - } - - api::StorageMessage & m(*range.first->_command); - // For now, don't allow batching of operations across lock requirement modes. - // We might relax this requirement later once we're 100% sure it can't trigger - // any unfortunate edge cases. - if (lck.first->lockingRequirements() != m.lockingRequirements()) { - lck.second.reset(); - return lck; - } - - std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]))); - - if (!messageTimedOutInQueue(m, waitTime)) { - std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command); - idx.erase(range.first); - lck.second.swap(msg); - guard.broadcast(); - } else { - std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply(); - idx.erase(range.first); - guard.broadcast(); - guard.unlock(); - msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); - _messageSender.sendReply(msgReply); - - lck.second.reset(); - } - return lck; -} - FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index da4d242a4c9..7a4f9000e82 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -129,7 +129,6 @@ public: void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk); - FileStorHandler::LockedMessage & getNextMessage(FileStorHandler::LockedMessage& lock); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -185,9 +184,6 @@ public: FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) { return _stripes[stripeId].getNextMessage(timeout, *this); } - FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) { - return _stripes[stripeId].getNextMessage(lck); - } std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { return stripe(bucket).lock(bucket, lockReq); @@ -253,8 +249,6 @@ public: FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); - FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock); - enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index ea6f5e03a80..6eab44923f7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -802,7 +802,7 @@ void FileStorManager::onFlush(bool downwards) LOG(debug, "Flushed _filestorHandler->flush(!downwards);"); for (uint32_t i = 0; i < _disks.size(); ++i) { for (uint32_t j = 0; j < _disks[i].size(); ++j) { - if (_disks[i][j].get() != NULL) { + if (_disks[i][j]) { _disks[i][j]->flush(); LOG(debug, "flushed disk[%d][%d]", i, j); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index e7123164659..d1da25269cf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -895,114 +895,24 @@ bool hasBucketInfo(const api::StorageMessage& msg) } void -PersistenceThread::flushAllReplies( - const document::Bucket& bucket, - std::vector<std::unique_ptr<MessageTracker> >& replies) -{ - if (replies.empty()) { - return; - } - - try { - if (replies.size() > 1) { - _env._metrics.batchingSize.addValue(replies.size()); - } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - { - size_t nputs = 0, nremoves = 0, nother = 0; - for (size_t i = 0; i < replies.size(); ++i) { - if (dynamic_cast<api::PutReply*>(replies[i]->getReply().get())) - { - ++nputs; - } else if (dynamic_cast<api::RemoveReply*>( - replies[i]->getReply().get())) - { - ++nremoves; - } else { - ++nother; - } - } - LOG_BUCKET_OPERATION( - bucket.getBucketId(), - vespalib::make_string( - "flushing %zu operations (%zu puts, %zu removes, " - "%zu other)", - replies.size(), nputs, nremoves, nother)); - } -#endif - spi::Bucket b(bucket, spi::PartitionId(_env._partition)); - // Flush is not used for anything currentlu, and the context is not correct either when batching is done - // So just faking it here. - spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0); - spi::Result result = _spi.flush(b, dummyContext); - uint32_t errorCode = _env.convertErrorCode(result); - if (errorCode != 0) { - for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); - } - } - } catch (std::exception& e) { - for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); - } - } - - for (uint32_t i = 0; i < replies.size(); ++i) { - LOG(spam, "Sending reply up (batched): %s %" PRIu64, - replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(replies[i]->getReply()); - } - - replies.clear(); -} - -void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) -{ +PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) { std::vector<MessageTracker::UP> trackers; document::Bucket bucket = lock.first->getBucket(); - while (lock.second) { - LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); - std::shared_ptr<api::StorageMessage> msg(lock.second); - bool batchable = isBatchable(*msg); - - // If the next operation wasn't batchable, we should flush - // everything that came before. - if (!batchable) { - flushAllReplies(bucket, trackers); - } - - std::unique_ptr<MessageTracker> tracker = processMessage(*msg); - if (!tracker || !tracker->getReply()) { - // Was a reply - break; - } + LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); + api::StorageMessage & msg(*lock.second); - if (hasBucketInfo(*msg)) { + std::unique_ptr<MessageTracker> tracker = processMessage(msg); + if (tracker && tracker->getReply()) { + if (hasBucketInfo(msg)) { if (tracker->getReply()->getResult().success()) { _env.setBucketInfo(*tracker, bucket); } } - if (batchable) { - LOG(spam, "Adding reply %s to batch for bucket %s", - tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str()); - - trackers.push_back(std::move(tracker)); - - if (trackers.back()->getReply()->getResult().success()) { - _env._fileStorHandler.getNextMessage(_env._partition, _stripeId, lock); - } else { - break; - } - } else { - LOG(spam, "Sending reply up: %s %" PRIu64, - tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(tracker->getReply()); - break; - } + LOG(spam, "Sending reply up: %s %" PRIu64, + tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); + _env._fileStorHandler.sendReply(tracker->getReply()); } - - flushAllReplies(bucket, trackers); } void @@ -1016,7 +926,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId)); if (lock.first) { - processMessages(lock); + processLockedMessage(lock); } vespalib::MonitorGuard flushMonitorGuard(_flushMonitor); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index e410843c1be..56414835b7b 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -22,7 +22,7 @@ public: PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex); - ~PersistenceThread(); + ~PersistenceThread() override; /** Waits for current operation to be finished. */ void flush() override; @@ -75,15 +75,13 @@ private: void handleReply(api::StorageReply&); MessageTracker::UP processMessage(api::StorageMessage& msg); - void processMessages(FileStorHandler::LockedMessage & lock); + void processLockedMessage(FileStorHandler::LockedMessage & lock); // Thread main loop void run(framework::ThreadHandle&) override; bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const; - void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers); - friend class TestAndSetHelper; bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, |