summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp136
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h47
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp7
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp10
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h16
7 files changed, 169 insertions, 56 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 74baecbf026..0da0fd5ce66 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage&
}
FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
+FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
{
- return _impl->lock(bucket, disk);
+ return _impl->lock(bucket, disk, lockReq);
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index b74765b17d2..02c959df2f0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -58,8 +58,9 @@ public:
typedef std::shared_ptr<BucketLockInterface> SP;
virtual const document::Bucket &getBucket() const = 0;
+ virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
- virtual ~BucketLockInterface() {};
+ virtual ~BucketLockInterface() = default;
};
typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;
@@ -139,7 +140,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk);
+ BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq);
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index a01881b6fbe..1cae13ea95c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
-{
+FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) {
vespalib::MonitorGuard guard(_lock);
- while (isLocked(guard, bucket)) {
- LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
+ while (isLocked(guard, bucket, lockReq)) {
+ LOG(spam, "Contending for filestor lock for %s with %s access",
+ bucket.getBucketId().toString().c_str(), api::to_string(lockReq));
guard.wait(100);
}
- auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0);
+ auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq);
guard.broadcast();
return locker;
@@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
namespace {
struct MultiLockGuard {
std::map<uint16_t, vespalib::Monitor*> monitors;
- std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards;
+ std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards;
- MultiLockGuard() {}
+ MultiLockGuard() = default;
void addLock(vespalib::Monitor& monitor, uint16_t index) {
monitors[index] = &monitor;
@@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
PriorityIdx& idx(bmi::get<1>(_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
- while (iter != end && isLocked(guard, iter->_bucket)) {
+ while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) {
iter++;
}
if (iter != end) {
@@ -959,6 +959,11 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
}
api::StorageMessage & m(*range.first->_command);
+ // We don't allow batching of operations across lock requirement modes.
+ if (lck.first->lockingRequirements() != m.lockingRequirements()) {
+ lck.second.reset();
+ return lck;
+ }
uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
@@ -992,7 +997,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority
if (!messageTimedOutInQueue(*msg, waitTime)) {
auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(),
- msg->getType().getId(), msg->getMsgId());
+ msg->getType().getId(), msg->getMsgId(),
+ msg->lockingRequirements());
guard.unlock();
return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
} else {
@@ -1090,10 +1096,65 @@ FileStorHandlerImpl::Stripe::flush()
lockGuard.wait(100);
}
}
+
+void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
+ api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId) {
+ vespalib::MonitorGuard guard(_lock);
+ auto iter = _lockedBuckets.find(bucket);
+ assert(iter != _lockedBuckets.end());
+ auto& entry = iter->second;
+
+ if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
+ assert(entry._exclusiveLock);
+ assert(entry._exclusiveLock->msgId == lockMsgId);
+ entry._exclusiveLock.reset();
+ } else {
+ assert(!entry._exclusiveLock);
+ auto shared_iter = entry._sharedLocks.find(lockMsgId);
+ assert(shared_iter != entry._sharedLocks.end());
+ entry._sharedLocks.erase(shared_iter);
+ }
+
+ if (!entry._exclusiveLock && entry._sharedLocks.empty()) {
+ _lockedBuckets.erase(iter); // No more locks held
+ }
+ guard.broadcast();
+}
+
+void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry) {
+ auto& entry = _lockedBuckets[bucket];
+ assert(!entry._exclusiveLock);
+ if (lockReq == api::LockingRequirements::Exclusive) {
+ assert(entry._sharedLocks.empty());
+ entry._exclusiveLock = lockEntry;
+ } else {
+ // TODO use a hash set with a custom comparator/hasher instead...?
+ auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry));
+ (void) inserted;
+ assert(inserted.second);
+ }
+}
+
bool
-FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept
+FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket,
+ api::LockingRequirements lockReq) const noexcept
{
- return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end());
+ if (bucket.getBucketId().getRawId() == 0) {
+ return false;
+ }
+ auto iter = _lockedBuckets.find(bucket);
+ if (iter == _lockedBuckets.end()) {
+ return false;
+ }
+ if (iter->second._exclusiveLock) {
+ return true;
+ }
+ // Shared locks can be taken alongside other shared locks, but exclusive locks
+ // require that no shared locks are currently present.
+ return ((lockReq == api::LockingRequirements::Exclusive)
+ && !iter->second._sharedLocks.empty());
}
uint32_t
@@ -1114,33 +1175,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
- api::MessageType::Id msgType, api::StorageMessage::Id msgId)
+ api::MessageType::Id msgType, api::StorageMessage::Id msgId,
+ api::LockingRequirements lockReq)
: _stripe(stripe),
- _bucket(bucket)
+ _bucket(bucket),
+ _uniqueMsgId(msgId),
+ _lockReq(lockReq)
{
- (void) guard;
if (_bucket.getBucketId().getRawId() != 0) {
- // Lock the bucket and wait until it is not the current operation for
- // the disk itself.
- _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId));
- LOG(debug, "Locked bucket %s with priority %u",
- bucket.getBucketId().toString().c_str(), priority);
-
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "acquired filestor lock", false,
- debug::BucketOperationLogger::State::BUCKET_LOCKED);
+ _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId));
+ LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s",
+ bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq));
}
}
-FileStorHandlerImpl::BucketLock::~BucketLock()
-{
+FileStorHandlerImpl::BucketLock::~BucketLock() {
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.release(_bucket);
- LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "released filestor lock", true,
- debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
+ _stripe.release(_bucket, _lockReq, _uniqueMsgId);
+ LOG(debug, "Unlocked bucket %s for message %zu in mode %s",
+ _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
}
}
@@ -1182,14 +1236,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const
}
}
+namespace {
+
+void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry,
+ api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) {
+ os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " ("
+ << bucketId << ", " << api::to_string(lock_mode)
+ << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n";
+}
+
+}
+
void
FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const
{
uint32_t now = time(nullptr);
vespalib::MonitorGuard guard(_lock);
for (const auto & e : _lockedBuckets) {
- os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << e.first.getBucketId()
- << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n";
+ if (e.second._exclusiveLock) {
+ dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock,
+ api::LockingRequirements::Exclusive, now, os);
+ }
+ for (const auto& shared : e.second._sharedLocks) {
+ dump_lock_entry(e.first.getBucketId(), shared.second,
+ api::LockingRequirements::Shared, now, os);
+ }
}
}
@@ -1238,7 +1309,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
}
for (auto & entry : _mergeStates) {
out << "<b>" << entry.first.toString() << "</b><br>\n";
- // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here..
}
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 45ac5ded47f..f9dcca4315b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -30,6 +30,7 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <atomic>
+#include <optional>
namespace storage {
@@ -82,13 +83,19 @@ public:
api::MessageType::Id msgType;
api::StorageMessage::Id msgId;
-
LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { }
LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_)
: timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_)
{ }
};
+
+ struct MultiLockEntry {
+ std::optional<LockEntry> _exclusiveLock;
+ using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>;
+ SharedLocks _sharedLocks;
+ };
+
Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
~Stripe();
void flush();
@@ -105,19 +112,16 @@ public:
vespalib::MonitorGuard guard(_lock);
return _queue.size();
}
- void release(const document::Bucket & bucket){
- vespalib::MonitorGuard guard(_lock);
- _lockedBuckets.erase(bucket);
- guard.broadcast();
- }
+ void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId);
- bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept;
+ bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&,
+ api::LockingRequirements lockReq) const noexcept;
- void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) {
- _lockedBuckets.insert(std::make_pair(bucket, lockEntry));
- }
+ void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry);
- std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket);
+ std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
@@ -131,9 +135,11 @@ public:
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
private:
bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
+ // with its locking requirements.
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter);
- typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
+ using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
FileStorStripeMetrics *_metrics;
@@ -178,8 +184,8 @@ public:
return _stripes[stripeId].getNextMessage(lck);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket) {
- return stripe(bucket).lock(bucket);
+ lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
+ return stripe(bucket).lock(bucket, lockReq);
}
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
stripe(bucket).failOperations(bucket, code);
@@ -194,7 +200,7 @@ public:
// Disperse bucket bits by multiplying with the 64-bit FNV-1 prime.
// This avoids an inherent affinity between the LSB of a bucket's bits
// and the stripe an operation ends up on.
- return bucket.getBucketId().getRawId() * 1099511628211ULL;
+ return bucket.getBucketId().getId() * 1099511628211ULL;
}
Stripe & stripe(const document::Bucket & bucket) {
return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()];
@@ -208,15 +214,20 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
+ // TODO refactor, too many params
BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket,
- uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id);
+ uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
+ api::LockingRequirements lockReq);
~BucketLock();
const document::Bucket &getBucket() const override { return _bucket; }
+ api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
private:
Stripe & _stripe;
document::Bucket _bucket;
+ api::StorageMessage::Id _uniqueMsgId;
+ api::LockingRequirements _lockReq;
};
FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
@@ -253,8 +264,8 @@ public:
uint32_t getNextStripeId(uint32_t disk);
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, uint16_t disk) {
- return _diskInfo[disk].lock(bucket);
+ lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) {
+ return _diskInfo[disk].lock(bucket, lockReq);
}
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index c2dcb8e2a29..888dc93dd82 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
result.disk = getPreferredAvailableDisk(bucket);
while (true) {
+ // This function is only called in a context where we require exclusive
+ // locking (split/join). Refactor if this no longer the case.
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(
- _fileStorHandler.lock(bucket, result.disk));
+ _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive));
+ // TODO disks are no longer used in practice, can we safely discard this?
+ // Might need it for synchronization purposes if something has taken the
+ // disk lock _and_ the bucket lock...?
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "join-lockAndGetDisk-1", flags));
if (entry.exist() && entry->disk != result.disk) {
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index f970091f695..3413feeeeab 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -302,4 +302,14 @@ StorageMessage::getSummary() const {
return toString();
}
+const char* to_string(LockingRequirements req) noexcept {
+ switch (req) {
+ case LockingRequirements::Exclusive:
+ return "Exclusive";
+ case LockingRequirements::Shared:
+ return "Shared";
+ }
+ assert(false);
+}
+
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index dadb68c644d..90261c2b9b1 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -306,6 +306,18 @@ struct TransportContext {
virtual ~TransportContext() = 0;
};
+enum class LockingRequirements : uint8_t {
+ // Operations with exclusive locking can only be executed iff no other
+ // exclusive or shared locks are taken for its bucket.
+ Exclusive = 0,
+ // Operations with shared locking can only be executed iff no exclusive
+ // lock is taken for its bucket. Should only be used for read-only operations
+ // that cannot mutate a bucket's state.
+ Shared
+};
+
+const char* to_string(LockingRequirements req) noexcept;
+
class StorageMessage : public vespalib::Printable
{
friend class StorageMessageTest; // Used for testing only
@@ -421,6 +433,10 @@ public:
virtual document::Bucket getBucket() const { return getDummyBucket(); }
document::BucketId getBucketId() const { return getBucket().getBucketId(); }
virtual bool hasSingleBucketId() const { return false; }
+ virtual LockingRequirements lockingRequirements() const noexcept {
+ // Safe default: assume exclusive locking is required.
+ return LockingRequirements::Exclusive;
+ }
};
}