author     Henning Baldersheim <balder@yahoo-inc.com>  2020-04-27 15:24:26 +0200
committer  GitHub <noreply@github.com>  2020-04-27 15:24:26 +0200
commit     e2356a51c4c64745d232c02ed142afd42efe5a93 (patch)
tree       c2e4d0945448464daed83c6d13ce1ade316ff13f
parent     a4be723aa4f79616a985a28868c510a1d17970e1 (diff)
parent     2e1602d453dc4272e7e1040a21f12e3438c8a5aa (diff)
Merge pull request #13080 from vespa-engine/balder/no-more-batching-of-operations
Remove batching of messages that has no effect in favor of making asy…
-rw-r--r--  storage/src/tests/persistence/filestorage/filestormanagertest.cpp   70
-rw-r--r--  storage/src/tests/persistence/persistencequeuetest.cpp              15
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp    6
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h      5
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp   62
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h      6
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp        2
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp                110
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h                    6
9 files changed, 13 insertions, 269 deletions
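
In short, the merge removes the per-bucket batching path (the getNextMessage(disk, stripeId, lock) overload and flushAllReplies), so each acquired bucket lock now handles exactly one queued operation before its reply is sent. A rough sketch of the simplified flow, condensed from the added lines in persistencethread.cpp further down (names taken from the diff; unused locals, logging, and error paths omitted, so this is an illustration rather than the exact committed code):

    void
    PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) {
        // One message per lock: no more same-bucket batching or flushAllReplies().
        document::Bucket bucket = lock.first->getBucket();
        api::StorageMessage & msg(*lock.second);
        std::unique_ptr<MessageTracker> tracker = processMessage(msg);
        if (tracker && tracker->getReply()) {
            if (hasBucketInfo(msg) && tracker->getReply()->getResult().success()) {
                _env.setBucketInfo(*tracker, bucket);
            }
            // Reply goes straight back up instead of being queued for a batched flush.
            _env._fileStorHandler.sendReply(tracker->getReply());
        }
    }
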
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 4576f8a08f8..2cb390fca86 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -698,19 +698,6 @@ TEST_F(FileStorManagerTest, handler_pause) {
ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority());
}
-namespace {
-
-uint64_t getPutTime(api::StorageMessage::SP& msg)
-{
- if (!msg.get()) {
- return (uint64_t)-1;
- }
-
- return static_cast<api::PutCommand*>(msg.get())->getTimestamp();
-};
-
-}
-
TEST_F(FileStorManagerTest, remap_split) {
// Setup a filestorthread to test
DummyStorageLink top;
@@ -768,63 +755,6 @@ TEST_F(FileStorManagerTest, remap_split) {
filestorHandler.dumpQueue(0));
}
-TEST_F(FileStorManagerTest, handler_multi) {
- // Setup a filestorthread to test
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
- // Since we fake time with small numbers, we need to make sure we dont
- // compact them away, as they will seem to be from 1970
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- filestorHandler.setGetNextMessageTimeout(50);
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
-
- std::string content("Here is some content which is in all documents");
-
- Document::SP doc1(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release());
-
- Document::SP doc2(createDocument(content, "id:footype:testdoctype1:n=4567:bar").release());
-
- document::BucketIdFactory factory;
- document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId());
- document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId());
-
- // Populate bucket with the given data
- for (uint32_t i = 1; i < 10; i++) {
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0);
- filestorHandler.schedule(
- api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0);
- }
-
- {
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
- ASSERT_EQ(1, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(2, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(3, getPutTime(lock.second));
- }
-
- {
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
- ASSERT_EQ(11, getPutTime(lock.second));
-
- lock = filestorHandler.getNextMessage(0, stripeId, lock);
- ASSERT_EQ(12, getPutTime(lock.second));
- }
-}
-
TEST_F(FileStorManagerTest, handler_timeout) {
// Setup a filestorthread to test
DummyStorageLink top;
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index be276dd7f9d..3754a82e7ae 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -168,19 +168,4 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
ASSERT_FALSE(lock1.first.get());
}
-TEST_F(PersistenceQueueTest, operation_batching_not_allowed_across_different_lock_modes) {
- Fixture f(*this);
-
- f.filestorHandler->schedule(createPut(1234, 0), _disk);
- f.filestorHandler->schedule(createGet(1234), _disk);
-
- auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock0.second);
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
-
- f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0);
- ASSERT_FALSE(lock0.second);
-}
-
} // namespace storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 543accc80ae..44cd8f0ab0c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -60,12 +60,6 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId)
return _impl->getNextMessage(disk, stripeId);
}
-FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lck)
-{
- return _impl->getNextMessage(disk, stripeId, lck);
-}
-
FileStorHandler::BucketLockInterface::SP
FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index db294d7c39f..af0a52b2fa0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -121,11 +121,6 @@ public:
LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
/**
- * Returns the next message for the same bucket.
- */
- LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock);
-
- /**
* Lock a bucket. By default, each file stor thread has the locks of all
* buckets in their area of responsibility. If they need to access buckets
* outside of their area, they can call this to make sure the thread
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index bfd5233d017..5ad37d585ef 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -319,25 +319,6 @@ FileStorHandlerImpl::getNextStripeId(uint32_t disk) {
return _diskInfo[disk].getNextStripeId();
}
-
-FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck)
-{
- document::Bucket bucket(lck.first->getBucket());
-
- LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str());
-
- assert(diskId < _diskInfo.size());
- Disk& disk(_diskInfo[diskId]);
-
- if (disk.isClosed()) {
- lck.second.reset();
- return lck;
- }
-
- return disk.getNextMessage(stripeId, lck);
-}
-
bool
FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
{
@@ -977,49 +958,6 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
return {}; // No message fetched.
}
-FileStorHandler::LockedMessage &
-FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
-{
- const document::Bucket & bucket = lck.second->getBucket();
- vespalib::MonitorGuard guard(_lock);
- BucketIdx& idx = bmi::get<2>(_queue);
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
-
- // No more for this bucket.
- if (range.first == range.second) {
- lck.second.reset();
- return lck;
- }
-
- api::StorageMessage & m(*range.first->_command);
- // For now, don't allow batching of operations across lock requirement modes.
- // We might relax this requirement later once we're 100% sure it can't trigger
- // any unfortunate edge cases.
- if (lck.first->lockingRequirements() != m.lockingRequirements()) {
- lck.second.reset();
- return lck;
- }
-
- std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
-
- if (!messageTimedOutInQueue(m, waitTime)) {
- std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
- idx.erase(range.first);
- lck.second.swap(msg);
- guard.broadcast();
- } else {
- std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
- idx.erase(range.first);
- guard.broadcast();
- guard.unlock();
- msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
- _messageSender.sendReply(msgReply);
-
- lck.second.reset();
- }
- return lck;
-}
-
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index da4d242a4c9..7a4f9000e82 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -129,7 +129,6 @@ public:
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
- FileStorHandler::LockedMessage & getNextMessage(FileStorHandler::LockedMessage& lock);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
@@ -185,9 +184,6 @@ public:
FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) {
return _stripes[stripeId].getNextMessage(timeout, *this);
}
- FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) {
- return _stripes[stripeId].getNextMessage(lck);
- }
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
return stripe(bucket).lock(bucket, lockReq);
@@ -253,8 +249,6 @@ public:
FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock);
-
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index ea6f5e03a80..6eab44923f7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -802,7 +802,7 @@ void FileStorManager::onFlush(bool downwards)
LOG(debug, "Flushed _filestorHandler->flush(!downwards);");
for (uint32_t i = 0; i < _disks.size(); ++i) {
for (uint32_t j = 0; j < _disks[i].size(); ++j) {
- if (_disks[i][j].get() != NULL) {
+ if (_disks[i][j]) {
_disks[i][j]->flush();
LOG(debug, "flushed disk[%d][%d]", i, j);
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index e7123164659..d1da25269cf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -895,114 +895,24 @@ bool hasBucketInfo(const api::StorageMessage& msg)
}
void
-PersistenceThread::flushAllReplies(
- const document::Bucket& bucket,
- std::vector<std::unique_ptr<MessageTracker> >& replies)
-{
- if (replies.empty()) {
- return;
- }
-
- try {
- if (replies.size() > 1) {
- _env._metrics.batchingSize.addValue(replies.size());
- }
-#ifdef ENABLE_BUCKET_OPERATION_LOGGING
- {
- size_t nputs = 0, nremoves = 0, nother = 0;
- for (size_t i = 0; i < replies.size(); ++i) {
- if (dynamic_cast<api::PutReply*>(replies[i]->getReply().get()))
- {
- ++nputs;
- } else if (dynamic_cast<api::RemoveReply*>(
- replies[i]->getReply().get()))
- {
- ++nremoves;
- } else {
- ++nother;
- }
- }
- LOG_BUCKET_OPERATION(
- bucket.getBucketId(),
- vespalib::make_string(
- "flushing %zu operations (%zu puts, %zu removes, "
- "%zu other)",
- replies.size(), nputs, nremoves, nother));
- }
-#endif
- spi::Bucket b(bucket, spi::PartitionId(_env._partition));
- // Flush is not used for anything currentlu, and the context is not correct either when batching is done
- // So just faking it here.
- spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0);
- spi::Result result = _spi.flush(b, dummyContext);
- uint32_t errorCode = _env.convertErrorCode(result);
- if (errorCode != 0) {
- for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
- }
- }
- } catch (std::exception& e) {
- for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
- }
- }
-
- for (uint32_t i = 0; i < replies.size(); ++i) {
- LOG(spam, "Sending reply up (batched): %s %" PRIu64,
- replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId());
- _env._fileStorHandler.sendReply(replies[i]->getReply());
- }
-
- replies.clear();
-}
-
-void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
-{
+PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) {
std::vector<MessageTracker::UP> trackers;
document::Bucket bucket = lock.first->getBucket();
- while (lock.second) {
- LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
- std::shared_ptr<api::StorageMessage> msg(lock.second);
- bool batchable = isBatchable(*msg);
-
- // If the next operation wasn't batchable, we should flush
- // everything that came before.
- if (!batchable) {
- flushAllReplies(bucket, trackers);
- }
-
- std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
- if (!tracker || !tracker->getReply()) {
- // Was a reply
- break;
- }
+ LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
+ api::StorageMessage & msg(*lock.second);
- if (hasBucketInfo(*msg)) {
+ std::unique_ptr<MessageTracker> tracker = processMessage(msg);
+ if (tracker && tracker->getReply()) {
+ if (hasBucketInfo(msg)) {
if (tracker->getReply()->getResult().success()) {
_env.setBucketInfo(*tracker, bucket);
}
}
- if (batchable) {
- LOG(spam, "Adding reply %s to batch for bucket %s",
- tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str());
-
- trackers.push_back(std::move(tracker));
-
- if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(_env._partition, _stripeId, lock);
- } else {
- break;
- }
- } else {
- LOG(spam, "Sending reply up: %s %" PRIu64,
- tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
- _env._fileStorHandler.sendReply(tracker->getReply());
- break;
- }
+ LOG(spam, "Sending reply up: %s %" PRIu64,
+ tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
+ _env._fileStorHandler.sendReply(tracker->getReply());
}
-
- flushAllReplies(bucket, trackers);
}
void
@@ -1016,7 +926,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId));
if (lock.first) {
- processMessages(lock);
+ processLockedMessage(lock);
}
vespalib::MonitorGuard flushMonitorGuard(_flushMonitor);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index e410843c1be..56414835b7b 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -22,7 +22,7 @@ public:
PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics, uint16_t deviceIndex);
- ~PersistenceThread();
+ ~PersistenceThread() override;
/** Waits for current operation to be finished. */
void flush() override;
@@ -75,15 +75,13 @@ private:
void handleReply(api::StorageReply&);
MessageTracker::UP processMessage(api::StorageMessage& msg);
- void processMessages(FileStorHandler::LockedMessage & lock);
+ void processLockedMessage(FileStorHandler::LockedMessage & lock);
// Thread main loop
void run(framework::ThreadHandle&) override;
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const;
- void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers);
-
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,