diff options
7 files changed, 169 insertions, 56 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 74baecbf026..0da0fd5ce66 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& } FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk) +FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) { - return _impl->lock(bucket, disk); + return _impl->lock(bucket, disk, lockReq); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index b74765b17d2..02c959df2f0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -58,8 +58,9 @@ public: typedef std::shared_ptr<BucketLockInterface> SP; virtual const document::Bucket &getBucket() const = 0; + virtual api::LockingRequirements lockingRequirements() const noexcept = 0; - virtual ~BucketLockInterface() {}; + virtual ~BucketLockInterface() = default; }; typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage; @@ -139,7 +140,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk); + BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq); /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index a01881b6fbe..1cae13ea95c 100644 --- 
a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId) } std::shared_ptr<FileStorHandler::BucketLockInterface> -FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) -{ +FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) { vespalib::MonitorGuard guard(_lock); - while (isLocked(guard, bucket)) { - LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); + while (isLocked(guard, bucket, lockReq)) { + LOG(spam, "Contending for filestor lock for %s with %s access", + bucket.getBucketId().toString().c_str(), api::to_string(lockReq)); guard.wait(100); } - auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0); + auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq); guard.broadcast(); return locker; @@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) namespace { struct MultiLockGuard { std::map<uint16_t, vespalib::Monitor*> monitors; - std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards; + std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards; - MultiLockGuard() {} + MultiLockGuard() = default; void addLock(vespalib::Monitor& monitor, uint16_t index) { monitors[index] = &monitor; @@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) PriorityIdx& idx(bmi::get<1>(_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - while (iter != end && isLocked(guard, iter->_bucket)) { + while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) { iter++; } if (iter != end) { @@ -959,6 +959,11 @@ 
FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) } api::StorageMessage & m(*range.first->_command); + // We don't allow batching of operations across lock requirement modes. + if (lck.first->lockingRequirements() != m.lockingRequirements()) { + lck.second.reset(); + return lck; + } uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])); @@ -992,7 +997,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority if (!messageTimedOutInQueue(*msg, waitTime)) { auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), - msg->getType().getId(), msg->getMsgId()); + msg->getType().getId(), msg->getMsgId(), + msg->lockingRequirements()); guard.unlock(); return FileStorHandler::LockedMessage(std::move(locker), std::move(msg)); } else { @@ -1090,10 +1096,65 @@ FileStorHandlerImpl::Stripe::flush() lockGuard.wait(100); } } + +void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, + api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId) { + vespalib::MonitorGuard guard(_lock); + auto iter = _lockedBuckets.find(bucket); + assert(iter != _lockedBuckets.end()); + auto& entry = iter->second; + + if (reqOfReleasedLock == api::LockingRequirements::Exclusive) { + assert(entry._exclusiveLock); + assert(entry._exclusiveLock->msgId == lockMsgId); + entry._exclusiveLock.reset(); + } else { + assert(!entry._exclusiveLock); + auto shared_iter = entry._sharedLocks.find(lockMsgId); + assert(shared_iter != entry._sharedLocks.end()); + entry._sharedLocks.erase(shared_iter); + } + + if (!entry._exclusiveLock && entry._sharedLocks.empty()) { + _lockedBuckets.erase(iter); // No more locks held + } + guard.broadcast(); +} + +void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry) { + auto& entry = 
_lockedBuckets[bucket]; + assert(!entry._exclusiveLock); + if (lockReq == api::LockingRequirements::Exclusive) { + assert(entry._sharedLocks.empty()); + entry._exclusiveLock = lockEntry; + } else { + // TODO use a hash set with a custom comparator/hasher instead...? + auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry)); + (void) inserted; + assert(inserted.second); + } +} + bool -FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept +FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket, + api::LockingRequirements lockReq) const noexcept { - return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end()); + if (bucket.getBucketId().getRawId() == 0) { + return false; + } + auto iter = _lockedBuckets.find(bucket); + if (iter == _lockedBuckets.end()) { + return false; + } + if (iter->second._exclusiveLock) { + return true; + } + // Shared locks can be taken alongside other shared locks, but exclusive locks + // require that no shared locks are currently present. + return ((lockReq == api::LockingRequirements::Exclusive) + && !iter->second._sharedLocks.empty()); } uint32_t @@ -1114,33 +1175,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe, const document::Bucket &bucket, uint8_t priority, - api::MessageType::Id msgType, api::StorageMessage::Id msgId) + api::MessageType::Id msgType, api::StorageMessage::Id msgId, + api::LockingRequirements lockReq) : _stripe(stripe), - _bucket(bucket) + _bucket(bucket), + _uniqueMsgId(msgId), + _lockReq(lockReq) { - (void) guard; if (_bucket.getBucketId().getRawId() != 0) { - // Lock the bucket and wait until it is not the current operation for - // the disk itself. 
- _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId)); - LOG(debug, "Locked bucket %s with priority %u", - bucket.getBucketId().toString().c_str(), priority); - - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "acquired filestor lock", false, - debug::BucketOperationLogger::State::BUCKET_LOCKED); + _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId)); + LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s", + bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq)); } } -FileStorHandlerImpl::BucketLock::~BucketLock() -{ +FileStorHandlerImpl::BucketLock::~BucketLock() { if (_bucket.getBucketId().getRawId() != 0) { - _stripe.release(_bucket); - LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "released filestor lock", true, - debug::BucketOperationLogger::State::BUCKET_UNLOCKED); + _stripe.release(_bucket, _lockReq, _uniqueMsgId); + LOG(debug, "Unlocked bucket %s for message %zu in mode %s", + _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq)); } } @@ -1182,14 +1236,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const } } +namespace { + +void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry, + api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) { + os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " (" + << bucketId << ", " << api::to_string(lock_mode) + << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n"; +} + +} + void FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const { uint32_t now = time(nullptr); vespalib::MonitorGuard guard(_lock); for (const auto & e : _lockedBuckets) { - os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << 
e.first.getBucketId() - << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n"; + if (e.second._exclusiveLock) { + dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock, + api::LockingRequirements::Exclusive, now, os); + } + for (const auto& shared : e.second._sharedLocks) { + dump_lock_entry(e.first.getBucketId(), shared.second, + api::LockingRequirements::Shared, now, os); + } } } @@ -1238,7 +1309,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& } for (auto & entry : _mergeStates) { out << "<b>" << entry.first.toString() << "</b><br>\n"; - // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here.. } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 45ac5ded47f..f9dcca4315b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -30,6 +30,7 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/hash_map.h> #include <atomic> +#include <optional> namespace storage { @@ -82,13 +83,19 @@ public: api::MessageType::Id msgType; api::StorageMessage::Id msgId; - LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { } LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_) : timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_) { } }; + + struct MultiLockEntry { + std::optional<LockEntry> _exclusiveLock; + using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>; + SharedLocks _sharedLocks; + }; + Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender); ~Stripe(); void flush(); @@ -105,19 +112,16 @@ public: vespalib::MonitorGuard guard(_lock); return _queue.size(); } - void release(const document::Bucket & bucket){ - 
vespalib::MonitorGuard guard(_lock); - _lockedBuckets.erase(bucket); - guard.broadcast(); - } + void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId); - bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept; + bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&, + api::LockingRequirements lockReq) const noexcept; - void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) { - _lockedBuckets.insert(std::make_pair(bucket, lockEntry)); - } + void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry); - std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket); + std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk); @@ -131,9 +135,11 @@ public: void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } private: bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const; + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts + // with its locking requirements. 
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter); - typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets; + using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; FileStorStripeMetrics *_metrics; @@ -178,8 +184,8 @@ public: return _stripes[stripeId].getNextMessage(lck); } std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket) { - return stripe(bucket).lock(bucket); + lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { + return stripe(bucket).lock(bucket, lockReq); } void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { stripe(bucket).failOperations(bucket, code); @@ -194,7 +200,7 @@ public: // Disperse bucket bits by multiplying with the 64-bit FNV-1 prime. // This avoids an inherent affinity between the LSB of a bucket's bits // and the stripe an operation ends up on. 
- return bucket.getBucketId().getRawId() * 1099511628211ULL; + return bucket.getBucketId().getId() * 1099511628211ULL; } Stripe & stripe(const document::Bucket & bucket) { return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()]; @@ -208,15 +214,20 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: + // TODO refactor, too many params BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket, - uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id); + uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id, + api::LockingRequirements lockReq); ~BucketLock(); const document::Bucket &getBucket() const override { return _bucket; } + api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; } private: Stripe & _stripe; document::Bucket _bucket; + api::StorageMessage::Id _uniqueMsgId; + api::LockingRequirements _lockReq; }; FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&, @@ -253,8 +264,8 @@ public: uint32_t getNextStripeId(uint32_t disk); std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, uint16_t disk) { - return _diskInfo[disk].lock(bucket); + lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) { + return _diskInfo[disk].lock(bucket, lockReq); } void addMergeStatus(const document::Bucket&, MergeStatus::SP); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c2dcb8e2a29..888dc93dd82 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, result.disk = getPreferredAvailableDisk(bucket); while (true) { + // This function is only called in a context where we require exclusive + // 
locking (split/join). Refactor if this is no longer the case. std::shared_ptr<FileStorHandler::BucketLockInterface> lock( - _fileStorHandler.lock(bucket, result.disk)); + _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive)); + // TODO disks are no longer used in practice, can we safely discard this? + // Might need it for synchronization purposes if something has taken the + // disk lock _and_ the bucket lock...? StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "join-lockAndGetDisk-1", flags)); if (entry.exist() && entry->disk != result.disk) { diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index f970091f695..3413feeeeab 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -302,4 +302,14 @@ StorageMessage::getSummary() const { return toString(); } +const char* to_string(LockingRequirements req) noexcept { + switch (req) { + case LockingRequirements::Exclusive: + return "Exclusive"; + case LockingRequirements::Shared: + return "Shared"; + } + assert(false); +} + } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index dadb68c644d..90261c2b9b1 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -306,6 +306,18 @@ struct TransportContext { virtual ~TransportContext() = 0; }; +enum class LockingRequirements : uint8_t { + // Operations with exclusive locking can only be executed iff no other + // exclusive or shared locks are taken for its bucket. Exclusive = 0, + // Operations with shared locking can only be executed iff no exclusive + // lock is taken for its bucket. 
Should only be used for read-only operations + // that cannot mutate a bucket's state. + Shared +}; + +const char* to_string(LockingRequirements req) noexcept; + class StorageMessage : public vespalib::Printable { friend class StorageMessageTest; // Used for testing only @@ -421,6 +433,10 @@ public: virtual document::Bucket getBucket() const { return getDummyBucket(); } document::BucketId getBucketId() const { return getBucket().getBucketId(); } virtual bool hasSingleBucketId() const { return false; } + virtual LockingRequirements lockingRequirements() const noexcept { + // Safe default: assume exclusive locking is required. + return LockingRequirements::Exclusive; + } }; } |