author     Henning Baldersheim <balder@oath.com>    2018-03-21 11:34:00 +0100
committer  Henning Baldersheim <balder@oath.com>    2018-04-09 14:19:49 +0200
commit     96071a9312fbc01038050726b1bd7d07f44fde0b (patch)
tree       d2f9e22c0a29266484426dc28757dd94e8ea0561 /storage
parent     37999706a2755345effcf81a38e1acd22bef0237 (diff)
Stripe the operations on bucket id to provide for a more efficient thread handover.
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/persistence/filestorage/filestormanagertest.cpp          73
-rw-r--r--  storage/src/tests/persistence/persistencequeuetest.cpp                       8
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp      33
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h        19
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 657
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h   163
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp       2
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp                 9
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h                   1
9 files changed, 552 insertions(+), 413 deletions(-)
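
The essence of the patch: each disk's single message queue and lock table is split into a number of stripes, and every operation is routed to a stripe by its raw bucket id, so all operations on one bucket land in the same stripe (and thus on the same persistence thread). A minimal standalone sketch of that routing idea follows; the names (Op, StripedQueue) are illustrative, not the actual Vespa types.

#include <cstdint>
#include <deque>
#include <vector>

struct Op { uint64_t rawBucketId; };

class StripedQueue {
public:
    explicit StripedQueue(uint32_t numStripes) : _stripes(numStripes) {}
    // Same modulo mapping as Disk::stripe() in the patch below: operations
    // on the same bucket always end up in the same per-stripe queue.
    void schedule(Op op) {
        _stripes[op.rawBucketId % _stripes.size()].push_back(op);
    }
private:
    std::vector<std::deque<Op>> _stripes;
};
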
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index c786f0bbaa8..168e9599b3d 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -615,6 +615,8 @@ FileStorManagerTest::testHandlerPriority()
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
+ uint32_t stripeId = filestorHandler.getNextStripeId(0);
+ CPPUNIT_ASSERT_EQUAL(0u, stripeId);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -633,11 +635,11 @@ FileStorManagerTest::testHandlerPriority()
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority());
- CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
}
class MessagePusherThread : public document::Runnable
@@ -672,6 +674,7 @@ MessagePusherThread::~MessagePusherThread() = default;
class MessageFetchingThread : public document::Runnable {
public:
+ const uint32_t _threadId;
FileStorHandler& _handler;
std::atomic<uint32_t> _config;
uint32_t _fetchedCount;
@@ -680,12 +683,13 @@ public:
bool _threadDone;
MessageFetchingThread(FileStorHandler& handler)
- : _handler(handler), _config(0), _fetchedCount(0), _done(false),
- _failed(false), _threadDone(false) {}
+ : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false),
+ _failed(false), _threadDone(false)
+ {}
void run() override {
while (!_done) {
- FileStorHandler::LockedMessage msg = _handler.getNextMessage(0);
+ FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, _threadId);
if (msg.second.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
@@ -761,8 +765,7 @@ FileStorManagerTest::testHandlerPause()
// Setup a filestorthread to test
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
// Since we fake time with small numbers, we need to make sure we dont
@@ -774,6 +777,7 @@ FileStorManagerTest::testHandlerPause()
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
+ uint32_t stripeId = filestorHandler.getNextStripeId(0);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -781,8 +785,7 @@ FileStorManagerTest::testHandlerPause()
Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());
// Populate bucket with the given data
for (uint32_t i = 1; i < 6; i++) {
@@ -793,15 +796,15 @@ FileStorManagerTest::testHandlerPause()
filestorHandler.schedule(cmd, 0);
}
- CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL);
+ CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, stripeId).second.get() == NULL);
}
- CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority());
+ CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, stripeId).second->getPriority());
}
namespace {
@@ -898,9 +901,9 @@ FileStorManagerTest::testHandlerMulti()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister());
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
+ uint32_t stripeId = filestorHandler.getNextStripeId(0);
std::string content("Here is some content which is in all documents");
@@ -921,21 +924,21 @@ FileStorManagerTest::testHandlerMulti()
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock);
+ lock = filestorHandler.getNextMessage(0, stripeId, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock);
+ lock = filestorHandler.getNextMessage(0, stripeId, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second));
}
{
- FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0);
+ FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId);
CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second));
- lock = filestorHandler.getNextMessage(0, lock);
+ lock = filestorHandler.getNextMessage(0, stripeId, lock);
CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second));
}
}
@@ -959,9 +962,9 @@ FileStorManagerTest::testHandlerTimeout()
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(),
- _node->getComponentRegister());
+ FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50);
+ uint32_t stripeId = filestorHandler.getNextStripeId(0);
std::string content("Here is some content which is in all documents");
std::ostringstream uri;
@@ -969,16 +972,12 @@ FileStorManagerTest::testHandlerTimeout()
Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release());
document::BucketIdFactory factory;
- document::BucketId bucket(16, factory.getBucketId(
- doc->getId()).getRawId());
+ document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId());
// Populate bucket with the given data
{
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(0);
cmd->setTimeout(50);
@@ -986,11 +985,8 @@ FileStorManagerTest::testHandlerTimeout()
}
{
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bucket), doc, 100));
- std::unique_ptr<api::StorageMessageAddress> address(
- new api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 3));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100);
+ auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(200);
cmd->setTimeout(10000);
@@ -999,7 +995,7 @@ FileStorManagerTest::testHandlerTimeout()
FastOS_Thread::Sleep(51);
for (;;) {
- auto lock = filestorHandler.getNextMessage(0);
+ auto lock = filestorHandler.getNextMessage(0, stripeId);
if (lock.first.get()) {
CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority());
break;
@@ -1008,8 +1004,7 @@ FileStorManagerTest::testHandlerTimeout()
CPPUNIT_ASSERT_EQUAL(size_t(1), top.getNumReplies());
CPPUNIT_ASSERT_EQUAL(api::ReturnCode::TIMEOUT,
- static_cast<api::StorageReply&>(*top.getReply(0))
- .getResult().getResult());
+ static_cast<api::StorageReply&>(*top.getReply(0)).getResult().getResult());
}
void
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f5f190ad718..1a0b549ceed 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -67,8 +67,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
{
DummyStorageLink top;
DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(
- dummyManager = new DummyStorageLink));
+ top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
top.open();
ForwardingMessageSender messageSender(*dummyManager);
@@ -78,6 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
loadTypes.getMetricLoadTypes(), 1);
FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
+ uint32_t stripeId = filestorHandler.getNextStripeId(0);
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
@@ -87,12 +87,12 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
filestorHandler.schedule(createPut(1234, 1), 0);
filestorHandler.schedule(createPut(5432, 0), 0);
- auto lock0 = filestorHandler.getNextMessage(0);
+ auto lock0 = filestorHandler.getNextMessage(0, stripeId);
CPPUNIT_ASSERT(lock0.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0);
+ auto lock1 = filestorHandler.getNextMessage(0, stripeId);
CPPUNIT_ASSERT(lock1.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 949ceb901e1..b067239a6f7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -4,11 +4,16 @@
namespace storage {
-FileStorHandler::FileStorHandler(MessageSender& sender,
- FileStorMetrics& metrics,
- const spi::PartitionStateList& partitions,
- ServiceLayerComponentRegister& compReg)
- : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg))
+FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics,
+ const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
+ : _impl(new FileStorHandlerImpl(1, sender, metrics, partitions, compReg))
+{
+}
+
+
+FileStorHandler::FileStorHandler(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
+ const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
+ : _impl(new FileStorHandlerImpl(numStripes, sender, metrics, partitions, compReg))
{
}
@@ -54,15 +59,15 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread)
}
FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint16_t thread)
+FileStorHandler::getNextMessage(uint16_t thread, uint32_t stripeId)
{
- return _impl->getNextMessage(thread);
+ return _impl->getNextMessage(thread, stripeId);
}
FileStorHandler::LockedMessage &
-FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck)
+FileStorHandler::getNextMessage(uint16_t thread, uint32_t stripeId, LockedMessage& lck)
{
- return _impl->getNextMessage(thread, lck);
+ return _impl->getNextMessage(thread, stripeId, lck);
}
FileStorHandler::BucketLockInterface::SP
@@ -72,12 +77,9 @@ FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
}
void
-FileStorHandler::remapQueueAfterDiskMove(
- const document::Bucket& bucket,
- uint16_t sourceDisk, uint16_t targetDisk)
+FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket, uint16_t sourceDisk, uint16_t targetDisk)
{
RemapInfo target(bucket, targetDisk);
-
_impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE);
}
@@ -153,6 +155,11 @@ FileStorHandler::getNumActiveMerges() const
return _impl->getNumActiveMerges();
}
+uint32_t
+FileStorHandler::getNextStripeId(uint32_t disk) {
+ return _impl->getNextStripeId(disk);
+}
+
void
FileStorHandler::clearMergeStatus(const document::Bucket& bucket,
const api::ReturnCode& code)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 2cfc97ca71b..0477845f3aa 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -62,8 +62,7 @@ public:
virtual ~BucketLockInterface() {};
};
- typedef std::pair<BucketLockInterface::SP,
- api::StorageMessage::SP> LockedMessage;
+ typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;
enum DiskState {
AVAILABLE,
@@ -71,7 +70,10 @@ public:
CLOSED
};
- FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&);
+ FileStorHandler(uint32_t numStripes, MessageSender&, FileStorMetrics&,
+ const spi::PartitionStateList&, ServiceLayerComponentRegister&);
+ FileStorHandler(MessageSender&, FileStorMetrics&,
+ const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandler();
// Commands used by file stor manager
@@ -116,12 +118,12 @@ public:
*
* @param disk The disk to get messages for
*/
- LockedMessage getNextMessage(uint16_t disk);
+ LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
/**
* Returns the next message for the same bucket.
*/
- LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock);
+ LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock);
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -229,6 +231,13 @@ public:
*/
uint32_t getNumActiveMerges() const;
+ /**
+ * Provides the next stripe id for a certain disk.
+ * @param disk The disk to hand out a stripe id for.
+ * @return The next stripe id, assigned round-robin over the disk's stripes.
+ */
+ uint32_t getNextStripeId(uint32_t disk);
+
/** Removes the merge status for the given bucket. */
void clearMergeStatus(const document::Bucket&);
void clearMergeStatus(const document::Bucket&, const api::ReturnCode&);
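
Taken together, the header changes define a two-step protocol for consumers: a thread claims a stripe id once via getNextStripeId(), then passes it to every getNextMessage() call. A hedged sketch of the intended calling pattern, mirroring PersistenceThread::run() later in the patch; shutdown checks and message processing are elided.

void pollLoop(FileStorHandler& handler, uint16_t disk) {
    const uint32_t stripeId = handler.getNextStripeId(disk); // claim once, up front
    for (;;) {
        FileStorHandler::LockedMessage lock = handler.getNextMessage(disk, stripeId);
        if (lock.second) {
            // process the message; lock.first keeps the bucket locked meanwhile
        }
    }
}
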
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index d282b0ee3ed..01e42133bac 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -23,17 +23,21 @@ using document::BucketSpace;
namespace storage {
-FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
const spi::PartitionStateList& partitions,
ServiceLayerComponentRegister& compReg)
: _partitions(partitions),
_component(compReg, "filestorhandlerimpl"),
- _diskInfo(_component.getDiskCount()),
+ _diskInfo(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100),
_paused(false)
{
+ _diskInfo.reserve(_component.getDiskCount());
+ for (uint32_t i(0); i < _component.getDiskCount(); i++) {
+ _diskInfo.emplace_back(*this, sender, numStripes);
+ }
for (uint32_t i=0; i<_diskInfo.size(); ++i) {
_diskInfo[i].metrics = metrics.disks[i].get();
assert(_diskInfo[i].metrics != 0);
@@ -131,13 +135,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
{
for (uint32_t i=0; i<_diskInfo.size(); ++i) {
LOG(debug, "Wait until queues and bucket locks released for disk '%d'", i);
- Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- while (t.getQueueSize() != 0 || !t.lockedBuckets.empty()) {
- LOG(debug, "Still %d in queue and %ld locked buckets for disk '%d'",
- t.getQueueSize(), t.lockedBuckets.size(), i);
- lockGuard.wait(100);
- }
+ _diskInfo[i].flush();
LOG(debug, "All queues and bucket locks released for disk '%d'", i);
}
@@ -178,20 +176,15 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, DiskState state) const
}
void
-FileStorHandlerImpl::setDiskState(uint16_t disk, DiskState state)
+FileStorHandlerImpl::setDiskState(uint16_t diskId, DiskState state)
{
- Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
+ Disk& disk = _diskInfo[diskId];
// Mark disk closed
- t.setState(state);
+ disk.setState(state);
if (state != FileStorHandler::AVAILABLE) {
- while (t.queue.begin() != t.queue.end()) {
- reply(*t.queue.begin()->_command, state);
- t.queue.erase(t.queue.begin());
- }
+ disk.flush();
}
- lockGuard.broadcast();
}
FileStorHandler::DiskState
@@ -209,9 +202,7 @@ FileStorHandlerImpl::close()
setDiskState(i, FileStorHandler::CLOSED);
}
LOG(debug, "Closing disk[%d]", i);
- Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- lockGuard.broadcast();
+ _diskInfo[i].broadcast();
LOG(debug, "Closed disk[%d]", i);
}
}
@@ -219,38 +210,23 @@ FileStorHandlerImpl::close()
uint32_t
FileStorHandlerImpl::getQueueSize() const
{
- uint32_t count = 0;
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- const Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- count += t.getQueueSize();
+ size_t count = 0;
+ for (const auto & disk : _diskInfo) {
+ count += disk.getQueueSize();
}
return count;
}
bool
-FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, uint16_t diskId)
{
- assert(disk < _diskInfo.size());
- Disk& t(_diskInfo[disk]);
- MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
- vespalib::MonitorGuard lockGuard(t.lock);
-
- if (t.getState() == FileStorHandler::AVAILABLE) {
- MBUS_TRACE(msg->getTrace(), 5, vespalib::make_string(
- "FileStorHandler: Operation added to disk %d's queue with priority %u", disk, msg->getPriority()));
-
- t.queue.emplace_back(std::move(messageEntry));
- LOG(spam, "Queued operation %s with priority %u.", msg->getType().toString().c_str(), msg->getPriority());
- lockGuard.broadcast();
- } else {
- return false;
- }
- return true;
+ assert(diskId < _diskInfo.size());
+ Disk& disk(_diskInfo[diskId]);
+ return disk.schedule(msg);
}
bool
-FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
+FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg)
{
if (msg.getType().isReply()) {
return false;
@@ -281,50 +257,14 @@ FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const
void
FileStorHandlerImpl::abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
{
- Disk& t(disk);
- vespalib::MonitorGuard diskLock(t.lock);
- typedef PriorityQueue::iterator iter_t;
api::ReturnCode abortedCode(api::ReturnCode::ABORTED,
"Sending distributor no longer owns bucket operation was bound to, "
"or storage node went down");
- for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) {
- api::StorageMessage& msg(*it->_command);
- if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
- LOG(debug, "Aborting operation %s as it is bound for bucket %s",
- msg.toString().c_str(), it->_bucket.getBucketId().toString().c_str());
- std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
- msgReply->setResult(abortedCode);
- _messageSender.sendReply(msgReply);
-
- it = t.queue.erase(it);
- } else {
- ++it;
- }
- }
-}
-
-bool
-FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(const Disk& disk,
- const AbortBucketOperationsCommand& cmd) const
-{
- for (auto& lockedBucket : disk.lockedBuckets) {
- if (cmd.shouldAbort(lockedBucket.first)) {
- LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...",
- lockedBucket.first.toString().c_str());
- return true;
- }
- }
- return false;
-}
-
-void
-FileStorHandlerImpl::waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd)
-{
- vespalib::MonitorGuard guard(disk.lock);
- while (diskHasActiveOperationForAbortedBucket(disk, cmd)) {
- guard.wait();
+ auto aborted = disk.abort(cmd);
+ for (auto & msgReply : aborted) {
+ msgReply->setResult(abortedCode);
+ _messageSender.sendReply(msgReply);
}
- guard.broadcast();
}
void
@@ -332,82 +272,46 @@ FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& c
{
// Do queue clearing and active operation waiting in two passes
// to allow disk threads to drain running operations in parallel.
- for (uint32_t i = 0; i < _diskInfo.size(); ++i) {
- abortQueuedCommandsForBuckets(_diskInfo[i], cmd);
+ for (Disk & disk : _diskInfo) {
+ abortQueuedCommandsForBuckets(disk, cmd);
}
- for (uint32_t i = 0; i < _diskInfo.size(); ++i) {
- waitUntilNoActiveOperationsForAbortedBuckets(_diskInfo[i], cmd);
+ for (Disk & disk : _diskInfo) {
+ disk.waitInactive(cmd);
}
}
void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- const Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- t.metrics->pendingMerges.addValue(_mergeStates.size());
- t.metrics->queueSize.addValue(t.getQueueSize());
+ for (Disk & disk : _diskInfo) {
+ vespalib::MonitorGuard lockGuard(_mergeStatesLock);
+ disk.metrics->pendingMerges.addValue(_mergeStates.size());
+ disk.metrics->queueSize.addValue(disk.getQueueSize());
}
}
+uint32_t
+FileStorHandlerImpl::getNextStripeId(uint32_t disk) {
+ return _diskInfo[disk].getNextStripeId();
+}
+
+
FileStorHandler::LockedMessage &
-FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck)
+FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck)
{
document::Bucket bucket(lck.first->getBucket());
- LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str());
-
- assert(disk < _diskInfo.size());
- Disk& t(_diskInfo[disk]);
-
- if (getDiskState(disk) == FileStorHandler::CLOSED) {
- lck.second.reset();
- return lck;
- }
+ LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str());
- vespalib::MonitorGuard lockGuard(t.lock);
- BucketIdx& idx = bmi::get<2>(t.queue);
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
+ assert(diskId < _diskInfo.size());
+ Disk& disk(_diskInfo[diskId]);
- // No more for this bucket.
- if (range.first == range.second) {
+ if (disk.isClosed()) {
lck.second.reset();
return lck;
}
- api::StorageMessage & m(*range.first->_command);
- mbus::Trace& trace = m.getTrace();
-
- MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket.");
-
- uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(
- t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
- LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d",
- m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
- static_cast<api::StorageCommand&>(m).getTimeout());
-
- if (m.getType().isReply() ||
- waitTime < static_cast<api::StorageCommand&>(m).getTimeout())
- {
- std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
- idx.erase(range.first);
- lck.second.swap(msg);
- lockGuard.broadcast();
- lockGuard.unlock();
- return lck;
- } else {
- std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
- idx.erase(range.first);
- lockGuard.broadcast();
- lockGuard.unlock();
- msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
- _messageSender.sendReply(msgReply);
-
- lck.second.reset();
- return lck;
- }
+ return disk.getNextMessage(stripeId, lck);
}
bool
@@ -415,7 +319,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
{
if (isPaused()) {
// Wait a single time to see if filestor gets unpaused.
- if (getDiskState(disk) != FileStorHandler::CLOSED) {
+ if (!_diskInfo[disk].isClosed()) {
vespalib::MonitorGuard g(_pauseMonitor);
g.wait(100);
}
@@ -425,13 +329,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
}
bool
-FileStorHandlerImpl::diskIsClosed(uint16_t disk) const
-{
- return (getDiskState(disk) == FileStorHandler::CLOSED);
-}
-
-bool
-FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const
+FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime)
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
@@ -439,15 +337,8 @@ FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint
return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout());
}
-std::unique_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, Disk& disk,
- const document::Bucket &bucket, const api::StorageMessage& msg)
-{
- return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary());
-}
-
std::unique_ptr<api::StorageReply>
-FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
+FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
{
assert(!msg.getType().isReply());
std::unique_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(msg).makeReply();
@@ -455,95 +346,30 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
return msgReply;
}
-namespace {
- bool
- bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
- return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
- }
-}
-
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint16_t disk)
+FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
{
assert(disk < _diskInfo.size());
if (!tryHandlePause(disk)) {
return {}; // Still paused, return to allow tick.
}
- Disk& t(_diskInfo[disk]);
-
- vespalib::MonitorGuard lockGuard(t.lock);
- // Try to grab a message+lock, immediately retrying once after a wait
- // if none can be found and then exiting if the same is the case on the
- // second attempt. This is key to allowing the run loop to register
- // ticks at regular intervals while not busy-waiting.
- for (int attempt = 0; (attempt < 2) && ! diskIsClosed(disk); ++attempt) {
- PriorityIdx& idx(bmi::get<1>(t.queue));
- PriorityIdx::iterator iter(idx.begin()), end(idx.end());
-
- while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) {
- iter++;
- }
- if (iter != end) {
- if (! isPaused()) {
- return getMessage(lockGuard, t, idx, iter);
- }
- }
- if (attempt == 0) {
- lockGuard.wait(_getNextMessageTimeout);
- }
- }
- return {}; // No message fetched.
-}
-
-FileStorHandler::LockedMessage
-FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter) {
-
- api::StorageMessage & m(*iter->_command);
- const uint64_t waitTime(
- const_cast<metrics::MetricTimer &>(iter->_timer).stop(t.metrics->averageQueueWaitingTime[m.getLoadType()]));
-
- mbus::Trace &trace(m.getTrace());
- MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread.");
- LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue, timeout %d",
- m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout());
-
- std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
- document::Bucket bucket(iter->_bucket);
- idx.erase(iter); // iter not used after this point.
-
- if (!messageTimedOutInQueue(*msg, waitTime)) {
- auto locker = takeDiskBucketLockOwnership(guard, t, bucket, *msg);
- guard.unlock();
- MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket");
- return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg)));
- } else {
- std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
- guard.broadcast(); // XXX: needed here?
- guard.unlock();
- _messageSender.sendReply(msgReply);
- return {};
- }
+ return _diskInfo[disk].getNextMessage(stripeId, _getNextMessageTimeout);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
+FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
{
- assert(disk < _diskInfo.size());
-
- Disk& t(_diskInfo[disk]);
- LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk);
-
- vespalib::MonitorGuard lockGuard(t.lock);
+ vespalib::MonitorGuard guard(_lock);
- while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
+ while (isLocked(guard, bucket)) {
LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
- lockGuard.wait(100);
+ guard.wait(100);
}
- auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock");
+ auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, "External lock");
- lockGuard.broadcast();
+ guard.broadcast();
return locker;
}
@@ -868,8 +694,8 @@ void
FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
std::vector<RemapInfo*>& targets, Operation op)
{
- BucketIdx& idx(bmi::get<2>(from.queue));
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(source.bucket));
+ BucketIdx& idx(from.stripe(source.bucket).exposeBucketIdx());
+ auto range(idx.equal_range(source.bucket));
std::vector<MessageEntry> entriesFound;
@@ -910,7 +736,7 @@ FileStorHandlerImpl::remapQueueNoLock(Disk& from, const RemapInfo& source,
} else {
entry._bucket = bucket;
// Move to correct disk queue if needed
- _diskInfo[targetDisk].queue.emplace_back(std::move(entry));
+ _diskInfo[targetDisk].stripe(bucket).exposeQueue().emplace_back(std::move(entry));
}
}
@@ -924,11 +750,11 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Oper
MultiLockGuard guard;
Disk& from(_diskInfo[source.diskIndex]);
- guard.addLock(from.lock, source.diskIndex);
+ guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex);
Disk& to1(_diskInfo[target.diskIndex]);
if (target.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to1.lock, target.diskIndex);
+ guard.addLock(to1.stripe(target.bucket).exposeLock(), target.diskIndex);
}
std::vector<RemapInfo*> targets;
@@ -947,16 +773,16 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
MultiLockGuard guard;
Disk& from(_diskInfo[source.diskIndex]);
- guard.addLock(from.lock, source.diskIndex);
+ guard.addLock(from.stripe(source.bucket).exposeLock(), source.diskIndex);
Disk& to1(_diskInfo[target1.diskIndex]);
if (target1.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to1.lock, target1.diskIndex);
+ guard.addLock(to1.stripe(target1.bucket).exposeLock(), target1.diskIndex);
}
Disk& to2(_diskInfo[target2.diskIndex]);
if (target2.bucket.getBucketId().getRawId() != 0) {
- guard.addLock(to2.lock, target2.diskIndex);
+ guard.addLock(to2.stripe(target2.bucket).exposeLock(), target2.diskIndex);
}
guard.lock();
@@ -969,12 +795,11 @@ FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target1, Rem
}
void
-FileStorHandlerImpl::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err)
+FileStorHandlerImpl::Stripe::failOperations(const document::Bucket &bucket, const api::ReturnCode& err)
{
- Disk& from(_diskInfo[fromDisk]);
- vespalib::MonitorGuard lockGuard(from.lock);
+ vespalib::MonitorGuard guard(_lock);
- BucketIdx& idx(bmi::get<2>(from.queue));
+ BucketIdx& idx(bmi::get<2>(_queue));
std::pair<BucketIdx::iterator, BucketIdx::iterator> range(idx.equal_range(bucket));
for (auto iter = range.first; iter != range.second;) {
@@ -1032,47 +857,260 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept
FileStorHandlerImpl::MessageEntry::~MessageEntry() { }
-FileStorHandlerImpl::Disk::Disk()
- : lock(),
- queue(),
- lockedBuckets(100),
- metrics(0),
+FileStorHandlerImpl::Disk::Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numThreads)
+ : metrics(0),
+ _nextStripeId(0),
+ _stripes(numThreads, Stripe(owner, messageSender)),
state(FileStorHandler::AVAILABLE)
+{
+ assert(numThreads > 0);
+}
+
+FileStorHandlerImpl::Disk::Disk(Disk && rhs) noexcept
+ : metrics(std::move(rhs.metrics)),
+ _nextStripeId(rhs._nextStripeId),
+ _stripes(std::move(rhs._stripes)),
+ state(rhs.state.load())
{ }
-FileStorHandlerImpl::Disk::~Disk() { }
+FileStorHandlerImpl::Disk::~Disk() = default;
+FileStorHandlerImpl::Stripe::~Stripe() = default;
+
+void
+FileStorHandlerImpl::Disk::flush()
+{
+ for (auto & stripe : _stripes) {
+ stripe.flush();
+ }
+}
+
+void
+FileStorHandlerImpl::Disk::broadcast()
+{
+ for (auto & stripe : _stripes) {
+ stripe.broadcast();
+ }
+}
bool
-FileStorHandlerImpl::Disk::isLocked(const document::Bucket& bucket) const noexcept
+FileStorHandlerImpl::Disk::schedule(const std::shared_ptr<api::StorageMessage>& msg)
{
- return (lockedBuckets.find(bucket) != lockedBuckets.end());
+ MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
+ if (getState() == FileStorHandler::AVAILABLE) {
+ stripe(messageEntry._bucket).schedule(std::move(messageEntry));
+ } else {
+ return false;
+ }
+ return true;
+}
+
+FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender)
+ : _owner(owner),
+ _messageSender(messageSender)
+{ }
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
+{
+ vespalib::MonitorGuard guard(_lock);
+ // Try to grab a message+lock, immediately retrying once after a wait
+ // if none can be found and then exiting if the same is the case on the
+ // second attempt. This is key to allowing the run loop to register
+ // ticks at regular intervals while not busy-waiting.
+ for (int attempt = 0; (attempt < 2) && ! disk.isClosed() && !_owner.isPaused(); ++attempt) {
+ PriorityIdx& idx(bmi::get<1>(_queue));
+ PriorityIdx::iterator iter(idx.begin()), end(idx.end());
+
+ while (iter != end && isLocked(guard, iter->_bucket)) {
+ iter++;
+ }
+ if (iter != end) {
+ return getMessage(guard, disk, idx, iter);
+ }
+ if (attempt == 0) {
+ guard.wait(timeout);
+ }
+ }
+ return {}; // No message fetched.
+}
+
+FileStorHandler::LockedMessage &
+FileStorHandlerImpl::Stripe::getNextMessage(Disk & disk, FileStorHandler::LockedMessage& lck)
+{
+ const document::Bucket & bucket = lck.second->getBucket();
+ vespalib::MonitorGuard guard(_lock);
+ BucketIdx& idx = bmi::get<2>(_queue);
+ std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
+
+ // No more for this bucket.
+ if (range.first == range.second) {
+ lck.second.reset();
+ return lck;
+ }
+
+ api::StorageMessage & m(*range.first->_command);
+
+ uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop(disk.metrics->averageQueueWaitingTime[m.getLoadType()]));
+
+ if (!messageTimedOutInQueue(m, waitTime)) {
+ std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
+ idx.erase(range.first);
+ lck.second.swap(msg);
+ guard.broadcast();
+ } else {
+ std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply();
+ idx.erase(range.first);
+ guard.broadcast();
+ guard.unlock();
+ msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue"));
+ _messageSender.sendReply(msgReply);
+
+ lck.second.reset();
+ }
+ return lck;
+}
+
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Disk & disk, PriorityIdx & idx, PriorityIdx::iterator iter) {
+
+ api::StorageMessage & m(*iter->_command);
+ uint64_t waitTime(const_cast<metrics::MetricTimer &>(iter->_timer).stop(disk.metrics->averageQueueWaitingTime[m.getLoadType()]));
+
+ std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
+ document::Bucket bucket(iter->_bucket);
+ idx.erase(iter); // iter not used after this point.
+
+ if (!messageTimedOutInQueue(*msg, waitTime)) {
+ auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), msg->getSummary());
+ guard.unlock();
+ return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
+ } else {
+ std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
+ guard.broadcast(); // XXX: needed here?
+ guard.unlock();
+ _messageSender.sendReply(msgReply);
+ return {};
+ }
+}
+
+void
+FileStorHandlerImpl::Disk::waitUntilNoLocks() const
+{
+ for (const auto & stripe : _stripes) {
+ stripe.waitUntilNoLocks();
+ }
+}
+
+void
+FileStorHandlerImpl::Stripe::waitUntilNoLocks() const
+{
+ vespalib::MonitorGuard lockGuard(_lock);
+ while (!_lockedBuckets.empty()) {
+ lockGuard.wait();
+ }
+}
+
+void
+FileStorHandlerImpl::Disk::waitInactive(const AbortBucketOperationsCommand& cmd) const {
+ for (auto & stripe : _stripes) {
+ stripe.waitInactive(cmd);
+ }
+}
+
+void
+FileStorHandlerImpl::Stripe::waitInactive(const AbortBucketOperationsCommand& cmd) const {
+ vespalib::MonitorGuard lockGuard(_lock);
+ while (hasActive(lockGuard, cmd)) {
+ lockGuard.wait();
+ }
+}
+
+bool
+FileStorHandlerImpl::Stripe::hasActive(vespalib::MonitorGuard &, const AbortBucketOperationsCommand& cmd) const {
+ for (auto& lockedBucket : _lockedBuckets) {
+ if (cmd.shouldAbort(lockedBucket.first)) {
+ LOG(spam, "Disk had active operation for aborted bucket %s, waiting for it to complete...",
+ lockedBucket.first.toString().c_str());
+ return true;
+ }
+ }
+ return false;
+}
+
+std::vector<std::shared_ptr<api::StorageReply>>
+FileStorHandlerImpl::Disk::abort(const AbortBucketOperationsCommand& cmd)
+{
+ std::vector<std::shared_ptr<api::StorageReply>> aborted;
+ for (auto & stripe : _stripes) {
+ stripe.abort(aborted, cmd);
+ }
+ return aborted;
+}
+
+void FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted,
+ const AbortBucketOperationsCommand& cmd)
+{
+ vespalib::MonitorGuard lockGuard(_lock);
+ for (auto it(_queue.begin()); it != _queue.end();) {
+ api::StorageMessage& msg(*it->_command);
+ if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
+ aborted.emplace_back(static_cast<api::StorageCommand&>(msg).makeReply());
+ it = _queue.erase(it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+bool FileStorHandlerImpl::Stripe::schedule(MessageEntry messageEntry)
+{
+ vespalib::MonitorGuard lockGuard(_lock);
+ _queue.emplace_back(std::move(messageEntry));
+ lockGuard.broadcast();
+ return true;
+}
+
+void
+FileStorHandlerImpl::Stripe::flush()
+{
+ vespalib::MonitorGuard lockGuard(_lock);
+ while (!(_queue.empty() && _lockedBuckets.empty())) {
+ LOG(debug, "Still %ld in queue and %ld locked buckets", _queue.size(), _lockedBuckets.size());
+ lockGuard.wait(100);
+ }
+}
+bool
+FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept
+{
+ return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end());
}
uint32_t
FileStorHandlerImpl::Disk::getQueueSize() const noexcept
{
- return queue.size();
+ size_t sum(0);
+ for (const auto & stripe : _stripes) {
+ sum += stripe.getQueueSize();
+ }
+ return sum;
}
uint32_t
FileStorHandlerImpl::getQueueSize(uint16_t disk) const
{
- const Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
- return t.getQueueSize();
+ return _diskInfo[disk].getQueueSize();
}
-FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Disk& disk,
+FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
const vespalib::stringref & statusString)
- : _disk(disk),
+ : _stripe(stripe),
_bucket(bucket)
{
(void) guard;
if (_bucket.getBucketId().getRawId() != 0) {
// Lock the bucket and wait until it is not the current operation for
// the disk itself.
- _disk.lockedBuckets.insert(std::make_pair(_bucket, Disk::LockEntry(priority, statusString)));
+ _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, statusString));
LOG(debug, "Locked bucket %s with priority %u",
bucket.getBucketId().toString().c_str(), priority);
@@ -1086,31 +1124,73 @@ FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard
FileStorHandlerImpl::BucketLock::~BucketLock()
{
if (_bucket.getBucketId().getRawId() != 0) {
- vespalib::MonitorGuard lockGuard(_disk.lock);
- _disk.lockedBuckets.erase(_bucket);
+ _stripe.release(_bucket);
LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION_SET_LOCK_STATE(
_bucket.getBucketId(), "released filestor lock", true,
debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
- lockGuard.broadcast();
}
}
std::string
-FileStorHandlerImpl::dumpQueue(uint16_t disk) const
+FileStorHandlerImpl::Disk::dumpQueue() const
+{
+ std::ostringstream os;
+ for (const Stripe & stripe : _stripes) {
+ stripe.dumpQueue(os);
+ }
+ return os.str();
+}
+
+void
+FileStorHandlerImpl::Disk::dumpQueueHtml(std::ostream & os) const
+{
+ for (const Stripe & stripe : _stripes) {
+ stripe.dumpQueueHtml(os);
+ }
+}
+
+void
+FileStorHandlerImpl::Disk::dumpActiveHtml(std::ostream & os) const
+{
+ for (const Stripe & stripe : _stripes) {
+ stripe.dumpActiveHtml(os);
+ }
+}
+
+void
+FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const
{
- std::ostringstream ost;
+ vespalib::MonitorGuard guard(_lock);
- const Disk& t(_diskInfo[disk]);
- vespalib::MonitorGuard lockGuard(t.lock);
+ const PriorityIdx& idx = bmi::get<1>(_queue);
+ for (const auto & entry : idx) {
+ os << "<li>" << entry._command->toString() << " (priority: "
+ << (int)entry._command->getPriority() << ")</li>\n";
+ }
+}
- const PriorityIdx& idx = bmi::get<1>(t.queue);
- for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
- ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: "
- << (int)it->_command->getPriority() << ")\n";
+void
+FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const
+{
+ uint32_t now = time(nullptr);
+ vespalib::MonitorGuard guard(_lock);
+ for (const auto & entry : _lockedBuckets) {
+ os << entry.second.statusString << " (" << entry.first.getBucketId()
+ << ") Running for " << (now - entry.second.timestamp) << " secs<br/>\n";
}
+}
+
+void
+FileStorHandlerImpl::Stripe::dumpQueue(std::ostream & os) const
+{
+ vespalib::MonitorGuard guard(_lock);
- return ost.str();
+ const PriorityIdx& idx = bmi::get<1>(_queue);
+ for (const auto & entry : idx) {
+ os << entry._bucket.getBucketId() << ": " << entry._command->toString() << " (priority: "
+ << (int)entry._command->getPriority() << ")\n";
+ }
}
void
@@ -1120,37 +1200,25 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
out << "<h1>Filestor handler</h1>\n";
for (uint32_t i=0; i<_diskInfo.size(); ++i) {
out << "<h2>Disk " << i << "</h2>\n";
- const Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- out << "Queue size: " << t.getQueueSize() << "<br>\n";
+ const Disk& disk(_diskInfo[i]);
+ out << "Queue size: " << disk.getQueueSize() << "<br>\n";
out << "Disk state: ";
- switch (t.getState()) {
- case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
- case FileStorHandler::DISABLED: out << "DISABLED"; break;
- case FileStorHandler::CLOSED: out << "CLOSED"; break;
+ switch (disk.getState()) {
+ case FileStorHandler::AVAILABLE: out << "AVAILABLE"; break;
+ case FileStorHandler::DISABLED: out << "DISABLED"; break;
+ case FileStorHandler::CLOSED: out << "CLOSED"; break;
}
out << "<h4>Active operations</h4>\n";
- for (const auto& lockedBucket : t.lockedBuckets) {
- out << lockedBucket.second.statusString
- << " (" << lockedBucket.first.getBucketId() << ") Running for "
- << (_component.getClock().getTimeInSeconds().getTime() - lockedBucket.second.timestamp)
- << " secs<br/>\n";
- }
+ disk.dumpActiveHtml(out);
if (!verbose) continue;
out << "<h4>Input queue</h4>\n";
-
out << "<ul>\n";
- const PriorityIdx& idx = bmi::get<1>(t.queue);
- for (PriorityIdx::const_iterator it = idx.begin(); it != idx.end(); it++) {
- out << "<li>" << it->_command->toString() << " (priority: "
- << (int)it->_command->getPriority() << ")</li>\n";
- }
+ disk.dumpQueueHtml(out);
out << "</ul>\n";
}
+ vespalib::LockGuard mergeGuard(_mergeStatesLock);
out << "<tr><td>Active merge operations</td><td>" << _mergeStates.size() << "</td></tr>\n";
-
- // Print merge states
if (verbose) {
out << "<h4>Active merges</h4>\n";
if (_mergeStates.size() == 0) {
@@ -1158,8 +1226,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
}
for (auto & entry : _mergeStates) {
out << "<b>" << entry.first.toString() << "</b><br>\n";
- // << "<p>" << it->second << "</p>\n"; // Gets very spammy with
- // the complete state here..
+ // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here..
}
}
}
@@ -1167,12 +1234,8 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
void
FileStorHandlerImpl::waitUntilNoLocks()
{
- for (uint32_t i=0; i<_diskInfo.size(); ++i) {
- const Disk& t(_diskInfo[i]);
- vespalib::MonitorGuard lockGuard(t.lock);
- while (!t.lockedBuckets.empty()) {
- lockGuard.wait();
- }
+ for (const auto & disk : _diskInfo) {
+ disk.waitUntilNoLocks();
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index fb084c41b7c..dd3dc01d69f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -71,29 +71,73 @@ public:
using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
- struct Disk {
- vespalib::Monitor lock;
- PriorityQueue queue;
+ class Disk;
+ class Stripe {
+ public:
struct LockEntry {
- uint32_t timestamp;
- uint8_t priority;
+ uint32_t timestamp;
+ uint8_t priority;
vespalib::string statusString;
- LockEntry()
- : timestamp(0), priority(0), statusString()
- { }
+ LockEntry() : timestamp(0), priority(0), statusString() { }
LockEntry(uint8_t priority_, vespalib::stringref status)
- : timestamp(time(NULL)),
- priority(priority_),
- statusString(status)
+ : timestamp(time(nullptr)), priority(priority_), statusString(status)
{ }
};
+ Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
+ ~Stripe();
+ void flush();
+ bool schedule(MessageEntry messageEntry);
+ void waitUntilNoLocks() const;
+ void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd);
+ void waitInactive(const AbortBucketOperationsCommand& cmd) const;
+
+ void broadcast() {
+ vespalib::MonitorGuard guard(_lock);
+ guard.broadcast();
+ }
+ size_t getQueueSize() const {
+ vespalib::MonitorGuard guard(_lock);
+ return _queue.size();
+ }
+ void release(const document::Bucket & bucket){
+ vespalib::MonitorGuard guard(_lock);
+ _lockedBuckets.erase(bucket);
+ guard.broadcast();
+ }
+ bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept;
+
+ void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) {
+ _lockedBuckets.insert(std::make_pair(bucket, lockEntry));
+ }
+
+ std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket);
+ void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
+
+ FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
+ FileStorHandler::LockedMessage & getNextMessage(Disk & disk, FileStorHandler::LockedMessage& lock);
+ void dumpQueue(std::ostream & os) const;
+ void dumpActiveHtml(std::ostream & os) const;
+ void dumpQueueHtml(std::ostream & os) const;
+ vespalib::Monitor & exposeLock() { return _lock; }
+ PriorityQueue & exposeQueue() { return _queue; }
+ BucketIdx & exposeBucketIdx() { return bmi::get<2>(_queue); }
+ private:
+ bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t,
+ PriorityIdx & idx, PriorityIdx::iterator iter);
typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
- LockedBuckets lockedBuckets;
- FileStorDiskMetrics* metrics;
+ const FileStorHandlerImpl &_owner;
+ MessageSender &_messageSender;
+ vespalib::Monitor _lock;
+ PriorityQueue _queue;
+ LockedBuckets _lockedBuckets;
+ };
+ struct Disk {
+ FileStorDiskMetrics * metrics;
/**
* No assumption on memory ordering around disk state reads should
@@ -110,29 +154,61 @@ public:
state.store(s, std::memory_order_relaxed);
}
- Disk();
+ Disk(const FileStorHandlerImpl & owner, MessageSender & messageSender, uint32_t numThreads);
+ Disk(Disk &&) noexcept;
~Disk();
- bool isLocked(const document::Bucket&) const noexcept;
+ bool isClosed() const noexcept { return getState() == DiskState::CLOSED; }
+
+ void flush();
+ void broadcast();
+ bool schedule(const std::shared_ptr<api::StorageMessage>& msg);
+ void waitUntilNoLocks() const;
+ std::vector<std::shared_ptr<api::StorageReply>> abort(const AbortBucketOperationsCommand& cmd);
+ void waitInactive(const AbortBucketOperationsCommand& cmd) const;
+ FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) {
+ assert(stripeId < _stripes.size());
+ return _stripes[stripeId].getNextMessage(timeout, *this);
+ }
+ FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) {
+ return _stripes[stripeId].getNextMessage(*this, lck);
+ }
+ std::shared_ptr<FileStorHandler::BucketLockInterface>
+ lock(const document::Bucket & bucket) {
+ return stripe(bucket).lock(bucket);
+ }
+ void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
+ stripe(bucket).failOperations(bucket, code);
+ }
+
uint32_t getQueueSize() const noexcept;
+ uint32_t getNextStripeId() { return (_nextStripeId++)%_stripes.size(); }
+ std::string dumpQueue() const;
+ void dumpActiveHtml(std::ostream & os) const;
+ void dumpQueueHtml(std::ostream & os) const;
+ Stripe & stripe(const document::Bucket & bucket) {
+ return _stripes[bucket.getBucketId().getRawId()%_stripes.size()];
+ }
private:
+ uint32_t _nextStripeId;
+ std::vector<Stripe> _stripes;
std::atomic<DiskState> state;
};
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
- BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::Bucket &bucket, uint8_t priority,
- const vespalib::stringref & statusString);
+ BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe, const document::Bucket &bucket,
+ uint8_t priority, const vespalib::stringref & statusString);
~BucketLock();
const document::Bucket &getBucket() const override { return _bucket; }
private:
- Disk& _disk;
+ Stripe & _stripe;
document::Bucket _bucket;
};
- FileStorHandlerImpl(MessageSender&, FileStorMetrics&,
+ FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
@@ -144,17 +220,18 @@ public:
void close();
bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
- FileStorHandler::LockedMessage getNextMessage(uint16_t disk);
- FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter);
+ FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId);
- FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock);
+ FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock);
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);
- void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
+ void failOperations(const document::Bucket & bucket, uint16_t disk, const api::ReturnCode & code) {
+ _diskInfo[disk].failOperations(bucket, code);
+ }
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
@@ -162,9 +239,12 @@ public:
uint32_t getQueueSize() const;
uint32_t getQueueSize(uint16_t disk) const;
+ uint32_t getNextStripeId(uint32_t disk);
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket&, uint16_t disk);
+ lock(const document::Bucket & bucket, uint16_t disk) {
+ return _diskInfo[disk].lock(bucket);
+ }
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
MergeStatus& editMergeStatus(const document::Bucket&);
@@ -172,7 +252,9 @@ public:
uint32_t getNumActiveMerges() const;
void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
- std::string dumpQueue(uint16_t disk) const;
+ std::string dumpQueue(uint16_t disk) const {
+ return _diskInfo[disk].dumpQueue();
+ }
ResumeGuard pause();
void resume() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
@@ -213,44 +295,25 @@ private:
bool isPaused() const { return _paused.load(std::memory_order_relaxed); }
/**
- * Return whether a disk has been shut down by the system (IO failure is
- * the most likely candidate here) and should not serve any more requests.
- */
- bool diskIsClosed(uint16_t disk) const;
-
- /**
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
- bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const;
-
- /**
- * Assume ownership of lock for a given bucket on a given disk.
- * Disk lock MUST have been taken prior to calling this function.
- */
- std::unique_ptr<FileStorHandler::BucketLockInterface>
- takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard,
- Disk& disk, const document::Bucket &bucket, const api::StorageMessage& msg);
+ static bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime);
/**
* Creates and returns a reply with api::TIMEOUT return code for msg.
* Swaps (invalidates) context from msg into reply.
*/
- std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const;
- bool messageMayBeAborted(const api::StorageMessage& msg) const;
+ static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg);
+ static bool messageMayBeAborted(const api::StorageMessage& msg);
void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
- bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const;
- void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd);
// Update hook
void updateMetrics(const MetricLockGuard &) override;
- document::Bucket remapMessage(api::StorageMessage& msg,
- const document::Bucket &source,
- Operation op,
- std::vector<RemapInfo*>& targets,
- uint16_t& targetDisk,
- api::ReturnCode& returnCode);
+ document::Bucket
+ remapMessage(api::StorageMessage& msg, const document::Bucket &source, Operation op,
+ std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode);
void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
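
The correctness of the scheme rests on Disk::stripe() being a pure function of the raw bucket id: repeated lookups for the same bucket always resolve to the same Stripe, so a follow-up getNextMessage(disk, stripeId, lock) for a held bucket searches the queue that actually contains its remaining operations. A one-line restatement of that invariant, assuming the same modulo mapping shown above:

// Deterministic: stripeOf(b, n) is stable for any bucket b, so same-bucket
// operations never migrate between stripes (or between the threads serving them).
uint32_t stripeOf(uint64_t rawBucketId, uint32_t numStripes) {
    return static_cast<uint32_t>(rawBucketId % numStripes);
}
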
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index b4735c2961a..1c2e415c39c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -116,7 +116,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numThreads = _config->numThreads;
_metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);
- _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
+ _filestorHandler.reset(new FileStorHandler(numThreads, *this, *_metrics, _partitions, _compReg));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
LOG(spam, "Setting up disk %u", i);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index be6f9577642..130fb91e199 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -21,7 +21,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
uint16_t deviceIndex)
- : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
+ : _threadId(filestorHandler.getNextStripeId(deviceIndex)),
+ _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -33,7 +34,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
_closed(false)
{
std::ostringstream threadName;
- threadName << "Disk " << _env._partition << " thread " << (void*) this;
+ threadName << "Disk " << _env._partition << " thread " << _threadId;
_component.reset(new ServiceLayerComponent(compReg, threadName.str()));
_bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler));
framework::MilliSecTime maxProcessingTime(60 * 1000);
@@ -1063,7 +1064,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
trackers.push_back(std::move(tracker));
if (trackers.back()->getReply()->getResult().success()) {
- _env._fileStorHandler.getNextMessage(_env._partition, lock);
+ _env._fileStorHandler.getNextMessage(_env._partition, _threadId, lock);
} else {
break;
}
@@ -1087,7 +1088,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
thread.registerTick();
- FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
+ FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _threadId));
if (lock.first) {
processMessages(lock);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 640614f7edf..c001a52a6be 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -47,6 +47,7 @@ public:
MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
private:
+ uint32_t _threadId;
PersistenceUtil _env;
uint32_t _warnOnSlowOperations;
spi::PersistenceProvider& _spi;