-rw-r--r--  parent/pom.xml | 2
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp | 46
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.h | 26
-rw-r--r--  persistence/src/vespa/persistence/spi/persistenceprovider.h | 9
-rw-r--r--  searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp | 134
-rw-r--r--  searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h | 28
-rw-r--r--  searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h | 7
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp | 3
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/proton.cpp | 56
-rw-r--r--  storage/src/tests/persistence/common/filestortestfixture.cpp | 29
-rw-r--r--  storage/src/tests/persistence/common/filestortestfixture.h | 8
-rw-r--r--  storage/src/tests/persistence/filestorage/mergeblockingtest.cpp | 2
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp | 12
-rw-r--r--  storage/src/tests/persistence/persistencequeuetest.cpp | 196
-rw-r--r--  storage/src/tests/visiting/visitortest.cpp | 41
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp | 4
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h | 5
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 138
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 47
-rw-r--r--  storage/src/vespa/storage/persistence/messages.h | 6
-rw-r--r--  storage/src/vespa/storage/persistence/persistenceutil.cpp | 7
-rw-r--r--  storage/src/vespa/storage/visiting/stor-visitor.def | 1
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.cpp | 7
-rw-r--r--  storageapi/src/vespa/storageapi/message/persistence.h | 26
-rw-r--r--  storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp | 15
-rw-r--r--  storageapi/src/vespa/storageapi/messageapi/storagemessage.h | 19
26 files changed, 527 insertions, 347 deletions
diff --git a/parent/pom.xml b/parent/pom.xml
index 56da33a9c93..0b141046d8a 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -130,7 +130,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
- <version>3.0.0-M1</version>
+ <version>3.0.0-M2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index d3883744229..25c6b71f7ff 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -528,7 +528,7 @@ DummyPersistence::get(const Bucket& b,
b.toString().c_str(),
did.toString().c_str());
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- BucketContentGuard::UP bc(acquireBucketWithLock(b));
+ BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared));
if (!bc.get()) {
} else {
DocEntry::SP entry((*bc)->getEntry(did));
@@ -568,7 +568,7 @@ DummyPersistence::createIterator(
"Got invalid/unparseable document selection string");
}
}
- BucketContentGuard::UP bc(acquireBucketWithLock(b));
+ BucketContentGuard::UP bc(acquireBucketWithLock(b, LockMode::Shared));
if (!bc.get()) {
return CreateIteratorResult(Result::TRANSIENT_ERROR, "Bucket not found");
}
@@ -656,7 +656,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con
it = iter->second.get();
}
- BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket));
+ BucketContentGuard::UP bc(acquireBucketWithLock(it->_bucket, LockMode::Shared));
if (!bc.get()) {
ctx.trace(9, "finished iterate(); bucket not found");
return IterateResult(Result::TRANSIENT_ERROR, "Bucket not found");
@@ -942,11 +942,11 @@ DummyPersistence::isActive(const Bucket& b) const
BucketContentGuard::~BucketContentGuard()
{
- _persistence.releaseBucketNoLock(_content);
+ _persistence.releaseBucketNoLock(_content, _lock_mode);
}
BucketContentGuard::UP
-DummyPersistence::acquireBucketWithLock(const Bucket& b) const
+DummyPersistence::acquireBucketWithLock(const Bucket& b, LockMode lock_mode) const
{
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
vespalib::MonitorGuard lock(_monitor);
@@ -955,28 +955,32 @@ DummyPersistence::acquireBucketWithLock(const Bucket& b) const
if (it == ncp._content[b.getPartition()].end()) {
return BucketContentGuard::UP();
}
- // Sanity check that SPI-level locking is doing its job correctly.
- // Atomic CAS might be a bit overkill, but since we "release" the bucket
- // outside of the mutex, we want to ensure the write is visible across all
- // threads.
- bool my_false(false);
- bool bucketNotInUse(it->second->_inUse.compare_exchange_strong(my_false, true));
- if (!bucketNotInUse) {
- LOG(error, "Attempted to acquire %s, but it was already marked as being in use!",
- b.toString().c_str());
- LOG_ABORT("should not reach here");
+ if (lock_mode == LockMode::Exclusive) {
+ // Sanity check that SPI-level locking is doing its job correctly.
+ // Atomic CAS might be a bit overkill, but since we "release" the bucket
+ // outside of the mutex, we want to ensure the write is visible across all
+ // threads.
+ bool my_false(false);
+ bool bucketNotInUse(it->second->_inUse.compare_exchange_strong(my_false, true));
+ if (!bucketNotInUse) {
+ LOG(error, "Attempted to acquire %s, but it was already marked as being in use!",
+ b.toString().c_str());
+ LOG_ABORT("dummy persistence bucket locking invariant violation");
+ }
}
- return BucketContentGuard::UP(new BucketContentGuard(ncp, *it->second));
+ return std::make_unique<BucketContentGuard>(ncp, *it->second, lock_mode);
}
void
-DummyPersistence::releaseBucketNoLock(const BucketContent& bc) const
+DummyPersistence::releaseBucketNoLock(const BucketContent& bc, LockMode lock_mode) const noexcept
{
- bool my_true(true);
- bool bucketInUse(bc._inUse.compare_exchange_strong(my_true, false));
- assert(bucketInUse);
- (void) bucketInUse;
+ if (lock_mode == LockMode::Exclusive) {
+ bool my_true(true);
+ bool bucketInUse(bc._inUse.compare_exchange_strong(my_true, false));
+ assert(bucketInUse);
+ (void) bucketInUse;
+ }
}
}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index c93b7fd22c7..c97aab822ac 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -24,13 +24,18 @@ class DocumentTypeRepo;
namespace storage::spi::dummy {
+enum class LockMode {
+ Exclusive,
+ Shared
+};
+
struct BucketEntry
{
DocEntry::SP entry;
GlobalId gid;
BucketEntry(DocEntry::SP e, const GlobalId& g)
- : entry(e),
+ : entry(std::move(e)),
gid(g)
{ }
};
@@ -98,30 +103,33 @@ class BucketContentGuard
BucketContentGuard(const BucketContentGuard&);
BucketContentGuard& operator=(const BucketContentGuard&);
public:
- typedef std::unique_ptr<BucketContentGuard> UP;
+ using UP = std::unique_ptr<BucketContentGuard>;
BucketContentGuard(DummyPersistence& persistence,
- BucketContent& content)
+ BucketContent& content,
+ LockMode lock_mode)
: _persistence(persistence),
- _content(content)
+ _content(content),
+ _lock_mode(lock_mode)
{
}
~BucketContentGuard();
- BucketContent& getContent() {
+ BucketContent& getContent() noexcept {
return _content;
}
- BucketContent* operator->() {
+ BucketContent* operator->() noexcept {
return &_content;
}
- BucketContent& operator*() {
+ BucketContent& operator*() noexcept {
return _content;
}
private:
DummyPersistence& _persistence;
BucketContent& _content;
+ LockMode _lock_mode;
};
class DummyPersistence : public AbstractPersistenceProvider
@@ -207,8 +215,8 @@ public:
private:
friend class BucketContentGuard;
// Const since funcs only alter mutable field in BucketContent
- BucketContentGuard::UP acquireBucketWithLock(const Bucket& b) const;
- void releaseBucketNoLock(const BucketContent& bc) const;
+ BucketContentGuard::UP acquireBucketWithLock(const Bucket& b, LockMode lock_mode = LockMode::Exclusive) const;
+ void releaseBucketNoLock(const BucketContent& bc, LockMode lock_mode = LockMode::Exclusive) const noexcept;
mutable bool _initialized;
std::shared_ptr<const document::DocumentTypeRepo> _repo;
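The LockMode plumbing above reduces to a small guard pattern: exclusive acquisition asserts sole ownership via an atomic compare-and-swap, while shared acquisition skips the in-use check entirely. A minimal standalone sketch of that pattern (illustrative names, not the actual Vespa classes), mirroring what acquireBucketWithLock/releaseBucketNoLock do for LockMode::Exclusive versus LockMode::Shared:

// Sketch only: a guard whose exclusive mode enforces single ownership with a CAS,
// and whose shared mode performs no check, as in the DummyPersistence change above.
#include <atomic>
#include <cassert>
#include <cstdio>

enum class LockMode { Exclusive, Shared };

struct Content {
    // mutable so const accessors can still flip the in-use flag, as in BucketContent
    mutable std::atomic<bool> inUse{false};
};

class ContentGuard {
    const Content& _content;
    LockMode _mode;
public:
    ContentGuard(const Content& content, LockMode mode)
        : _content(content), _mode(mode)
    {
        if (_mode == LockMode::Exclusive) {
            bool expected = false;
            // Sanity check: at most one exclusive owner at any time.
            bool ok = _content.inUse.compare_exchange_strong(expected, true);
            assert(ok);
            (void) ok;
        }
    }
    ~ContentGuard() {
        if (_mode == LockMode::Exclusive) {
            bool expected = true;
            bool ok = _content.inUse.compare_exchange_strong(expected, false);
            assert(ok);
            (void) ok;
        }
    }
};

int main() {
    Content c;
    {
        ContentGuard reader1(c, LockMode::Shared);
        ContentGuard reader2(c, LockMode::Shared); // multiple shared guards coexist
    }
    {
        ContentGuard writer(c, LockMode::Exclusive); // sole owner while alive
    }
    std::puts("guards released cleanly");
    return 0;
}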
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index b5f2fc198c4..96b3d385b87 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -232,6 +232,9 @@ struct PersistenceProvider
* document id. If no versions were found, or the document was removed,
* the result should be successful, but contain no document (see GetResult).
*
+ * Concurrency note: may be called concurrently with other read-only
+ * operations.
+ *
* @param fieldSet A set of fields that should be retrieved.
* @param id The document id to retrieve.
*/
@@ -253,6 +256,9 @@ struct PersistenceProvider
* iteration progress and selection criteria. destroyIterator will NOT
* be called when createIterator returns an error.
*
+ * Concurrency note: may be called concurrently with other read-only
+ * operations.
+ *
* @param selection Selection criteria used to limit the subset of
* the bucket's documents that will be returned by the iterator. The
* provider implementation may use these criteria to optimize its
@@ -323,6 +329,9 @@ struct PersistenceProvider
* iterator must only set this flag on the result and return without any
* documents.
*
+ * Concurrency note: may be called concurrently with other read-only
+ * operations.
+ *
* @param id An iterator ID returned by a previous call to createIterator
* @param maxByteSize An indication of the maximum number of bytes that
* should be returned.
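The three concurrency notes above only state a contract: get, createIterator and iterate may be called concurrently with other read-only operations, so a provider must not assume a single caller on its read paths. One way an implementation could honour that contract, sketched with standard-library primitives only (TinyBucketStore and its members are made-up names, not part of the SPI):

// Sketch: readers take a shared lock and may overlap; writers take a unique lock.
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>

class TinyBucketStore {
    mutable std::shared_mutex _mutex;
    std::map<std::string, std::string> _docs;
public:
    // Read path (get/iterate): shared lock, may run concurrently with other readers.
    std::optional<std::string> get(const std::string& id) const {
        std::shared_lock lock(_mutex);
        auto it = _docs.find(id);
        return (it != _docs.end()) ? std::optional<std::string>(it->second) : std::nullopt;
    }
    // Write path (put/remove): unique lock, excludes readers and other writers.
    void put(const std::string& id, std::string body) {
        std::unique_lock lock(_mutex);
        _docs[id] = std::move(body);
    }
};

int main() {
    TinyBucketStore store;
    store.put("id:foo:testdoctype1::1", "hello");
    return store.get("id:foo:testdoctype1::1").has_value() ? 0 : 1;
}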
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 95b3008985b..0d2c556b4d6 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -8,6 +8,7 @@
#include "tls_stats_factory.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/vespalib/util/jsonwriter.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".proton.flushengine.flushengine");
@@ -22,8 +23,7 @@ namespace proton {
namespace {
search::SerialNum
-findOldestFlushedSerial(const IFlushTarget::List &lst,
- const IFlushHandler &handler)
+findOldestFlushedSerial(const IFlushTarget::List &lst, const IFlushHandler &handler)
{
search::SerialNum ret(handler.getCurrentSerialNumber());
for (const IFlushTarget::SP & target : lst) {
@@ -33,42 +33,46 @@ findOldestFlushedSerial(const IFlushTarget::List &lst,
return ret;
}
+void
+logTarget(const char * text, const FlushContext & ctx) {
+ LOG(debug, "Target '%s' %s flush of transactions %" PRIu64 " through %" PRIu64 ".",
+ ctx.getName().c_str(), text,
+ ctx.getTarget()->getFlushedSerialNum() + 1,
+ ctx.getHandler()->getCurrentSerialNumber());
+}
+
}
-FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) :
- _name(name),
- _start(start),
- _id(id)
+FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id)
+ : _name(name),
+ _start(start),
+ _id(id)
{ }
-FlushEngine::FlushMeta::~FlushMeta() { }
+FlushEngine::FlushMeta::~FlushMeta() = default;
-FlushEngine::FlushInfo::FlushInfo() :
- FlushMeta("", fastos::ClockSystem::now(), 0),
- _target()
+FlushEngine::FlushInfo::FlushInfo()
+ : FlushMeta("", fastos::ClockSystem::now(), 0),
+ _target()
{
}
-FlushEngine::FlushInfo::~FlushInfo() { }
+FlushEngine::FlushInfo::~FlushInfo() = default;
-FlushEngine::FlushInfo::FlushInfo(uint32_t taskId,
- const IFlushTarget::SP &target,
- const vespalib::string & destination) :
- FlushMeta(destination, fastos::ClockSystem::now(), taskId),
- _target(target)
+FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string & destination)
+ : FlushMeta(destination, fastos::ClockSystem::now(), taskId),
+ _target(target)
{
}
-FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
- tlsStatsFactory,
- IFlushStrategy::SP strategy, uint32_t numThreads,
- uint32_t idleIntervalMS)
+FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory,
+ IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS)
: _closed(false),
_maxConcurrent(numThreads),
_idleIntervalMS(idleIntervalMS),
_taskId(0),
_threadPool(128 * 1024),
- _strategy(strategy),
+ _strategy(std::move(strategy)),
_priorityStrategy(),
_executor(numThreads, 128 * 1024),
_lock(),
@@ -78,11 +82,9 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
_setStrategyLock(),
_strategyLock(),
_strategyCond(),
- _tlsStatsFactory(tlsStatsFactory),
+ _tlsStatsFactory(std::move(tlsStatsFactory)),
_pendingPrune()
-{
- // empty
-}
+{ }
FlushEngine::~FlushEngine()
{
@@ -92,7 +94,7 @@ FlushEngine::~FlushEngine()
FlushEngine &
FlushEngine::start()
{
- if (_threadPool.NewThread(this) == NULL) {
+ if (_threadPool.NewThread(this) == nullptr) {
throw vespalib::IllegalStateException("Failed to start engine thread.");
}
return *this;
@@ -148,10 +150,8 @@ FlushEngine::wait(size_t minimumWaitTimeIfReady)
}
void
-FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
+FlushEngine::Run(FastOS_ThreadInterface *, void *)
{
- (void)thread;
- (void)arg;
bool shouldIdle = false;
vespalib::string prevFlushName;
while (wait(shouldIdle ? _idleIntervalMS : 0)) {
@@ -161,13 +161,14 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
}
prevFlushName = flushNextTarget(prevFlushName);
if ( ! prevFlushName.empty()) {
- // Sleep at least 10 ms after a successful flush in order to avoid busy loop in case
- // of strategy error or target error.
- FastOS_Thread::Sleep(10);
+ // Sleep 1 ms after a successful flush in order to avoid busy loop in case
+ // of strategy or target error.
+ std::this_thread::sleep_for(1ms);
} else {
shouldIdle = true;
}
- LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
+ LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'",
+ shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
}
_executor.sync();
prune();
@@ -211,18 +212,16 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
for (const auto & it : _handlers) {
IFlushHandler & handler(*it.second);
search::SerialNum serial(handler.getCurrentSerialNumber());
- LOG(spam, "Checking FlushHandler '%s' current serial = %ld",
- handler.getName().c_str(), serial);
+ LOG(spam, "Checking FlushHandler '%s' current serial = %ld", handler.getName().c_str(), serial);
IFlushTarget::List lst = handler.getFlushTargets();
for (const IFlushTarget::SP & target : lst) {
- LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", target->getName().c_str(), target->getFlushedSerialNum());
+ LOG(spam, "Checking target '%s' with flushedSerialNum = %ld",
+ target->getName().c_str(), target->getFlushedSerialNum());
if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) {
- ret.push_back(FlushContext::SP(new FlushContext(it.second,
- IFlushTarget::SP(new CachedFlushTarget(target)),
- serial)));
+ ret.push_back(std::make_shared<FlushContext>(it.second, std::make_shared<CachedFlushTarget>(target), serial));
} else {
LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.",
- target->getName().c_str(), target->getFlushedSerialNum(), serial);
+ target->getName().c_str(), target->getFlushedSerialNum(), serial);
}
}
}
@@ -258,17 +257,12 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
break;
}
}
- if (ctx.get() != NULL) {
- LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
+ if (ctx) {
+ logTarget("initiated", *ctx);
}
return ctx;
}
-
-
void
FlushEngine::flushAll(const FlushContext::List &lst)
{
@@ -276,19 +270,12 @@ FlushEngine::flushAll(const FlushContext::List &lst)
for (const FlushContext::SP & ctx : lst) {
if (wait(0)) {
if (ctx->initFlush()) {
- LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
+ logTarget("initiated", *ctx);
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
} else {
- LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
+ logTarget("failed to initiate", *ctx);
}
}
-
}
}
@@ -311,17 +298,17 @@ FlushEngine::flushNextTarget(const vespalib::string & name)
return "";
}
FlushContext::SP ctx = initNextFlush(lst.first);
- if (ctx.get() == NULL) {
+ if ( ! ctx) {
LOG(debug, "All targets refused to flush.");
return "";
}
if ( name == ctx->getName()) {
LOG(info, "The same target %s out of %ld has been asked to flush again. "
- "This might indicate flush logic flaw so I will wait 1s before doing it.",
+ "This might indicate flush logic flaw so I will wait 100 ms before doing it.",
name.c_str(), lst.first.size());
- FastOS_Thread::Sleep(1000);
+ std::this_thread::sleep_for(100ms);
}
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
return ctx->getName();
}
@@ -330,12 +317,8 @@ FlushEngine::initFlush(const FlushContext &ctx)
{
if (LOG_WOULD_LOG(event)) {
IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain());
- EventLogger::flushStart(ctx.getName(),
- mgain.getBefore(),
- mgain.getAfter(),
- mgain.gain(),
- ctx.getTarget()->getFlushedSerialNum() + 1,
- ctx.getHandler()->getCurrentSerialNumber());
+ EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(),
+ ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber());
}
return initFlush(ctx.getHandler(), ctx.getTarget());
}
@@ -350,10 +333,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
if (LOG_WOULD_LOG(event)) {
FlushStats stats = ctx.getTarget()->getLastFlushStats();
- EventLogger::flushComplete(ctx.getName(),
- duration.ms(),
- stats.getPath(),
- stats.getPathElementsToLog());
+ EventLogger::flushComplete(ctx.getName(), duration.ms(), stats.getPath(), stats.getPathElementsToLog());
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
std::lock_guard<std::mutex> guard(_lock);
@@ -366,8 +346,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
IFlushHandler::SP
-FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
- const IFlushHandler::SP &flushHandler)
+FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler)
{
std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler));
@@ -379,13 +358,6 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
}
IFlushHandler::SP
-FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const
-{
- std::lock_guard<std::mutex> guard(_lock);
- return _handlers.getHandler(docTypeName);
-}
-
-IFlushHandler::SP
FlushEngine::removeFlushHandler(const DocTypeName &docTypeName)
{
std::lock_guard<std::mutex> guard(_lock);
@@ -430,7 +402,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
return;
}
assert(!_priorityStrategy);
- _priorityStrategy = strategy;
+ _priorityStrategy = std::move(strategy);
{
std::lock_guard<std::mutex> guard(_lock);
_cond.notify_all();
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 19175f9ce2a..c1be05ba067 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -15,7 +15,7 @@ namespace proton {
namespace flushengine { class ITlsStatsFactory; }
-class FlushEngine : public FastOS_Runnable
+class FlushEngine final : public FastOS_Runnable
{
public:
class FlushMeta {
@@ -37,9 +37,7 @@ private:
struct FlushInfo : public FlushMeta
{
FlushInfo();
- FlushInfo(uint32_t taskId,
- const IFlushTarget::SP &target,
- const vespalib::string &destination);
+ FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string &destination);
~FlushInfo();
IFlushTarget::SP _target;
@@ -96,14 +94,13 @@ public:
* @param numThreads The number of worker threads to use.
* @param idleInterval The interval between flush checks when none are in progress.
*/
- FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
- tlsStatsFactory,
+ FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory,
IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS);
/**
* Destructor. Waits for all pending tasks to complete.
*/
- ~FlushEngine();
+ ~FlushEngine() override;
/**
* Observe and reset internal executor stats
@@ -145,19 +142,8 @@ public:
* @param flushHandler The handler to register.
* @return The replaced handler, if any.
*/
- IFlushHandler::SP
- putFlushHandler(const DocTypeName &docTypeName,
- const IFlushHandler::SP &flushHandler);
+ IFlushHandler::SP putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler);
- /**
- * Returns the flush handler for the given document type. If no handler was
- * registered, this method returns an empty shared pointer.
- *
- * @param docType The document type whose handler to return.
- * @return The registered handler, if any.
- */
- IFlushHandler::SP
- getFlushHandler(const DocTypeName &docTypeName) const;
/**
* Removes and returns the flush handler for the given document type. If no
@@ -166,10 +152,8 @@ public:
* @param docType The document type whose handler to remove.
* @return The removed handler, if any.
*/
- IFlushHandler::SP
- removeFlushHandler(const DocTypeName &docTypeName);
+ IFlushHandler::SP removeFlushHandler(const DocTypeName &docTypeName);
- // Implements FastOS_Runnable.
void Run(FastOS_ThreadInterface *thread, void *arg) override;
FlushMetaSet getCurrentlyFlushingSet() const;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
index 3a24330f8ec..914de9df30c 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
@@ -30,9 +30,7 @@ public:
*/
IFlushHandler(const vespalib::string &name)
: _name(name)
- {
- // empty
- }
+ { }
/**
* Virtual destructor required for inheritance.
@@ -76,8 +74,7 @@ public:
* This method is called to sync tls to stable media, up to and
* including the given serial number.
*
- * @param syncTo The last serial number that has to be persisted to stable
- * media.
+ * @param syncTo The last serial number that has to be persisted to stable media.
*/
virtual void syncTls(SerialNum syncTo) = 0;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp b/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp
index aef49c51f7a..1c930ef1ddc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/prepare_restart_handler.cpp
@@ -50,8 +50,7 @@ PrepareRestartHandler::performPrepareRestart(const ProtonConfig &protonCfg, std:
{
_running = true;
lock.unlock();
- auto strategy = std::make_shared<PrepareRestartFlushStrategy>(createPrepareRestartConfig(protonCfg));
- _flushEngine.setStrategy(strategy);
+ _flushEngine.setStrategy(std::make_shared<PrepareRestartFlushStrategy>(createPrepareRestartConfig(protonCfg)));
lock.lock();
_running = false;
_cond.notify_all();
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 32dc711d5cf..6ffb82a2ad3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -240,9 +240,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
const HwInfo & hwInfo = configSnapshot->getHwInfo();
setFS4Compression(protonConfig);
- _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>
- (protonConfig.basedir,
- diskMemUsageSamplerConfig(protonConfig, hwInfo));
+ _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir,
+ diskMemUsageSamplerConfig(protonConfig, hwInfo));
_tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext);
_metricsEngine->addMetricsHook(_metricsHook);
@@ -253,6 +252,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_distributionKey = protonConfig.distributionkey;
_summaryEngine= std::make_unique<SummaryEngine>(protonConfig.numsummarythreads);
_docsumBySlime = std::make_unique<DocsumBySlime>(*_summaryEngine);
+
IFlushStrategy::SP strategy;
const ProtonConfig::Flush & flush(protonConfig.flush);
switch (flush.strategy) {
@@ -283,17 +283,15 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
LOG(debug, "Start proton server with root at %s and cwd at %s",
protonConfig.basedir.c_str(), getcwd(tmp, sizeof(tmp)));
- _persistenceEngine.reset(new PersistenceEngine(*this,
- _diskMemUsageSampler->writeFilter(),
- protonConfig.visit.defaultserializedsize,
- protonConfig.visit.ignoremaxbytes));
-
+ _persistenceEngine = std::make_unique<PersistenceEngine>(*this, _diskMemUsageSampler->writeFilter(),
+ protonConfig.visit.defaultserializedsize,
+ protonConfig.visit.ignoremaxbytes);
vespalib::string fileConfigId;
- _warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024));
+ _warmupExecutor = std::make_unique<vespalib::ThreadStackExecutor>(4, 128*1024);
const size_t summaryThreads = deriveCompactionCompressionThreads(protonConfig, hwInfo.cpu());
- _summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16));
+ _summaryExecutor = std::make_unique<vespalib::BlockingThreadStackExecutor>(summaryThreads, 128*1024, summaryThreads*16);
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024);
@@ -305,7 +303,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_prepareRestartHandler = std::make_unique<PrepareRestartHandler>(*_flushEngine);
RPCHooks::Params rpcParams(*this, protonConfig.rpcport, _configUri.getConfigId());
rpcParams.slobrok_config = _configUri.createWithNewId(protonConfig.slobrokconfigid);
- _rpcHooks.reset(new RPCHooks(rpcParams));
+ _rpcHooks = std::make_unique<RPCHooks>(rpcParams);
waitForInitDone();
@@ -528,23 +526,11 @@ Proton::addDocumentDB(const document::DocumentType &docType,
// 1 thread per document type.
initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(1, 128 * 1024);
}
- DocumentDB::SP ret(new DocumentDB(config.basedir + "/documents",
- documentDBConfig,
- config.tlsspec,
- _queryLimiter,
- _clock,
- docTypeName,
- bucketSpace,
- config,
- *this,
- *_warmupExecutor,
- *_summaryExecutor,
- *_tls->getTransLogServer(),
- *_metricsEngine,
- _fileHeaderContext,
- std::move(config_store),
- initializeThreads,
- bootstrapConfig->getHwInfo()));
+ auto ret = std::make_shared<DocumentDB>(config.basedir + "/documents", documentDBConfig, config.tlsspec,
+ _queryLimiter, _clock, docTypeName, bucketSpace, config, *this,
+ *_warmupExecutor, *_summaryExecutor, *_tls->getTransLogServer(),
+ *_metricsEngine, _fileHeaderContext, std::move(config_store),
+ initializeThreads, bootstrapConfig->getHwInfo());
try {
ret->start();
} catch (vespalib::Exception &e) {
@@ -571,10 +557,10 @@ Proton::addDocumentDB(const document::DocumentType &docType,
// TODO: Fix race with new cluster state setting.
_persistenceEngine->putHandler(bucketSpace, docTypeName, persistenceHandler);
}
- SearchHandlerProxy::SP searchHandler(new SearchHandlerProxy(ret));
+ auto searchHandler = std::make_shared<SearchHandlerProxy>(ret);
_summaryEngine->putSearchHandler(docTypeName, searchHandler);
_matchEngine->putSearchHandler(docTypeName, searchHandler);
- FlushHandlerProxy::SP flushHandler(new FlushHandlerProxy(ret));
+ auto flushHandler = std::make_shared<FlushHandlerProxy>(ret);
_flushEngine->putFlushHandler(docTypeName, flushHandler);
_diskMemUsageSampler->notifier().addDiskMemUsageListener(ret->diskMemUsageListener());
return ret;
@@ -624,7 +610,7 @@ Proton::MonitorReply::UP
Proton::ping(MonitorRequest::UP request, MonitorClient & client)
{
(void) client;
- MonitorReply::UP reply(new MonitorReply());
+ auto reply = std::make_unique<MonitorReply>();
MonitorReply &ret = *reply;
BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot();
@@ -807,8 +793,8 @@ struct DocumentDBMapExplorer : vespalib::StateExplorer {
std::shared_timed_mutex &mutex;
DocumentDBMapExplorer(const DocumentDBMap &documentDBMap_in, std::shared_timed_mutex &mutex_in)
: documentDBMap(documentDBMap_in), mutex(mutex_in) {}
- virtual void get_state(const vespalib::slime::Inserter &, bool) const override {}
- virtual std::vector<vespalib::string> get_children_names() const override {
+ void get_state(const vespalib::slime::Inserter &, bool) const override {}
+ std::vector<vespalib::string> get_children_names() const override {
std::shared_lock<std::shared_timed_mutex> guard(mutex);
std::vector<vespalib::string> names;
for (const auto &item: documentDBMap) {
@@ -816,14 +802,14 @@ struct DocumentDBMapExplorer : vespalib::StateExplorer {
}
return names;
}
- virtual std::unique_ptr<vespalib::StateExplorer> get_child(vespalib::stringref name) const override {
+ std::unique_ptr<vespalib::StateExplorer> get_child(vespalib::stringref name) const override {
typedef std::unique_ptr<StateExplorer> Explorer_UP;
std::shared_lock<std::shared_timed_mutex> guard(mutex);
auto result = documentDBMap.find(DocTypeName(vespalib::string(name)));
if (result == documentDBMap.end()) {
return Explorer_UP(nullptr);
}
- return Explorer_UP(new DocumentDBExplorer(result->second));
+ return std::make_unique<DocumentDBExplorer>(result->second);
}
};
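Most of the proton.cpp changes above are mechanical modernisation: reset(new T(...)) and shared_ptr<T>(new T(...)) become std::make_unique / std::make_shared. A tiny illustrative sketch of the idiom (Widget is a made-up type, not from the codebase):

#include <memory>
#include <string>

struct Widget {
    std::string name;
    explicit Widget(std::string n) : name(std::move(n)) {}
};

int main() {
    std::unique_ptr<Widget> a;
    a.reset(new Widget("old style"));            // pre-diff pattern
    a = std::make_unique<Widget>("new style");   // post-diff pattern
    auto b = std::make_shared<Widget>("shared"); // one allocation for object + control block
    return (a && b) ? 0 : 1;
}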
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index c92687f798b..835b8ef1044 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default")
const uint32_t FileStorTestFixture::MSG_WAIT_TIME;
void
-FileStorTestFixture::setupDisks(uint32_t diskCount)
+FileStorTestFixture::setupPersistenceThreads(uint32_t threads)
{
std::string rootOfRoot = "todo-make-unique-filestorefixture";
- _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot)));
-
- _config2.reset(new vdstestlib::DirConfig(*_config));
- _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-server").set("node_index", "1");
-
- _smallConfig.reset(new vdstestlib::DirConfig(*_config));
- _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1),
- _config->getConfigId()));
+ _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot));
+ _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-server").set("node_index", "1");
+ _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads));
+
+ _node = std::make_unique<TestServiceLayerApp>(
+ DiskCount(1), NodeIndex(1), _config->getConfigId());
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount)
void
FileStorTestFixture::setUp()
{
- setupDisks(1);
+ setupPersistenceThreads(1);
_node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
void
FileStorTestFixture::tearDown()
{
- _node.reset(0);
+ _node.reset();
}
void
@@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents(
}
api::StorageMessageAddress
-FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const {
+FileStorTestFixture::makeSelfAddress() {
return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0);
}
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h
index c8158d01224..c46f9de24fc 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.h
+++ b/storage/src/tests/persistence/common/filestortestfixture.h
@@ -19,8 +19,6 @@ public:
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<vdstestlib::DirConfig> _config;
- std::unique_ptr<vdstestlib::DirConfig> _config2;
- std::unique_ptr<vdstestlib::DirConfig> _smallConfig;
const document::DocumentType* _testdoctype1;
static const uint32_t MSG_WAIT_TIME = 60 * 1000;
@@ -30,10 +28,12 @@ public:
void setUp() override;
void tearDown() override;
- void setupDisks(uint32_t diskCount);
+ void setupPersistenceThreads(uint32_t diskCount);
void createBucket(const document::BucketId& bid);
bool bucketExistsInDb(const document::BucketId& bucket) const;
+ static api::StorageMessageAddress makeSelfAddress();
+
api::ReturnCode::Result resultOf(const api::StorageReply& reply) const {
return reply.getResult().getResult();
}
@@ -99,8 +99,6 @@ public:
const char* testName,
const StorageLinkInjector& i = NoOpStorageLinkInjector());
- api::StorageMessageAddress makeSelfAddress() const;
-
void sendDummyGet(const document::BucketId& bid);
void sendPut(const document::BucketId& bid,
uint32_t docIdx,
diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
index 50999f5883e..d4cec415937 100644
--- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
@@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture
{
public:
void setupDisks() {
- FileStorTestFixture::setupDisks(1);
+ FileStorTestFixture::setupPersistenceThreads(1);
_node->setPersistenceProvider(
spi::PersistenceProvider::UP(
new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 64ef48b5719..e12f48bcdea 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -86,9 +86,9 @@ public:
std::unique_ptr<vespalib::Barrier> _queueBarrier;
std::unique_ptr<vespalib::Barrier> _completionBarrier;
- void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) {
- FileStorTestFixture::setupDisks(diskCount);
- _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount));
+ void setupProviderAndBarriers(uint32_t queueBarrierThreads) {
+ FileStorTestFixture::setupPersistenceThreads(1);
+ _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1));
_queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads));
_completionBarrier.reset(new vespalib::Barrier(2));
_blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier);
@@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets)
void
OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations()
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations");
document::BucketId bucket(16, 1);
createBucket(bucket);
@@ -305,7 +305,7 @@ public:
void
OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
{
- setupDisks(1, 3);
+ setupProviderAndBarriers(3);
TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket");
document::BucketId bucket(16, 1);
@@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName,
const std::vector<api::StorageMessage::SP>& msgs,
bool shouldCreateBucketInitially)
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, testName);
document::BucketId bucket(16, 1);
document::BucketId blockerBucket(16, 2);
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f31623eed61..e96ad013923 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -15,86 +15,190 @@ using document::test::makeDocumentBucket;
namespace storage {
-class PersistenceQueueTest : public FileStorTestFixture
-{
+class PersistenceQueueTest : public FileStorTestFixture {
public:
void testFetchNextUnlockedMessageIfBucketLocked();
+ void shared_locked_operations_allow_concurrent_bucket_access();
+ void exclusive_locked_operation_not_started_if_shared_op_active();
+ void shared_locked_operation_not_started_if_exclusive_op_active();
+ void exclusive_locked_operation_not_started_if_exclusive_op_active();
+ void operation_batching_not_allowed_across_different_lock_modes();
- std::shared_ptr<api::StorageMessage>
- createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const;
void setUp() override;
- void tearDown() override;
CPPUNIT_TEST_SUITE(PersistenceQueueTest);
CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked);
+ CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active);
+ CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(operation_batching_not_allowed_across_different_lock_modes);
CPPUNIT_TEST_SUITE_END();
+
+ struct Fixture {
+ FileStorTestFixture& parent;
+ DummyStorageLink top;
+ std::unique_ptr<DummyStorageLink> dummyManager;
+ ForwardingMessageSender messageSender;
+ documentapi::LoadTypeSet loadTypes;
+ FileStorMetrics metrics;
+ std::unique_ptr<FileStorHandler> filestorHandler;
+ uint32_t stripeId;
+
+ explicit Fixture(FileStorTestFixture& parent);
+ ~Fixture();
+ };
+
+ static constexpr uint16_t _disk = 0;
};
CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest);
-void
-PersistenceQueueTest::setUp()
+PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
+ : parent(parent_),
+ top(),
+ dummyManager(std::make_unique<DummyStorageLink>()),
+ messageSender(*dummyManager),
+ loadTypes("raw:"),
+ metrics(loadTypes.getMetricLoadTypes())
{
- setupDisks(1);
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ top.push_back(std::move(dummyManager));
+ top.open();
+
+ metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
+
+ filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(),
+ parent._node->getComponentRegister());
+ // getNextMessage will time out if no unlocked buckets are present. Choose a timeout
+ // that is large enough to fail tests with high probability if this is not the case,
+ // and small enough to not slow down testing too much.
+ filestorHandler->setGetNextMessageTimeout(20);
+
+ stripeId = filestorHandler->getNextStripeId(0);
}
-void
-PersistenceQueueTest::tearDown()
-{
- _node.reset(0);
+PersistenceQueueTest::Fixture::~Fixture() = default;
+
+void PersistenceQueueTest::setUp() {
+ setupPersistenceThreads(1);
+ _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
-std::shared_ptr<api::StorageMessage>
-PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx)
-{
- std::ostringstream id;
- id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx;
- document::Document::SP doc(
- _node->getTestDocMan().createDocument("foobar", id.str()));
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234));
- cmd->setAddress(api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 0));
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) {
+ std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument(
+ "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234);
+ cmd->setAddress(makeSelfAddress());
return cmd;
}
-void
-PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
-{
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const {
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(document::BucketId(16, bucket)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]");
+ cmd->setAddress(makeSelfAddress());
+ return cmd;
+}
+void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() {
+ Fixture f(*this);
// Send 3 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
// then subsequently on the second, skipping the already locked bucket.
// Puts all have same pri, so order is well defined.
- filestorHandler.schedule(createPut(1234, 0), 0);
- filestorHandler.schedule(createPut(1234, 1), 0);
- filestorHandler.schedule(createPut(5432, 0), 0);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 1), _disk);
+ f.filestorHandler->schedule(createPut(5432, 0), _disk);
- auto lock0 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock0.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock1.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
+void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Even though we already have a lock on the bucket, Gets allow shared locking and we
+ // should therefore be able to get another lock.
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock1.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::operation_batching_not_allowed_across_different_lock_modes() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first);
+ CPPUNIT_ASSERT(lock0.second);
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0);
+ CPPUNIT_ASSERT(!lock0.second);
+}
+
} // namespace storage
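The new test cases above encode the lock-compatibility rule that FileStorHandlerImpl::Stripe::isLocked() implements further down in this diff: shared locks coexist with shared locks, and every other combination blocks. A standalone model of that rule (illustrative names only, not the FileStorHandler API), with the enumerator names taken from api::LockingRequirements as used in the diff:

// Sketch: a bucket blocks a new request if an exclusive lock is held, or if the
// request is exclusive and any shared locks are held.
#include <cassert>
#include <cstdint>
#include <set>

enum class LockingRequirements { Exclusive, Shared };

struct BucketLockState {
    bool exclusiveHeld = false;
    std::set<uint64_t> sharedHolders; // message ids currently holding shared locks
};

bool wouldBlock(const BucketLockState& state, LockingRequirements req) {
    if (state.exclusiveHeld) {
        return true; // nothing coexists with an exclusive lock
    }
    // Shared requests coexist with shared holders; exclusive requests do not.
    return (req == LockingRequirements::Exclusive) && !state.sharedHolders.empty();
}

int main() {
    BucketLockState state;
    assert(!wouldBlock(state, LockingRequirements::Shared));    // idle bucket: anything goes
    state.sharedHolders.insert(1);
    assert(!wouldBlock(state, LockingRequirements::Shared));    // shared + shared: allowed
    assert(wouldBlock(state, LockingRequirements::Exclusive));  // shared blocks exclusive
    state.sharedHolders.clear();
    state.exclusiveHeld = true;
    assert(wouldBlock(state, LockingRequirements::Shared));     // exclusive blocks shared
    assert(wouldBlock(state, LockingRequirements::Exclusive));  // and other exclusives
    return 0;
}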
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index 27281d9b95f..4fc577226ca 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -62,7 +62,7 @@ private:
CPPUNIT_TEST(testNormalUsage);
CPPUNIT_TEST(testFailedCreateIterator);
CPPUNIT_TEST(testFailedGetIter);
- CPPUNIT_TEST(testMultipleFailedGetIter);
+ CPPUNIT_TEST(iterators_per_bucket_config_is_ignored_and_hardcoded_to_1);
CPPUNIT_TEST(testDocumentAPIClientError);
CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor);
CPPUNIT_TEST(testIteratorCreatedForFailedVisitor);
@@ -90,7 +90,7 @@ public:
void testNormalUsage();
void testFailedCreateIterator();
void testFailedGetIter();
- void testMultipleFailedGetIter();
+ void iterators_per_bucket_config_is_ignored_and_hardcoded_to_1();
void testDocumentAPIClientError();
void testNoDocumentAPIResendingForFailedVisitor();
void testIteratorCreatedForFailedVisitor();
@@ -592,36 +592,31 @@ VisitorTest::testFailedGetIter()
CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
}
-void
-VisitorTest::testMultipleFailedGetIter()
-{
- initializeTest(TestParams().iteratorsPerBucket(2));
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- makeCreateVisitor());
+void VisitorTest::iterators_per_bucket_config_is_ignored_and_hardcoded_to_1() {
+ initializeTest(TestParams().iteratorsPerBucket(20));
+ auto cmd = makeCreateVisitor();
_top->sendDown(cmd);
sendCreateIteratorReply();
- std::vector<GetIterCommand::SP> getIterCmds(
- fetchMultipleCommands<GetIterCommand>(*_bottom, 2));
-
- sendGetIterReply(*getIterCmds[0],
- api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND));
-
- // Wait for an "appropriate" amount of time so that wrongful logic
- // will send a DestroyIteratorCommand before all pending GetIters
- // have been replied to.
- std::this_thread::sleep_for(100ms);
+ auto getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom);
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+ sendGetIterReply(*getIterCmd);
CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands());
- sendGetIterReply(*getIterCmds[1],
- api::ReturnCode(api::ReturnCode::BUCKET_DELETED));
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size());
- DestroyIteratorCommand::SP destroyIterCmd(
- fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+ auto destroyIterCmd = fetchSingleCommand<DestroyIteratorCommand>(*_bottom);
- verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0);
+ verifyCreateVisitorReply(api::ReturnCode::OK);
CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+ CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount());
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 74baecbf026..0da0fd5ce66 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage&
}
FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
+FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
{
- return _impl->lock(bucket, disk);
+ return _impl->lock(bucket, disk, lockReq);
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index b74765b17d2..02c959df2f0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -58,8 +58,9 @@ public:
typedef std::shared_ptr<BucketLockInterface> SP;
virtual const document::Bucket &getBucket() const = 0;
+ virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
- virtual ~BucketLockInterface() {};
+ virtual ~BucketLockInterface() = default;
};
typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;
@@ -139,7 +140,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk);
+ BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq);
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index a01881b6fbe..f9571228ef9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
-{
+FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) {
vespalib::MonitorGuard guard(_lock);
- while (isLocked(guard, bucket)) {
- LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
+ while (isLocked(guard, bucket, lockReq)) {
+ LOG(spam, "Contending for filestor lock for %s with %s access",
+ bucket.getBucketId().toString().c_str(), api::to_string(lockReq));
guard.wait(100);
}
- auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0);
+ auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq);
guard.broadcast();
return locker;
@@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
namespace {
struct MultiLockGuard {
std::map<uint16_t, vespalib::Monitor*> monitors;
- std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards;
+ std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards;
- MultiLockGuard() {}
+ MultiLockGuard() = default;
void addLock(vespalib::Monitor& monitor, uint16_t index) {
monitors[index] = &monitor;
@@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
PriorityIdx& idx(bmi::get<1>(_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
- while (iter != end && isLocked(guard, iter->_bucket)) {
+ while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) {
iter++;
}
if (iter != end) {
@@ -959,6 +959,13 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
}
api::StorageMessage & m(*range.first->_command);
+ // For now, don't allow batching of operations across lock requirement modes.
+ // We might relax this requirement later once we're 100% sure it can't trigger
+ // any unfortunate edge cases.
+ if (lck.first->lockingRequirements() != m.lockingRequirements()) {
+ lck.second.reset();
+ return lck;
+ }
uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
@@ -992,7 +999,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority
if (!messageTimedOutInQueue(*msg, waitTime)) {
auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(),
- msg->getType().getId(), msg->getMsgId());
+ msg->getType().getId(), msg->getMsgId(),
+ msg->lockingRequirements());
guard.unlock();
return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
} else {
@@ -1090,10 +1098,65 @@ FileStorHandlerImpl::Stripe::flush()
lockGuard.wait(100);
}
}
+
+void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
+ api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId) {
+ vespalib::MonitorGuard guard(_lock);
+ auto iter = _lockedBuckets.find(bucket);
+ assert(iter != _lockedBuckets.end());
+ auto& entry = iter->second;
+
+ if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
+ assert(entry._exclusiveLock);
+ assert(entry._exclusiveLock->msgId == lockMsgId);
+ entry._exclusiveLock.reset();
+ } else {
+ assert(!entry._exclusiveLock);
+ auto shared_iter = entry._sharedLocks.find(lockMsgId);
+ assert(shared_iter != entry._sharedLocks.end());
+ entry._sharedLocks.erase(shared_iter);
+ }
+
+ if (!entry._exclusiveLock && entry._sharedLocks.empty()) {
+ _lockedBuckets.erase(iter); // No more locks held
+ }
+ guard.broadcast();
+}
+
+void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry) {
+ auto& entry = _lockedBuckets[bucket];
+ assert(!entry._exclusiveLock);
+ if (lockReq == api::LockingRequirements::Exclusive) {
+ assert(entry._sharedLocks.empty());
+ entry._exclusiveLock = lockEntry;
+ } else {
+ // TODO use a hash set with a custom comparator/hasher instead...?
+ auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry));
+ (void) inserted;
+ assert(inserted.second);
+ }
+}
+
bool
-FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept
+FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket,
+ api::LockingRequirements lockReq) const noexcept
{
- return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end());
+ if (bucket.getBucketId().getRawId() == 0) {
+ return false;
+ }
+ auto iter = _lockedBuckets.find(bucket);
+ if (iter == _lockedBuckets.end()) {
+ return false;
+ }
+ if (iter->second._exclusiveLock) {
+ return true;
+ }
+ // Shared locks can be taken alongside other shared locks, but exclusive locks
+ // require that no shared locks are currently present.
+ return ((lockReq == api::LockingRequirements::Exclusive)
+ && !iter->second._sharedLocks.empty());
}
uint32_t
@@ -1114,33 +1177,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
- api::MessageType::Id msgType, api::StorageMessage::Id msgId)
+ api::MessageType::Id msgType, api::StorageMessage::Id msgId,
+ api::LockingRequirements lockReq)
: _stripe(stripe),
- _bucket(bucket)
+ _bucket(bucket),
+ _uniqueMsgId(msgId),
+ _lockReq(lockReq)
{
- (void) guard;
if (_bucket.getBucketId().getRawId() != 0) {
- // Lock the bucket and wait until it is not the current operation for
- // the disk itself.
- _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId));
- LOG(debug, "Locked bucket %s with priority %u",
- bucket.getBucketId().toString().c_str(), priority);
-
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "acquired filestor lock", false,
- debug::BucketOperationLogger::State::BUCKET_LOCKED);
+ _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId));
+ LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s",
+ bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq));
}
}
-FileStorHandlerImpl::BucketLock::~BucketLock()
-{
+FileStorHandlerImpl::BucketLock::~BucketLock() {
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.release(_bucket);
- LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "released filestor lock", true,
- debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
+ _stripe.release(_bucket, _lockReq, _uniqueMsgId);
+ LOG(debug, "Unlocked bucket %s for message %zu in mode %s",
+ _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
}
}
@@ -1182,14 +1238,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const
}
}
+namespace {
+
+void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry,
+ api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) {
+ os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " ("
+ << bucketId << ", " << api::to_string(lock_mode)
+ << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n";
+}
+
+}
+
void
FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const
{
uint32_t now = time(nullptr);
vespalib::MonitorGuard guard(_lock);
for (const auto & e : _lockedBuckets) {
- os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << e.first.getBucketId()
- << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n";
+ if (e.second._exclusiveLock) {
+ dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock,
+ api::LockingRequirements::Exclusive, now, os);
+ }
+ for (const auto& shared : e.second._sharedLocks) {
+ dump_lock_entry(e.first.getBucketId(), shared.second,
+ api::LockingRequirements::Shared, now, os);
+ }
}
}
@@ -1238,7 +1311,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
}
for (auto & entry : _mergeStates) {
out << "<b>" << entry.first.toString() << "</b><br>\n";
- // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here..
}
}
}
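
The shared/exclusive semantics that Stripe::isLocked implements above reduce to a small compatibility rule: a shared request conflicts only with an exclusive holder, while an exclusive request conflicts with any holder at all. A minimal standalone sketch of that rule, using illustrative names rather than the actual storage types:

    #include <cassert>
    #include <cstddef>

    enum class LockMode { Exclusive, Shared };

    struct BucketLockState {
        bool        has_exclusive = false; // at most one exclusive holder
        std::size_t shared_count  = 0;     // any number of shared holders
    };

    // True if a new request in `mode` conflicts with the current holders
    // (mirrors the decision Stripe::isLocked makes for a queued operation).
    bool conflicts(const BucketLockState& state, LockMode mode) {
        if (state.has_exclusive) {
            return true; // an exclusive holder blocks everything
        }
        // Only an exclusive request is blocked by existing shared holders.
        return (mode == LockMode::Exclusive) && (state.shared_count > 0);
    }

    int main() {
        BucketLockState s;
        assert(!conflicts(s, LockMode::Shared));    // free bucket: anything may run
        assert(!conflicts(s, LockMode::Exclusive));
        s.shared_count = 2;
        assert(!conflicts(s, LockMode::Shared));    // shared coexists with shared
        assert(conflicts(s, LockMode::Exclusive));  // exclusive must wait
        BucketLockState ex;
        ex.has_exclusive = true;
        assert(conflicts(ex, LockMode::Shared));    // exclusive blocks shared too
        return 0;
    }

This is the classic reader-writer compatibility matrix; note that the patch applies it at dequeue time (getNextMessage skips over conflicting operations) rather than blocking inside the lock table.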
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 45ac5ded47f..f9dcca4315b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -30,6 +30,7 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <atomic>
+#include <optional>
namespace storage {
@@ -82,13 +83,19 @@ public:
api::MessageType::Id msgType;
api::StorageMessage::Id msgId;
-
LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { }
LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_)
: timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_)
{ }
};
+
+ struct MultiLockEntry {
+ std::optional<LockEntry> _exclusiveLock;
+ using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>;
+ SharedLocks _sharedLocks;
+ };
+
Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
~Stripe();
void flush();
@@ -105,19 +112,16 @@ public:
vespalib::MonitorGuard guard(_lock);
return _queue.size();
}
- void release(const document::Bucket & bucket){
- vespalib::MonitorGuard guard(_lock);
- _lockedBuckets.erase(bucket);
- guard.broadcast();
- }
+ void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId);
- bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept;
+ bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&,
+ api::LockingRequirements lockReq) const noexcept;
- void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) {
- _lockedBuckets.insert(std::make_pair(bucket, lockEntry));
- }
+ void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry);
- std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket);
+ std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
@@ -131,9 +135,11 @@ public:
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
private:
bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ // Precondition: the bucket used by `iter`'s operation is not locked in a way that conflicts
+ // with its locking requirements.
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter);
- typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
+ using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
FileStorStripeMetrics *_metrics;
@@ -178,8 +184,8 @@ public:
return _stripes[stripeId].getNextMessage(lck);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket) {
- return stripe(bucket).lock(bucket);
+ lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
+ return stripe(bucket).lock(bucket, lockReq);
}
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
stripe(bucket).failOperations(bucket, code);
@@ -194,7 +200,7 @@ public:
// Disperse bucket bits by multiplying with the 64-bit FNV-1 prime.
// This avoids an inherent affinity between the LSB of a bucket's bits
// and the stripe an operation ends up on.
- return bucket.getBucketId().getRawId() * 1099511628211ULL;
+ return bucket.getBucketId().getId() * 1099511628211ULL;
}
Stripe & stripe(const document::Bucket & bucket) {
return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()];
@@ -208,15 +214,20 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
+ // TODO refactor, too many params
BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket,
- uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id);
+ uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
+ api::LockingRequirements lockReq);
~BucketLock();
const document::Bucket &getBucket() const override { return _bucket; }
+ api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
private:
Stripe & _stripe;
document::Bucket _bucket;
+ api::StorageMessage::Id _uniqueMsgId;
+ api::LockingRequirements _lockReq;
};
FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
@@ -253,8 +264,8 @@ public:
uint32_t getNextStripeId(uint32_t disk);
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, uint16_t disk) {
- return _diskInfo[disk].lock(bucket);
+ lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) {
+ return _diskInfo[disk].lock(bucket, lockReq);
}
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
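
dispersed_bucket_bits() above relies on a single multiplication by the 64-bit FNV-1 prime before the modulo in stripe(); the multiplication mixes the low bucket-id bits upward so stripe assignment is not dominated by the LSBs. A rough, self-contained sketch of the same computation (the function names and the stripe count of 4 are illustrative only):

    #include <cstdint>
    #include <iostream>

    // The same 64-bit FNV-1 prime constant used in dispersed_bucket_bits().
    constexpr uint64_t FNV1_64_PRIME = 1099511628211ULL;

    // Multiplication wraps mod 2^64; the point is to mix low bucket-id bits
    // into the higher bits so "% num_stripes" does not just look at the LSBs.
    uint32_t stripe_of(uint64_t bucket_bits, uint32_t num_stripes) {
        return static_cast<uint32_t>((bucket_bits * FNV1_64_PRIME) % num_stripes);
    }

    int main() {
        // Bucket ids differing only in their lowest bits still spread over stripes.
        for (uint64_t id = 0; id < 8; ++id) {
            std::cout << "bucket bits " << id << " -> stripe " << stripe_of(id, 4) << '\n';
        }
        return 0;
    }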
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index ba7f5979569..d0572e7dbf8 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -38,6 +38,9 @@ public:
void setMaxByteSize(uint32_t maxByteSize) { _maxByteSize = maxByteSize; }
uint32_t getMaxByteSize() const { return _maxByteSize; }
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
@@ -105,6 +108,9 @@ public:
spi::ReadConsistency getReadConsistency() const noexcept {
return _readConsistency;
}
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
std::unique_ptr<api::StorageReply> makeReply() override;
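
The two overrides above show the intended pattern: iterator-style, read-only commands opt into shared locking, while everything else inherits the exclusive default declared in storagemessage.h. A condensed standalone sketch of that pattern, with stand-in types since the real classes live in the api/storage headers:

    #include <cassert>

    // Stand-ins for the real api types, for illustration only.
    enum class LockingRequirements { Exclusive = 0, Shared };

    struct StorageMessageLike {
        virtual ~StorageMessageLike() = default;
        // Mirrors the default added to StorageMessage: exclusive unless overridden.
        virtual LockingRequirements lockingRequirements() const noexcept {
            return LockingRequirements::Exclusive;
        }
    };

    // A hypothetical read-only command opts into shared locking, like the
    // iterator commands above.
    struct ReadOnlyStatsCommand : StorageMessageLike {
        LockingRequirements lockingRequirements() const noexcept override {
            return LockingRequirements::Shared;
        }
    };

    // A mutating command simply inherits the safe exclusive default.
    struct MutatingCommand : StorageMessageLike { };

    int main() {
        ReadOnlyStatsCommand ro;
        MutatingCommand mut;
        assert(ro.lockingRequirements()  == LockingRequirements::Shared);
        assert(mut.lockingRequirements() == LockingRequirements::Exclusive);
        return 0;
    }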
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index c2dcb8e2a29..888dc93dd82 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
result.disk = getPreferredAvailableDisk(bucket);
while (true) {
+ // This function is only called in a context where we require exclusive
+ // locking (split/join). Refactor if this is no longer the case.
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(
- _fileStorHandler.lock(bucket, result.disk));
+ _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive));
+ // TODO disks are no longer used in practice, can we safely discard this?
+ // Might need it for synchronization purposes if something has taken the
+ // disk lock _and_ the bucket lock...?
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "join-lockAndGetDisk-1", flags));
if (entry.exist() && entry->disk != result.disk) {
diff --git a/storage/src/vespa/storage/visiting/stor-visitor.def b/storage/src/vespa/storage/visiting/stor-visitor.def
index 1e80f2993a5..6f16bcb60a2 100644
--- a/storage/src/vespa/storage/visiting/stor-visitor.def
+++ b/storage/src/vespa/storage/visiting/stor-visitor.def
@@ -24,6 +24,7 @@ defaultparalleliterators int default=8
## will be 16 requests to persistence layer, but only 8 will be able to execute
## at the same time, since only one operation can be executed at the same time
## for one bucket)
+## DEPRECATED: This value is ignored by the backend; 1 is always used.
iterators_per_bucket int default=1
## Default number of maximum client replies pending.
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index a8f31514eb1..b12a1eb6e7f 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -637,7 +637,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
_ignoreNonExistingVisitorTimeLimit
= config.ignorenonexistingvisitortimelimit;
_defaultParallelIterators = config.defaultparalleliterators;
- _iteratorsPerBucket = config.iteratorsPerBucket;
_defaultPendingMessages = config.defaultpendingmessages;
_defaultDocBlockSize = config.defaultdocblocksize;
_visitorMemoryUsageLimit = config.visitorMemoryUsageLimit;
@@ -647,12 +646,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
LOG(config, "Cannot use value of defaultParallelIterators < 1");
_defaultParallelIterators = 1;
}
- if (_iteratorsPerBucket < 1 && _iteratorsPerBucket > 10) {
- if (_iteratorsPerBucket < 1) _iteratorsPerBucket = 1;
- else _iteratorsPerBucket = 10;
- LOG(config, "Invalid value of iterators per bucket %u using %u",
- config.iteratorsPerBucket, _iteratorsPerBucket);
- }
if (_defaultPendingMessages < 1) {
LOG(config, "Cannot use value of defaultPendingMessages < 1");
_defaultPendingMessages = 1;
diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h
index bda1bd0f038..59934154cf5 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.h
+++ b/storageapi/src/vespa/storageapi/message/persistence.h
@@ -24,7 +24,7 @@ class TestAndSetCommand : public BucketInfoCommand {
TestAndSetCondition _condition;
public:
TestAndSetCommand(const MessageType & messageType, const document::Bucket &bucket);
- ~TestAndSetCommand();
+ ~TestAndSetCommand() override;
void setCondition(const TestAndSetCondition & condition) { _condition = condition; }
const TestAndSetCondition & getCondition() const { return _condition; }
@@ -49,7 +49,7 @@ class PutCommand : public TestAndSetCommand {
public:
PutCommand(const document::Bucket &bucket, const DocumentSP&, Timestamp);
- ~PutCommand();
+ ~PutCommand() override;
void setTimestamp(Timestamp ts) { _timestamp = ts; }
@@ -86,7 +86,7 @@ class PutReply : public BucketInfoReply {
public:
explicit PutReply(const PutCommand& cmd, bool wasFound = true);
- ~PutReply();
+ ~PutReply() override;
const document::DocumentId& getDocumentId() const { return _docId; }
bool hasDocument() const { return _document.get(); }
@@ -116,7 +116,7 @@ class UpdateCommand : public TestAndSetCommand {
public:
UpdateCommand(const document::Bucket &bucket,
const std::shared_ptr<document::DocumentUpdate>&, Timestamp);
- ~UpdateCommand();
+ ~UpdateCommand() override;
void setTimestamp(Timestamp ts) { _timestamp = ts; }
void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; }
@@ -147,7 +147,7 @@ class UpdateReply : public BucketInfoReply {
public:
UpdateReply(const UpdateCommand& cmd, Timestamp oldTimestamp = 0);
- ~UpdateReply();
+ ~UpdateReply() override;
void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; }
@@ -189,7 +189,7 @@ class GetCommand : public BucketInfoCommand {
public:
GetCommand(const document::Bucket &bucket, const document::DocumentId&,
const vespalib::stringref & fieldSet, Timestamp before = MAX_TIMESTAMP);
- ~GetCommand();
+ ~GetCommand() override;
void setBeforeTimestamp(Timestamp ts) { _beforeTimestamp = ts; }
const document::DocumentId& getDocumentId() const { return _docId; }
Timestamp getBeforeTimestamp() const { return _beforeTimestamp; }
@@ -199,6 +199,10 @@ public:
vespalib::string getSummary() const override;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
+
DECLARE_STORAGECOMMAND(GetCommand, onGet)
};
@@ -219,7 +223,7 @@ public:
GetReply(const GetCommand& cmd,
const DocumentSP& doc = DocumentSP(),
Timestamp lastModified = 0);
- ~GetReply();
+ ~GetReply() override;
const DocumentSP& getDocument() const { return _doc; }
const document::DocumentId& getDocumentId() const { return _docId; }
@@ -245,7 +249,7 @@ class RemoveCommand : public TestAndSetCommand {
public:
RemoveCommand(const document::Bucket &bucket, const document::DocumentId& docId, Timestamp timestamp);
- ~RemoveCommand();
+ ~RemoveCommand() override;
void setTimestamp(Timestamp ts) { _timestamp = ts; }
const document::DocumentId& getDocumentId() const override { return _docId; }
@@ -267,7 +271,7 @@ class RemoveReply : public BucketInfoReply {
Timestamp _oldTimestamp;
public:
explicit RemoveReply(const RemoveCommand& cmd, Timestamp oldTimestamp = 0);
- ~RemoveReply();
+ ~RemoveReply() override;
const document::DocumentId& getDocumentId() const { return _docId; }
Timestamp getTimestamp() { return _timestamp; };
@@ -289,7 +293,7 @@ class RevertCommand : public BucketInfoCommand {
public:
RevertCommand(const document::Bucket &bucket,
const std::vector<Timestamp>& revertTokens);
- ~RevertCommand();
+ ~RevertCommand() override;
const std::vector<Timestamp>& getRevertTokens() const { return _tokens; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(RevertCommand, onRevert)
@@ -305,7 +309,7 @@ class RevertReply : public BucketInfoReply {
std::vector<Timestamp> _tokens;
public:
explicit RevertReply(const RevertCommand& cmd);
- ~RevertReply();
+ ~RevertReply() override;
const std::vector<Timestamp>& getRevertTokens() const { return _tokens; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(RevertReply, onRevertReply)
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index f970091f695..380d846dd93 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -302,4 +302,19 @@ StorageMessage::getSummary() const {
return toString();
}
+const char* to_string(LockingRequirements req) noexcept {
+ switch (req) {
+ case LockingRequirements::Exclusive:
+ return "Exclusive";
+ case LockingRequirements::Shared:
+ return "Shared";
+ }
+ assert(false);
+}
+
+std::ostream& operator<<(std::ostream& os, LockingRequirements req) {
+ os << to_string(req);
+ return os;
+}
+
}
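
to_string() above is an exhaustive switch over the enum, so every enumerator maps to a stable name. The sketch below mirrors that pattern in a self-contained form; the trailing abort() is an addition of this sketch (not of the patch) to keep the non-void function well-defined even when asserts are compiled out:

    #include <cassert>
    #include <cstdint>
    #include <cstdlib>
    #include <iostream>

    enum class LockingRequirements : uint8_t { Exclusive = 0, Shared };

    const char* to_string(LockingRequirements req) noexcept {
        switch (req) {
        case LockingRequirements::Exclusive: return "Exclusive";
        case LockingRequirements::Shared:    return "Shared";
        }
        assert(false);
        std::abort(); // unreachable; keeps every path returning or terminating
    }

    std::ostream& operator<<(std::ostream& os, LockingRequirements req) {
        return os << to_string(req);
    }

    int main() {
        std::cout << LockingRequirements::Shared << " vs "
                  << to_string(LockingRequirements::Exclusive) << '\n';
        return 0;
    }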
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index dadb68c644d..6c561f3af21 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -19,6 +19,7 @@
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/printable.h>
#include <map>
+#include <iosfwd>
namespace vespalib {
class asciistream;
@@ -306,6 +307,20 @@ struct TransportContext {
virtual ~TransportContext() = 0;
};
+enum class LockingRequirements : uint8_t {
+ // An operation with exclusive locking can only be executed when no other
+ // exclusive or shared locks are held for its bucket.
+ Exclusive = 0,
+ // An operation with shared locking can only be executed when no exclusive
+ // lock is held for its bucket. Shared locking should only be used for
+ // read-only operations that cannot mutate a bucket's state.
+ Shared
+};
+
+const char* to_string(LockingRequirements req) noexcept;
+
+std::ostream& operator<<(std::ostream&, LockingRequirements);
+
class StorageMessage : public vespalib::Printable
{
friend class StorageMessageTest; // Used for testing only
@@ -421,6 +436,10 @@ public:
virtual document::Bucket getBucket() const { return getDummyBucket(); }
document::BucketId getBucketId() const { return getBucket().getBucketId(); }
virtual bool hasSingleBucketId() const { return false; }
+ virtual LockingRequirements lockingRequirements() const noexcept {
+ // Safe default: assume exclusive locking is required.
+ return LockingRequirements::Exclusive;
+ }
};
}
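
Taken together, the changes amount to a per-bucket reader-writer lock held via RAII for the lifetime of an operation: BucketLock records its mode and message id on construction and hands both back to Stripe::release() on destruction. A condensed standalone analogue of that shape, using a single mutex and hypothetical names instead of the real stripes (the real BucketLock also records priority and message type):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <set>

    enum class LockingRequirements : uint8_t { Exclusive = 0, Shared };
    using MsgId = uint64_t;

    struct BucketLockTable {
        struct Entry {
            bool has_exclusive = false;
            std::set<MsgId> shared; // message ids of shared holders
        };
        std::mutex mtx;
        std::map<uint64_t, Entry> buckets; // keyed by bucket id

        // Caller is expected to have checked for conflicts first,
        // just as getNextMessage does before constructing a BucketLock.
        void lock(uint64_t bucket, LockingRequirements req, MsgId id) {
            std::lock_guard<std::mutex> g(mtx);
            Entry& e = buckets[bucket];
            assert(!e.has_exclusive);
            if (req == LockingRequirements::Exclusive) {
                assert(e.shared.empty());
                e.has_exclusive = true;
            } else {
                e.shared.insert(id);
            }
        }
        void release(uint64_t bucket, LockingRequirements req, MsgId id) {
            std::lock_guard<std::mutex> g(mtx);
            Entry& e = buckets[bucket];
            if (req == LockingRequirements::Exclusive) {
                e.has_exclusive = false;
            } else {
                e.shared.erase(id);
            }
            if (!e.has_exclusive && e.shared.empty()) {
                buckets.erase(bucket); // no holders left
            }
        }
    };

    // RAII holder mirroring BucketLock: acquire on construction, release on destruction.
    class ScopedBucketLock {
    public:
        ScopedBucketLock(BucketLockTable& t, uint64_t bucket, LockingRequirements req, MsgId id)
            : _t(t), _bucket(bucket), _req(req), _id(id) { _t.lock(bucket, req, id); }
        ~ScopedBucketLock() { _t.release(_bucket, _req, _id); }
    private:
        BucketLockTable&    _t;
        uint64_t            _bucket;
        LockingRequirements _req;
        MsgId               _id;
    };

    int main() {
        BucketLockTable table;
        {
            ScopedBucketLock a(table, 42, LockingRequirements::Shared, 1);
            ScopedBucketLock b(table, 42, LockingRequirements::Shared, 2); // ok: shared + shared
        }
        ScopedBucketLock c(table, 42, LockingRequirements::Exclusive, 3);  // ok: bucket is free again
        return 0;
    }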