diff options
26 files changed, 527 insertions, 347 deletions
diff --git a/parent/pom.xml b/parent/pom.xml index 56da33a9c93..0b141046d8a 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -130,7 +130,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-enforcer-plugin</artifactId> - <version>3.0.0-M1</version> + <version>3.0.0-M2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index d3883744229..25c6b71f7ff 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -528,7 +528,7 @@ DummyPersistence::get(const Bucket& b, b.toString().c_str(), did.toString().c_str()); assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - BucketContentGuard::UP bc(acquireBucketWithLock(b)); + BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared)); if (!bc.get()) { } else { DocEntry::SP entry((*bc)->getEntry(did)); @@ -568,7 +568,7 @@ DummyPersistence::createIterator( "Got invalid/unparseable document selection string"); } } - BucketContentGuard::UP bc(acquireBucketWithLock(b)); + BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared)); if (!bc.get()) { return CreateIteratorResult(Result::TRANSIENT_ERROR, "Bucket not found"); } @@ -656,7 +656,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con it = iter->second.get(); } - BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket)); + BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket, LockMode::Shared)); if (!bc.get()) { ctx.trace(9, "finished iterate(); bucket not found"); return IterateResult(Result::TRANSIENT_ERROR, "Bucket not found"); @@ -942,11 +942,11 @@ DummyPersistence::isActive(const Bucket& b) const BucketContentGuard::~BucketContentGuard() { - _persistence.releaseBucketNoLock(_content); + _persistence.releaseBucketNoLock(_content, _lock_mode); } BucketContentGuard::UP -DummyPersistence::acquireBucketWithLock(const Bucket& b) const +DummyPersistence::acquireBucketWithLock(const Bucket& b, LockMode lock_mode) const { assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); vespalib::MonitorGuard lock(_monitor); @@ -955,28 +955,32 @@ DummyPersistence::acquireBucketWithLock(const Bucket& b) const if (it == ncp._content[b.getPartition()].end()) { return BucketContentGuard::UP(); } - // Sanity check that SPI-level locking is doing its job correctly. - // Atomic CAS might be a bit overkill, but since we "release" the bucket - // outside of the mutex, we want to ensure the write is visible across all - // threads. - bool my_false(false); - bool bucketNotInUse(it->second->_inUse.compare_exchange_strong(my_false, true)); - if (!bucketNotInUse) { - LOG(error, "Attempted to acquire %s, but it was already marked as being in use!", - b.toString().c_str()); - LOG_ABORT("should not reach here"); + if (lock_mode == LockMode::Exclusive) { + // Sanity check that SPI-level locking is doing its job correctly. + // Atomic CAS might be a bit overkill, but since we "release" the bucket + // outside of the mutex, we want to ensure the write is visible across all + // threads. + bool my_false(false); + bool bucketNotInUse(it->second->_inUse.compare_exchange_strong(my_false, true)); + if (!bucketNotInUse) { + LOG(error, "Attempted to acquire %s, but it was already marked as being in use!", + b.toString().c_str()); + LOG_ABORT("dummy persistence bucket locking invariant violation"); + } } - return BucketContentGuard::UP(new BucketContentGuard(ncp, *it->second)); + return std::make_unique<BucketContentGuard>(ncp, *it->second, lock_mode); } void -DummyPersistence::releaseBucketNoLock(const BucketContent& bc) const +DummyPersistence::releaseBucketNoLock(const BucketContent& bc, LockMode lock_mode) const noexcept { - bool my_true(true); - bool bucketInUse(bc._inUse.compare_exchange_strong(my_true, false)); - assert(bucketInUse); - (void) bucketInUse; + if (lock_mode == LockMode::Exclusive) { + bool my_true(true); + bool bucketInUse(bc._inUse.compare_exchange_strong(my_true, false)); + assert(bucketInUse); + (void) bucketInUse; + } } } diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c93b7fd22c7..c97aab822ac 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -24,13 +24,18 @@ class DocumentTypeRepo; namespace storage::spi::dummy { +enum class LockMode { + Exclusive, + Shared +}; + struct BucketEntry { DocEntry::SP entry; GlobalId gid; BucketEntry(DocEntry::SP e, const GlobalId& g) - : entry(e), + : entry(std::move(e)), gid(g) { } }; @@ -98,30 +103,33 @@ class BucketContentGuard BucketContentGuard(const BucketContentGuard&); BucketContentGuard& operator=(const BucketContentGuard&); public: - typedef std::unique_ptr<BucketContentGuard> UP; + using UP = std::unique_ptr<BucketContentGuard>; BucketContentGuard(DummyPersistence& persistence, - BucketContent& content) + BucketContent& content, + LockMode lock_mode) : _persistence(persistence), - _content(content) + _content(content), + _lock_mode(lock_mode) { } ~BucketContentGuard(); - BucketContent& getContent() { + BucketContent& getContent() noexcept { return _content; } - BucketContent* operator->() { + BucketContent* operator->() noexcept { return &_content; } - BucketContent& operator*() { + BucketContent& operator*() noexcept { return _content; } private: DummyPersistence& _persistence; BucketContent& _content; + LockMode _lock_mode; }; class DummyPersistence : public AbstractPersistenceProvider @@ -207,8 +215,8 @@ public: private: friend class BucketContentGuard; // Const since funcs only alter mutable field in BucketContent - BucketContentGuard::UP acquireBucketWithLock(const Bucket& b) const; - void releaseBucketNoLock(const BucketContent& bc) const; + BucketContentGuard::UP acquireBucketWithLock(const Bucket& b, LockMode lock_mode = LockMode::Exclusive) const; + void releaseBucketNoLock(const BucketContent& bc, LockMode lock_mode = LockMode::Exclusive) const noexcept; mutable bool _initialized; std::shared_ptr<const document::DocumentTypeRepo> _repo; diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index b5f2fc198c4..96b3d385b87 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -232,6 +232,9 @@ struct PersistenceProvider * document id. If no versions were found, or the document was removed, * the result should be successful, but contain no document (see GetResult). * + * Concurrency note: may be called concurrently with other read-only + * operations. + * * @param fieldSet A set of fields that should be retrieved. * @param id The document id to retrieve. */ @@ -253,6 +256,9 @@ struct PersistenceProvider * iteration progress and selection criteria. destroyIterator will NOT * be called when createIterator returns an error. * + * Concurrency note: may be called concurrently with other read-only + * operations. + * * @param selection Selection criteria used to limit the subset of * the bucket's documents that will be returned by the iterator. The * provider implementation may use these criteria to optimize its @@ -323,6 +329,9 @@ struct PersistenceProvider * iterator must only set this flag on the result and return without any * documents. * + * Concurrency note: may be called concurrently with other read-only + * operations. + * * @param id An iterator ID returned by a previous call to createIterator * @param maxByteSize An indication of the maximum number of bytes that * should be returned. diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 95b3008985b..0d2c556b4d6 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -8,6 +8,7 @@ #include "tls_stats_factory.h" #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/vespalib/util/jsonwriter.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".proton.flushengine.flushengine"); @@ -22,8 +23,7 @@ namespace proton { namespace { search::SerialNum -findOldestFlushedSerial(const IFlushTarget::List &lst, - const IFlushHandler &handler) +findOldestFlushedSerial(const IFlushTarget::List &lst, const IFlushHandler &handler) { search::SerialNum ret(handler.getCurrentSerialNumber()); for (const IFlushTarget::SP & target : lst) { @@ -33,42 +33,46 @@ findOldestFlushedSerial(const IFlushTarget::List &lst, return ret; } +void +logTarget(const char * text, const FlushContext & ctx) { + LOG(debug, "Target '%s' %s flush of transactions %" PRIu64 " through %" PRIu64 ".", + ctx.getName().c_str(), text, + ctx.getTarget()->getFlushedSerialNum() + 1, + ctx.getHandler()->getCurrentSerialNumber()); +} + } -FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) : - _name(name), - _start(start), - _id(id) +FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) + : _name(name), + _start(start), + _id(id) { } -FlushEngine::FlushMeta::~FlushMeta() { } +FlushEngine::FlushMeta::~FlushMeta() = default; -FlushEngine::FlushInfo::FlushInfo() : - FlushMeta("", fastos::ClockSystem::now(), 0), - _target() +FlushEngine::FlushInfo::FlushInfo() + : FlushMeta("", fastos::ClockSystem::now(), 0), + _target() { } -FlushEngine::FlushInfo::~FlushInfo() { } +FlushEngine::FlushInfo::~FlushInfo() = default; -FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, - const IFlushTarget::SP &target, - const vespalib::string & destination) : - FlushMeta(destination, fastos::ClockSystem::now(), taskId), - _target(target) +FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string & destination) + : FlushMeta(destination, fastos::ClockSystem::now(), taskId), + _target(target) { } -FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> - tlsStatsFactory, - IFlushStrategy::SP strategy, uint32_t numThreads, - uint32_t idleIntervalMS) +FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, + IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS) : _closed(false), _maxConcurrent(numThreads), _idleIntervalMS(idleIntervalMS), _taskId(0), _threadPool(128 * 1024), - _strategy(strategy), + _strategy(std::move(strategy)), _priorityStrategy(), _executor(numThreads, 128 * 1024), _lock(), @@ -78,11 +82,9 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> _setStrategyLock(), _strategyLock(), _strategyCond(), - _tlsStatsFactory(tlsStatsFactory), + _tlsStatsFactory(std::move(tlsStatsFactory)), _pendingPrune() -{ - // empty -} +{ } FlushEngine::~FlushEngine() { @@ -92,7 +94,7 @@ FlushEngine::~FlushEngine() FlushEngine & FlushEngine::start() { - if (_threadPool.NewThread(this) == NULL) { + if (_threadPool.NewThread(this) == nullptr) { throw vespalib::IllegalStateException("Failed to start engine thread."); } return *this; @@ -148,10 +150,8 @@ FlushEngine::wait(size_t minimumWaitTimeIfReady) } void -FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) +FlushEngine::Run(FastOS_ThreadInterface *, void *) { - (void)thread; - (void)arg; bool shouldIdle = false; vespalib::string prevFlushName; while (wait(shouldIdle ? _idleIntervalMS : 0)) { @@ -161,13 +161,14 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) } prevFlushName = flushNextTarget(prevFlushName); if ( ! prevFlushName.empty()) { - // Sleep at least 10 ms after a successful flush in order to avoid busy loop in case - // of strategy error or target error. - FastOS_Thread::Sleep(10); + // Sleep 1 ms after a successful flush in order to avoid busy loop in case + // of strategy or target error. + std::this_thread::sleep_for(1ms); } else { shouldIdle = true; } - LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); + LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", + shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); } _executor.sync(); prune(); @@ -211,18 +212,16 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const for (const auto & it : _handlers) { IFlushHandler & handler(*it.second); search::SerialNum serial(handler.getCurrentSerialNumber()); - LOG(spam, "Checking FlushHandler '%s' current serial = %ld", - handler.getName().c_str(), serial); + LOG(spam, "Checking FlushHandler '%s' current serial = %ld", handler.getName().c_str(), serial); IFlushTarget::List lst = handler.getFlushTargets(); for (const IFlushTarget::SP & target : lst) { - LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", target->getName().c_str(), target->getFlushedSerialNum()); + LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", + target->getName().c_str(), target->getFlushedSerialNum()); if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) { - ret.push_back(FlushContext::SP(new FlushContext(it.second, - IFlushTarget::SP(new CachedFlushTarget(target)), - serial))); + ret.push_back(std::make_shared<FlushContext>(it.second, std::make_shared<CachedFlushTarget>(target), serial)); } else { LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.", - target->getName().c_str(), target->getFlushedSerialNum(), serial); + target->getName().c_str(), target->getFlushedSerialNum(), serial); } } } @@ -258,17 +257,12 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) break; } } - if (ctx.get() != NULL) { - LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); + if (ctx) { + logTarget("initiated", *ctx); } return ctx; } - - void FlushEngine::flushAll(const FlushContext::List &lst) { @@ -276,19 +270,12 @@ FlushEngine::flushAll(const FlushContext::List &lst) for (const FlushContext::SP & ctx : lst) { if (wait(0)) { if (ctx->initFlush()) { - LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); + logTarget("initiated", *ctx); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); } else { - LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); + logTarget("failed to initiate", *ctx); } } - } } @@ -311,17 +298,17 @@ FlushEngine::flushNextTarget(const vespalib::string & name) return ""; } FlushContext::SP ctx = initNextFlush(lst.first); - if (ctx.get() == NULL) { + if ( ! ctx) { LOG(debug, "All targets refused to flush."); return ""; } if ( name == ctx->getName()) { LOG(info, "The same target %s out of %ld has been asked to flush again. " - "This might indicate flush logic flaw so I will wait 1s before doing it.", + "This might indicate flush logic flaw so I will wait 100 ms before doing it.", name.c_str(), lst.first.size()); - FastOS_Thread::Sleep(1000); + std::this_thread::sleep_for(100ms); } - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); return ctx->getName(); } @@ -330,12 +317,8 @@ FlushEngine::initFlush(const FlushContext &ctx) { if (LOG_WOULD_LOG(event)) { IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain()); - EventLogger::flushStart(ctx.getName(), - mgain.getBefore(), - mgain.getAfter(), - mgain.gain(), - ctx.getTarget()->getFlushedSerialNum() + 1, - ctx.getHandler()->getCurrentSerialNumber()); + EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(), + ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber()); } return initFlush(ctx.getHandler(), ctx.getTarget()); } @@ -350,10 +333,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } if (LOG_WOULD_LOG(event)) { FlushStats stats = ctx.getTarget()->getLastFlushStats(); - EventLogger::flushComplete(ctx.getName(), - duration.ms(), - stats.getPath(), - stats.getPathElementsToLog()); + EventLogger::flushComplete(ctx.getName(), duration.ms(), stats.getPath(), stats.getPathElementsToLog()); } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec()); std::lock_guard<std::mutex> guard(_lock); @@ -366,8 +346,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } IFlushHandler::SP -FlushEngine::putFlushHandler(const DocTypeName &docTypeName, - const IFlushHandler::SP &flushHandler) +FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler) { std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler)); @@ -379,13 +358,6 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, } IFlushHandler::SP -FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const -{ - std::lock_guard<std::mutex> guard(_lock); - return _handlers.getHandler(docTypeName); -} - -IFlushHandler::SP FlushEngine::removeFlushHandler(const DocTypeName &docTypeName) { std::lock_guard<std::mutex> guard(_lock); @@ -430,7 +402,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) return; } assert(!_priorityStrategy); - _priorityStrategy = strategy; + _priorityStrategy = std::move(strategy); { std::lock_guard<std::mutex> guard(_lock); _cond.notify_all(); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 19175f9ce2a..c1be05ba067 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -15,7 +15,7 @@ namespace proton { namespace flushengine { class ITlsStatsFactory; } -class FlushEngine : public FastOS_Runnable +class FlushEngine final : public FastOS_Runnable { public: class FlushMeta { @@ -37,9 +37,7 @@ private: struct FlushInfo : public FlushMeta { FlushInfo(); - FlushInfo(uint32_t taskId, - const IFlushTarget::SP &target, - const vespalib::string &destination); + FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string &destination); ~FlushInfo(); IFlushTarget::SP _target; @@ -96,14 +94,13 @@ public: * @param numThreads The number of worker threads to use. * @param idleInterval The interval between when flushes are checked whne there are no one progressing. */ - FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> - tlsStatsFactory, + FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS); /** * Destructor. Waits for all pending tasks to complete. */ - ~FlushEngine(); + ~FlushEngine() override; /** * Observe and reset internal executor stats @@ -145,19 +142,8 @@ public: * @param flushHandler The handler to register. * @return The replaced handler, if any. */ - IFlushHandler::SP - putFlushHandler(const DocTypeName &docTypeName, - const IFlushHandler::SP &flushHandler); + IFlushHandler::SP putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler); - /** - * Returns the flush handler for the given document type. If no handler was - * registered, this method returns an empty shared pointer. - * - * @param docType The document type whose handler to return. - * @return The registered handler, if any. - */ - IFlushHandler::SP - getFlushHandler(const DocTypeName &docTypeName) const; /** * Removes and returns the flush handler for the given document type. If no @@ -166,10 +152,8 @@ public: * @param docType The document type whose handler to remove. * @return The removed handler, if any. */ - IFlushHandler::SP - removeFlushHandler(const DocTypeName &docTypeName); + IFlushHandler::SP removeFlushHandler(const DocTypeName &docTypeName); - // Implements FastOS_Runnable. void Run(FastOS_ThreadInterface *thread, void *arg) override; FlushMetaSet getCurrentlyFlushingSet() const; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h index 3a24330f8ec..914de9df30c 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h @@ -30,9 +30,7 @@ public: */ IFlushHandler(const vespalib::string &name) : _name(name) - { - // empty - } + { } /** * Virtual destructor required for inheritance. @@ -76,8 +74,7 @@ public: * This method is called to sync tls to stable media, up to and * including the given serial number. * - * @param syncTo The last serial number that has to be persisted to stable - * media. + * @param syncTo The last serial number that has to be persisted to stable media. */ virtual void syncTls(SerialNum syncTo) = 0; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp b/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp index aef49c51f7a..1c930ef1ddc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp @@ -50,8 +50,7 @@ PrepareRestartHandler::performPrepareRestart(const ProtonConfig &protonCfg, std: { _running = true; lock.unlock(); - auto strategy = std::make_shared<PrepareRestartFlushStrategy>(createPrepareRestartConfig(protonCfg)); - _flushEngine.setStrategy(strategy); + _flushEngine.setStrategy(std::make_shared<PrepareRestartFlushStrategy>(createPrepareRestartConfig(protonCfg))); lock.lock(); _running = false; _cond.notify_all(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 32dc711d5cf..6ffb82a2ad3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -240,9 +240,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) const HwInfo & hwInfo = configSnapshot->getHwInfo(); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler> - (protonConfig.basedir, - diskMemUsageSamplerConfig(protonConfig, hwInfo)); + _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, + diskMemUsageSamplerConfig(protonConfig, hwInfo)); _tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(_metricsHook); @@ -253,6 +252,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _distributionKey = protonConfig.distributionkey; _summaryEngine= std::make_unique<SummaryEngine>(protonConfig.numsummarythreads); _docsumBySlime = std::make_unique<DocsumBySlime>(*_summaryEngine); + IFlushStrategy::SP strategy; const ProtonConfig::Flush & flush(protonConfig.flush); switch (flush.strategy) { @@ -283,17 +283,15 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) LOG(debug, "Start proton server with root at %s and cwd at %s", protonConfig.basedir.c_str(), getcwd(tmp, sizeof(tmp))); - _persistenceEngine.reset(new PersistenceEngine(*this, - _diskMemUsageSampler->writeFilter(), - protonConfig.visit.defaultserializedsize, - protonConfig.visit.ignoremaxbytes)); - + _persistenceEngine = std::make_unique<PersistenceEngine>(*this, _diskMemUsageSampler->writeFilter(), + protonConfig.visit.defaultserializedsize, + protonConfig.visit.ignoremaxbytes); vespalib::string fileConfigId; - _warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024)); + _warmupExecutor = std::make_unique<vespalib::ThreadStackExecutor>(4, 128*1024); const size_t summaryThreads = deriveCompactionCompressionThreads(protonConfig, hwInfo.cpu()); - _summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16)); + _summaryExecutor = std::make_unique<vespalib::BlockingThreadStackExecutor>(summaryThreads, 128*1024, summaryThreads*16); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024); @@ -305,7 +303,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _prepareRestartHandler = std::make_unique<PrepareRestartHandler>(*_flushEngine); RPCHooks::Params rpcParams(*this, protonConfig.rpcport, _configUri.getConfigId()); rpcParams.slobrok_config = _configUri.createWithNewId(protonConfig.slobrokconfigid); - _rpcHooks.reset(new RPCHooks(rpcParams)); + _rpcHooks = std::make_unique<RPCHooks>(rpcParams); waitForInitDone(); @@ -528,23 +526,11 @@ Proton::addDocumentDB(const document::DocumentType &docType, // 1 thread per document type. initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(1, 128 * 1024); } - DocumentDB::SP ret(new DocumentDB(config.basedir + "/documents", - documentDBConfig, - config.tlsspec, - _queryLimiter, - _clock, - docTypeName, - bucketSpace, - config, - *this, - *_warmupExecutor, - *_summaryExecutor, - *_tls->getTransLogServer(), - *_metricsEngine, - _fileHeaderContext, - std::move(config_store), - initializeThreads, - bootstrapConfig->getHwInfo())); + auto ret = std::make_shared<DocumentDB>(config.basedir + "/documents", documentDBConfig, config.tlsspec, + _queryLimiter, _clock, docTypeName, bucketSpace, config, *this, + *_warmupExecutor, *_summaryExecutor, *_tls->getTransLogServer(), + *_metricsEngine, _fileHeaderContext, std::move(config_store), + initializeThreads, bootstrapConfig->getHwInfo()); try { ret->start(); } catch (vespalib::Exception &e) { @@ -571,10 +557,10 @@ Proton::addDocumentDB(const document::DocumentType &docType, // TODO: Fix race with new cluster state setting. _persistenceEngine->putHandler(bucketSpace, docTypeName, persistenceHandler); } - SearchHandlerProxy::SP searchHandler(new SearchHandlerProxy(ret)); + auto searchHandler = std::make_shared<SearchHandlerProxy>(ret); _summaryEngine->putSearchHandler(docTypeName, searchHandler); _matchEngine->putSearchHandler(docTypeName, searchHandler); - FlushHandlerProxy::SP flushHandler(new FlushHandlerProxy(ret)); + auto flushHandler = std::make_shared<FlushHandlerProxy>(ret); _flushEngine->putFlushHandler(docTypeName, flushHandler); _diskMemUsageSampler->notifier().addDiskMemUsageListener(ret->diskMemUsageListener()); return ret; @@ -624,7 +610,7 @@ Proton::MonitorReply::UP Proton::ping(MonitorRequest::UP request, MonitorClient & client) { (void) client; - MonitorReply::UP reply(new MonitorReply()); + auto reply = std::make_unique<MonitorReply>(); MonitorReply &ret = *reply; BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot(); @@ -807,8 +793,8 @@ struct DocumentDBMapExplorer : vespalib::StateExplorer { std::shared_timed_mutex &mutex; DocumentDBMapExplorer(const DocumentDBMap &documentDBMap_in, std::shared_timed_mutex &mutex_in) : documentDBMap(documentDBMap_in), mutex(mutex_in) {} - virtual void get_state(const vespalib::slime::Inserter &, bool) const override {} - virtual std::vector<vespalib::string> get_children_names() const override { + void get_state(const vespalib::slime::Inserter &, bool) const override {} + std::vector<vespalib::string> get_children_names() const override { std::shared_lock<std::shared_timed_mutex> guard(mutex); std::vector<vespalib::string> names; for (const auto &item: documentDBMap) { @@ -816,14 +802,14 @@ struct DocumentDBMapExplorer : vespalib::StateExplorer { } return names; } - virtual std::unique_ptr<vespalib::StateExplorer> get_child(vespalib::stringref name) const override { + std::unique_ptr<vespalib::StateExplorer> get_child(vespalib::stringref name) const override { typedef std::unique_ptr<StateExplorer> Explorer_UP; std::shared_lock<std::shared_timed_mutex> guard(mutex); auto result = documentDBMap.find(DocTypeName(vespalib::string(name))); if (result == documentDBMap.end()) { return Explorer_UP(nullptr); } - return Explorer_UP(new DocumentDBExplorer(result->second)); + return std::make_unique<DocumentDBExplorer>(result->second); } }; diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index c92687f798b..835b8ef1044 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default") const uint32_t FileStorTestFixture::MSG_WAIT_TIME; void -FileStorTestFixture::setupDisks(uint32_t diskCount) +FileStorTestFixture::setupPersistenceThreads(uint32_t threads) { std::string rootOfRoot = "todo-make-unique-filestorefixture"; - _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot))); - - _config2.reset(new vdstestlib::DirConfig(*_config)); - _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-server").set("node_index", "1"); - - _smallConfig.reset(new vdstestlib::DirConfig(*_config)); - _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1), - _config->getConfigId())); + _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot)); + _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-server").set("node_index", "1"); + _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads)); + + _node = std::make_unique<TestServiceLayerApp>( + DiskCount(1), NodeIndex(1), _config->getConfigId()); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount) void FileStorTestFixture::setUp() { - setupDisks(1); + setupPersistenceThreads(1); _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } void FileStorTestFixture::tearDown() { - _node.reset(0); + _node.reset(); } void @@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( } api::StorageMessageAddress -FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const { +FileStorTestFixture::makeSelfAddress() { return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0); } diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h index c8158d01224..c46f9de24fc 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.h +++ b/storage/src/tests/persistence/common/filestortestfixture.h @@ -19,8 +19,6 @@ public: std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<vdstestlib::DirConfig> _config; - std::unique_ptr<vdstestlib::DirConfig> _config2; - std::unique_ptr<vdstestlib::DirConfig> _smallConfig; const document::DocumentType* _testdoctype1; static const uint32_t MSG_WAIT_TIME = 60 * 1000; @@ -30,10 +28,12 @@ public: void setUp() override; void tearDown() override; - void setupDisks(uint32_t diskCount); + void setupPersistenceThreads(uint32_t diskCount); void createBucket(const document::BucketId& bid); bool bucketExistsInDb(const document::BucketId& bucket) const; + static api::StorageMessageAddress makeSelfAddress(); + api::ReturnCode::Result resultOf(const api::StorageReply& reply) const { return reply.getResult().getResult(); } @@ -99,8 +99,6 @@ public: const char* testName, const StorageLinkInjector& i = NoOpStorageLinkInjector()); - api::StorageMessageAddress makeSelfAddress() const; - void sendDummyGet(const document::BucketId& bid); void sendPut(const document::BucketId& bid, uint32_t docIdx, diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp index 50999f5883e..d4cec415937 100644 --- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp +++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp @@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture { public: void setupDisks() { - FileStorTestFixture::setupDisks(1); + FileStorTestFixture::setupPersistenceThreads(1); _node->setPersistenceProvider( spi::PersistenceProvider::UP( new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 64ef48b5719..e12f48bcdea 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -86,9 +86,9 @@ public: std::unique_ptr<vespalib::Barrier> _queueBarrier; std::unique_ptr<vespalib::Barrier> _completionBarrier; - void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) { - FileStorTestFixture::setupDisks(diskCount); - _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount)); + void setupProviderAndBarriers(uint32_t queueBarrierThreads) { + FileStorTestFixture::setupPersistenceThreads(1); + _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)); _queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads)); _completionBarrier.reset(new vespalib::Barrier(2)); _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier); @@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets) void OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations() { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations"); document::BucketId bucket(16, 1); createBucket(bucket); @@ -305,7 +305,7 @@ public: void OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() { - setupDisks(1, 3); + setupProviderAndBarriers(3); TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket"); document::BucketId bucket(16, 1); @@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName, const std::vector<api::StorageMessage::SP>& msgs, bool shouldCreateBucketInitially) { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, testName); document::BucketId bucket(16, 1); document::BucketId blockerBucket(16, 2); diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index f31623eed61..e96ad013923 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -15,86 +15,190 @@ using document::test::makeDocumentBucket; namespace storage { -class PersistenceQueueTest : public FileStorTestFixture -{ +class PersistenceQueueTest : public FileStorTestFixture { public: void testFetchNextUnlockedMessageIfBucketLocked(); + void shared_locked_operations_allow_concurrent_bucket_access(); + void exclusive_locked_operation_not_started_if_shared_op_active(); + void shared_locked_operation_not_started_if_exclusive_op_active(); + void exclusive_locked_operation_not_started_if_exclusive_op_active(); + void operation_batching_not_allowed_across_different_lock_modes(); - std::shared_ptr<api::StorageMessage> - createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const; void setUp() override; - void tearDown() override; CPPUNIT_TEST_SUITE(PersistenceQueueTest); CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked); + CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access); + CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active); + CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active); + CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active); + CPPUNIT_TEST(operation_batching_not_allowed_across_different_lock_modes); CPPUNIT_TEST_SUITE_END(); + + struct Fixture { + FileStorTestFixture& parent; + DummyStorageLink top; + std::unique_ptr<DummyStorageLink> dummyManager; + ForwardingMessageSender messageSender; + documentapi::LoadTypeSet loadTypes; + FileStorMetrics metrics; + std::unique_ptr<FileStorHandler> filestorHandler; + uint32_t stripeId; + + explicit Fixture(FileStorTestFixture& parent); + ~Fixture(); + }; + + static constexpr uint16_t _disk = 0; }; CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest); -void -PersistenceQueueTest::setUp() +PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) + : parent(parent_), + top(), + dummyManager(std::make_unique<DummyStorageLink>()), + messageSender(*dummyManager), + loadTypes("raw:"), + metrics(loadTypes.getMetricLoadTypes()) { - setupDisks(1); - _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + top.push_back(std::move(dummyManager)); + top.open(); + + metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); + + filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(), + parent._node->getComponentRegister()); + // getNextMessage will time out if no unlocked buckets are present. Choose a timeout + // that is large enough to fail tests with high probability if this is not the case, + // and small enough to not slow down testing too much. + filestorHandler->setGetNextMessageTimeout(20); + + stripeId = filestorHandler->getNextStripeId(0); } -void -PersistenceQueueTest::tearDown() -{ - _node.reset(0); +PersistenceQueueTest::Fixture::~Fixture() = default; + +void PersistenceQueueTest::setUp() { + setupPersistenceThreads(1); + _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } -std::shared_ptr<api::StorageMessage> -PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) -{ - std::ostringstream id; - id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx; - document::Document::SP doc( - _node->getTestDocMan().createDocument("foobar", id.str())); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234)); - cmd->setAddress(api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 0)); +std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) { + std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument( + "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234); + cmd->setAddress(makeSelfAddress()); return cmd; } -void -PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() -{ - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); - uint32_t stripeId = filestorHandler.getNextStripeId(0); +std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const { + auto cmd = std::make_shared<api::GetCommand>( + makeDocumentBucket(document::BucketId(16, bucket)), + document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]"); + cmd->setAddress(makeSelfAddress()); + return cmd; +} +void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() { + Fixture f(*this); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, // then subsequently on the second, skipping the already locked bucket. // Puts all have same pri, so order is well defined. - filestorHandler.schedule(createPut(1234, 0), 0); - filestorHandler.schedule(createPut(1234, 1), 0); - filestorHandler.schedule(createPut(5432, 0), 0); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 1), _disk); + f.filestorHandler->schedule(createPut(5432, 0), _disk); - auto lock0 = filestorHandler.getNextMessage(0, stripeId); + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock0.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, stripeId); + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock1.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } +void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + + // Even though we already have a lock on the bucket, Gets allow shared locking and we + // should therefore be able to get another lock. + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock1.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::operation_batching_not_allowed_across_different_lock_modes() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first); + CPPUNIT_ASSERT(lock0.second); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0); + CPPUNIT_ASSERT(!lock0.second); +} + } // namespace storage diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 27281d9b95f..4fc577226ca 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -62,7 +62,7 @@ private: CPPUNIT_TEST(testNormalUsage); CPPUNIT_TEST(testFailedCreateIterator); CPPUNIT_TEST(testFailedGetIter); - CPPUNIT_TEST(testMultipleFailedGetIter); + CPPUNIT_TEST(iterators_per_bucket_config_is_ignored_and_hardcoded_to_1); CPPUNIT_TEST(testDocumentAPIClientError); CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor); CPPUNIT_TEST(testIteratorCreatedForFailedVisitor); @@ -90,7 +90,7 @@ public: void testNormalUsage(); void testFailedCreateIterator(); void testFailedGetIter(); - void testMultipleFailedGetIter(); + void iterators_per_bucket_config_is_ignored_and_hardcoded_to_1(); void testDocumentAPIClientError(); void testNoDocumentAPIResendingForFailedVisitor(); void testIteratorCreatedForFailedVisitor(); @@ -592,36 +592,31 @@ VisitorTest::testFailedGetIter() CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); } -void -VisitorTest::testMultipleFailedGetIter() -{ - initializeTest(TestParams().iteratorsPerBucket(2)); - std::shared_ptr<api::CreateVisitorCommand> cmd( - makeCreateVisitor()); +void VisitorTest::iterators_per_bucket_config_is_ignored_and_hardcoded_to_1() { + initializeTest(TestParams().iteratorsPerBucket(20)); + auto cmd = makeCreateVisitor(); _top->sendDown(cmd); sendCreateIteratorReply(); - std::vector<GetIterCommand::SP> getIterCmds( - fetchMultipleCommands<GetIterCommand>(*_bottom, 2)); - - sendGetIterReply(*getIterCmds[0], - api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); - - // Wait for an "appropriate" amount of time so that wrongful logic - // will send a DestroyIteratorCommand before all pending GetIters - // have been replied to. - std::this_thread::sleep_for(100ms); + auto getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + sendGetIterReply(*getIterCmd); CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands()); - sendGetIterReply(*getIterCmds[1], - api::ReturnCode(api::ReturnCode::BUCKET_DELETED)); + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size()); - DestroyIteratorCommand::SP destroyIterCmd( - fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + auto destroyIterCmd = fetchSingleCommand<DestroyIteratorCommand>(*_bottom); - verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0); + verifyCreateVisitorReply(api::ReturnCode::OK); CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); + CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount()); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 74baecbf026..0da0fd5ce66 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& } FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk) +FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) { - return _impl->lock(bucket, disk); + return _impl->lock(bucket, disk, lockReq); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index b74765b17d2..02c959df2f0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -58,8 +58,9 @@ public: typedef std::shared_ptr<BucketLockInterface> SP; virtual const document::Bucket &getBucket() const = 0; + virtual api::LockingRequirements lockingRequirements() const noexcept = 0; - virtual ~BucketLockInterface() {}; + virtual ~BucketLockInterface() = default; }; typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage; @@ -139,7 +140,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk); + BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq); /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index a01881b6fbe..f9571228ef9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId) } std::shared_ptr<FileStorHandler::BucketLockInterface> -FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) -{ +FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) { vespalib::MonitorGuard guard(_lock); - while (isLocked(guard, bucket)) { - LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); + while (isLocked(guard, bucket, lockReq)) { + LOG(spam, "Contending for filestor lock for %s with %s access", + bucket.getBucketId().toString().c_str(), api::to_string(lockReq)); guard.wait(100); } - auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0); + auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq); guard.broadcast(); return locker; @@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) namespace { struct MultiLockGuard { std::map<uint16_t, vespalib::Monitor*> monitors; - std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards; + std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards; - MultiLockGuard() {} + MultiLockGuard() = default; void addLock(vespalib::Monitor& monitor, uint16_t index) { monitors[index] = &monitor; @@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) PriorityIdx& idx(bmi::get<1>(_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - while (iter != end && isLocked(guard, iter->_bucket)) { + while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) { iter++; } if (iter != end) { @@ -959,6 +959,13 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) } api::StorageMessage & m(*range.first->_command); + // For now, don't allow batching of operations across lock requirement modes. + // We might relax this requirement later once we're 100% sure it can't trigger + // any unfortunate edge cases. + if (lck.first->lockingRequirements() != m.lockingRequirements()) { + lck.second.reset(); + return lck; + } uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])); @@ -992,7 +999,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority if (!messageTimedOutInQueue(*msg, waitTime)) { auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), - msg->getType().getId(), msg->getMsgId()); + msg->getType().getId(), msg->getMsgId(), + msg->lockingRequirements()); guard.unlock(); return FileStorHandler::LockedMessage(std::move(locker), std::move(msg)); } else { @@ -1090,10 +1098,65 @@ FileStorHandlerImpl::Stripe::flush() lockGuard.wait(100); } } + +void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, + api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId) { + vespalib::MonitorGuard guard(_lock); + auto iter = _lockedBuckets.find(bucket); + assert(iter != _lockedBuckets.end()); + auto& entry = iter->second; + + if (reqOfReleasedLock == api::LockingRequirements::Exclusive) { + assert(entry._exclusiveLock); + assert(entry._exclusiveLock->msgId == lockMsgId); + entry._exclusiveLock.reset(); + } else { + assert(!entry._exclusiveLock); + auto shared_iter = entry._sharedLocks.find(lockMsgId); + assert(shared_iter != entry._sharedLocks.end()); + entry._sharedLocks.erase(shared_iter); + } + + if (!entry._exclusiveLock && entry._sharedLocks.empty()) { + _lockedBuckets.erase(iter); // No more locks held + } + guard.broadcast(); +} + +void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry) { + auto& entry = _lockedBuckets[bucket]; + assert(!entry._exclusiveLock); + if (lockReq == api::LockingRequirements::Exclusive) { + assert(entry._sharedLocks.empty()); + entry._exclusiveLock = lockEntry; + } else { + // TODO use a hash set with a custom comparator/hasher instead...? + auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry)); + (void) inserted; + assert(inserted.second); + } +} + bool -FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept +FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket, + api::LockingRequirements lockReq) const noexcept { - return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end()); + if (bucket.getBucketId().getRawId() == 0) { + return false; + } + auto iter = _lockedBuckets.find(bucket); + if (iter == _lockedBuckets.end()) { + return false; + } + if (iter->second._exclusiveLock) { + return true; + } + // Shared locks can be taken alongside other shared locks, but exclusive locks + // require that no shared locks are currently present. + return ((lockReq == api::LockingRequirements::Exclusive) + && !iter->second._sharedLocks.empty()); } uint32_t @@ -1114,33 +1177,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe, const document::Bucket &bucket, uint8_t priority, - api::MessageType::Id msgType, api::StorageMessage::Id msgId) + api::MessageType::Id msgType, api::StorageMessage::Id msgId, + api::LockingRequirements lockReq) : _stripe(stripe), - _bucket(bucket) + _bucket(bucket), + _uniqueMsgId(msgId), + _lockReq(lockReq) { - (void) guard; if (_bucket.getBucketId().getRawId() != 0) { - // Lock the bucket and wait until it is not the current operation for - // the disk itself. - _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId)); - LOG(debug, "Locked bucket %s with priority %u", - bucket.getBucketId().toString().c_str(), priority); - - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "acquired filestor lock", false, - debug::BucketOperationLogger::State::BUCKET_LOCKED); + _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId)); + LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s", + bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq)); } } -FileStorHandlerImpl::BucketLock::~BucketLock() -{ +FileStorHandlerImpl::BucketLock::~BucketLock() { if (_bucket.getBucketId().getRawId() != 0) { - _stripe.release(_bucket); - LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "released filestor lock", true, - debug::BucketOperationLogger::State::BUCKET_UNLOCKED); + _stripe.release(_bucket, _lockReq, _uniqueMsgId); + LOG(debug, "Unlocked bucket %s for message %zu in mode %s", + _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq)); } } @@ -1182,14 +1238,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const } } +namespace { + +void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry, + api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) { + os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " (" + << bucketId << ", " << api::to_string(lock_mode) + << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n"; +} + +} + void FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const { uint32_t now = time(nullptr); vespalib::MonitorGuard guard(_lock); for (const auto & e : _lockedBuckets) { - os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << e.first.getBucketId() - << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n"; + if (e.second._exclusiveLock) { + dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock, + api::LockingRequirements::Exclusive, now, os); + } + for (const auto& shared : e.second._sharedLocks) { + dump_lock_entry(e.first.getBucketId(), shared.second, + api::LockingRequirements::Shared, now, os); + } } } @@ -1238,7 +1311,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& } for (auto & entry : _mergeStates) { out << "<b>" << entry.first.toString() << "</b><br>\n"; - // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here.. } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 45ac5ded47f..f9dcca4315b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -30,6 +30,7 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/hash_map.h> #include <atomic> +#include <optional> namespace storage { @@ -82,13 +83,19 @@ public: api::MessageType::Id msgType; api::StorageMessage::Id msgId; - LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { } LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_) : timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_) { } }; + + struct MultiLockEntry { + std::optional<LockEntry> _exclusiveLock; + using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>; + SharedLocks _sharedLocks; + }; + Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender); ~Stripe(); void flush(); @@ -105,19 +112,16 @@ public: vespalib::MonitorGuard guard(_lock); return _queue.size(); } - void release(const document::Bucket & bucket){ - vespalib::MonitorGuard guard(_lock); - _lockedBuckets.erase(bucket); - guard.broadcast(); - } + void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId); - bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept; + bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&, + api::LockingRequirements lockReq) const noexcept; - void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) { - _lockedBuckets.insert(std::make_pair(bucket, lockEntry)); - } + void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry); - std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket); + std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk); @@ -131,9 +135,11 @@ public: void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } private: bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const; + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts + // with its locking requirements. FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter); - typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets; + using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; FileStorStripeMetrics *_metrics; @@ -178,8 +184,8 @@ public: return _stripes[stripeId].getNextMessage(lck); } std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket) { - return stripe(bucket).lock(bucket); + lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { + return stripe(bucket).lock(bucket, lockReq); } void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { stripe(bucket).failOperations(bucket, code); @@ -194,7 +200,7 @@ public: // Disperse bucket bits by multiplying with the 64-bit FNV-1 prime. // This avoids an inherent affinity between the LSB of a bucket's bits // and the stripe an operation ends up on. - return bucket.getBucketId().getRawId() * 1099511628211ULL; + return bucket.getBucketId().getId() * 1099511628211ULL; } Stripe & stripe(const document::Bucket & bucket) { return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()]; @@ -208,15 +214,20 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: + // TODO refactor, too many params BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket, - uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id); + uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id, + api::LockingRequirements lockReq); ~BucketLock(); const document::Bucket &getBucket() const override { return _bucket; } + api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; } private: Stripe & _stripe; document::Bucket _bucket; + api::StorageMessage::Id _uniqueMsgId; + api::LockingRequirements _lockReq; }; FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&, @@ -253,8 +264,8 @@ public: uint32_t getNextStripeId(uint32_t disk); std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, uint16_t disk) { - return _diskInfo[disk].lock(bucket); + lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) { + return _diskInfo[disk].lock(bucket, lockReq); } void addMergeStatus(const document::Bucket&, MergeStatus::SP); diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index ba7f5979569..d0572e7dbf8 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -38,6 +38,9 @@ public: void setMaxByteSize(uint32_t maxByteSize) { _maxByteSize = maxByteSize; } uint32_t getMaxByteSize() const { return _maxByteSize; } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: @@ -105,6 +108,9 @@ public: spi::ReadConsistency getReadConsistency() const noexcept { return _readConsistency; } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } std::unique_ptr<api::StorageReply> makeReply() override; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c2dcb8e2a29..888dc93dd82 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, result.disk = getPreferredAvailableDisk(bucket); while (true) { + // This function is only called in a context where we require exclusive + // locking (split/join). Refactor if this no longer the case. std::shared_ptr<FileStorHandler::BucketLockInterface> lock( - _fileStorHandler.lock(bucket, result.disk)); + _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive)); + // TODO disks are no longer used in practice, can we safely discard this? + // Might need it for synchronization purposes if something has taken the + // disk lock _and_ the bucket lock...? StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "join-lockAndGetDisk-1", flags)); if (entry.exist() && entry->disk != result.disk) { diff --git a/storage/src/vespa/storage/visiting/stor-visitor.def b/storage/src/vespa/storage/visiting/stor-visitor.def index 1e80f2993a5..6f16bcb60a2 100644 --- a/storage/src/vespa/storage/visiting/stor-visitor.def +++ b/storage/src/vespa/storage/visiting/stor-visitor.def @@ -24,6 +24,7 @@ defaultparalleliterators int default=8 ## will be 16 requests to persistence layer, but only 8 will be able to execute ## at the same time, since only one operation can be executed at the same time ## for one bucket) +## DEPRECATED: ignored by backend, 1 is always used. iterators_per_bucket int default=1 ## Default number of maximum client replies pending. diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index a8f31514eb1..b12a1eb6e7f 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -637,7 +637,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) _ignoreNonExistingVisitorTimeLimit = config.ignorenonexistingvisitortimelimit; _defaultParallelIterators = config.defaultparalleliterators; - _iteratorsPerBucket = config.iteratorsPerBucket; _defaultPendingMessages = config.defaultpendingmessages; _defaultDocBlockSize = config.defaultdocblocksize; _visitorMemoryUsageLimit = config.visitorMemoryUsageLimit; @@ -647,12 +646,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) LOG(config, "Cannot use value of defaultParallelIterators < 1"); _defaultParallelIterators = 1; } - if (_iteratorsPerBucket < 1 && _iteratorsPerBucket > 10) { - if (_iteratorsPerBucket < 1) _iteratorsPerBucket = 1; - else _iteratorsPerBucket = 10; - LOG(config, "Invalid value of iterators per bucket %u using %u", - config.iteratorsPerBucket, _iteratorsPerBucket); - } if (_defaultPendingMessages < 1) { LOG(config, "Cannot use value of defaultPendingMessages < 1"); _defaultPendingMessages = 1; diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h index bda1bd0f038..59934154cf5 100644 --- a/storageapi/src/vespa/storageapi/message/persistence.h +++ b/storageapi/src/vespa/storageapi/message/persistence.h @@ -24,7 +24,7 @@ class TestAndSetCommand : public BucketInfoCommand { TestAndSetCondition _condition; public: TestAndSetCommand(const MessageType & messageType, const document::Bucket &bucket); - ~TestAndSetCommand(); + ~TestAndSetCommand() override; void setCondition(const TestAndSetCondition & condition) { _condition = condition; } const TestAndSetCondition & getCondition() const { return _condition; } @@ -49,7 +49,7 @@ class PutCommand : public TestAndSetCommand { public: PutCommand(const document::Bucket &bucket, const DocumentSP&, Timestamp); - ~PutCommand(); + ~PutCommand() override; void setTimestamp(Timestamp ts) { _timestamp = ts; } @@ -86,7 +86,7 @@ class PutReply : public BucketInfoReply { public: explicit PutReply(const PutCommand& cmd, bool wasFound = true); - ~PutReply(); + ~PutReply() override; const document::DocumentId& getDocumentId() const { return _docId; } bool hasDocument() const { return _document.get(); } @@ -116,7 +116,7 @@ class UpdateCommand : public TestAndSetCommand { public: UpdateCommand(const document::Bucket &bucket, const std::shared_ptr<document::DocumentUpdate>&, Timestamp); - ~UpdateCommand(); + ~UpdateCommand() override; void setTimestamp(Timestamp ts) { _timestamp = ts; } void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; } @@ -147,7 +147,7 @@ class UpdateReply : public BucketInfoReply { public: UpdateReply(const UpdateCommand& cmd, Timestamp oldTimestamp = 0); - ~UpdateReply(); + ~UpdateReply() override; void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; } @@ -189,7 +189,7 @@ class GetCommand : public BucketInfoCommand { public: GetCommand(const document::Bucket &bucket, const document::DocumentId&, const vespalib::stringref & fieldSet, Timestamp before = MAX_TIMESTAMP); - ~GetCommand(); + ~GetCommand() override; void setBeforeTimestamp(Timestamp ts) { _beforeTimestamp = ts; } const document::DocumentId& getDocumentId() const { return _docId; } Timestamp getBeforeTimestamp() const { return _beforeTimestamp; } @@ -199,6 +199,10 @@ public: vespalib::string getSummary() const override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } + DECLARE_STORAGECOMMAND(GetCommand, onGet) }; @@ -219,7 +223,7 @@ public: GetReply(const GetCommand& cmd, const DocumentSP& doc = DocumentSP(), Timestamp lastModified = 0); - ~GetReply(); + ~GetReply() override; const DocumentSP& getDocument() const { return _doc; } const document::DocumentId& getDocumentId() const { return _docId; } @@ -245,7 +249,7 @@ class RemoveCommand : public TestAndSetCommand { public: RemoveCommand(const document::Bucket &bucket, const document::DocumentId& docId, Timestamp timestamp); - ~RemoveCommand(); + ~RemoveCommand() override; void setTimestamp(Timestamp ts) { _timestamp = ts; } const document::DocumentId& getDocumentId() const override { return _docId; } @@ -267,7 +271,7 @@ class RemoveReply : public BucketInfoReply { Timestamp _oldTimestamp; public: explicit RemoveReply(const RemoveCommand& cmd, Timestamp oldTimestamp = 0); - ~RemoveReply(); + ~RemoveReply() override; const document::DocumentId& getDocumentId() const { return _docId; } Timestamp getTimestamp() { return _timestamp; }; @@ -289,7 +293,7 @@ class RevertCommand : public BucketInfoCommand { public: RevertCommand(const document::Bucket &bucket, const std::vector<Timestamp>& revertTokens); - ~RevertCommand(); + ~RevertCommand() override; const std::vector<Timestamp>& getRevertTokens() const { return _tokens; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(RevertCommand, onRevert) @@ -305,7 +309,7 @@ class RevertReply : public BucketInfoReply { std::vector<Timestamp> _tokens; public: explicit RevertReply(const RevertCommand& cmd); - ~RevertReply(); + ~RevertReply() override; const std::vector<Timestamp>& getRevertTokens() const { return _tokens; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(RevertReply, onRevertReply) diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index f970091f695..380d846dd93 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -302,4 +302,19 @@ StorageMessage::getSummary() const { return toString(); } +const char* to_string(LockingRequirements req) noexcept { + switch (req) { + case LockingRequirements::Exclusive: + return "Exclusive"; + case LockingRequirements::Shared: + return "Shared"; + } + assert(false); +} + +std::ostream& operator<<(std::ostream& os, LockingRequirements req) { + os << to_string(req); + return os; +} + } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index dadb68c644d..6c561f3af21 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -19,6 +19,7 @@ #include <vespa/document/bucket/bucket.h> #include <vespa/vespalib/util/printable.h> #include <map> +#include <iosfwd> namespace vespalib { class asciistream; @@ -306,6 +307,20 @@ struct TransportContext { virtual ~TransportContext() = 0; }; +enum class LockingRequirements : uint8_t { + // Operations with exclusive locking can only be executed iff no other + // exclusive or shared locks are taken for its bucket. + Exclusive = 0, + // Operations with shared locking can only be executed iff no exclusive + // lock is taken for its bucket. Should only be used for read-only operations + // that cannot mutate a bucket's state. + Shared +}; + +const char* to_string(LockingRequirements req) noexcept; + +std::ostream& operator<<(std::ostream&, LockingRequirements); + class StorageMessage : public vespalib::Printable { friend class StorageMessageTest; // Used for testing only @@ -421,6 +436,10 @@ public: virtual document::Bucket getBucket() const { return getDummyBucket(); } document::BucketId getBucketId() const { return getBucket().getBucketId(); } virtual bool hasSingleBucketId() const { return false; } + virtual LockingRequirements lockingRequirements() const noexcept { + // Safe default: assume exclusive locking is required. + return LockingRequirements::Exclusive; + } }; } |