diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-25 14:43:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 14:43:14 +0200 |
commit | 1711c0cff4b3f43857bf054af7dcb63e13b8d2af (patch) | |
tree | bced3356fa5db157e42c94391123ba4ac0e16e64 | |
parent | 78338d5a782bc464d3a8912c12c6839669f1b91b (diff) | |
parent | 90676b68765027c1df6c4a3f22530a45118b4f3c (diff) |
Merge pull request #19719 from vespa-engine/balder/async-create-bucket-2
Balder/async create bucket 2
21 files changed, 256 insertions, 266 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6e4f38fe564..74fef13f141 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -34,12 +34,12 @@ BucketContent::BucketContent() _inUse(false), _outdatedInfo(true), _active(false) -{ } +{} + BucketContent::~BucketContent() = default; uint32_t -BucketContent::computeEntryChecksum(const BucketEntry& e) const -{ +BucketContent::computeEntryChecksum(const BucketEntry &e) const { vespalib::crc_32_type checksummer; uint64_t ts(e.entry->getTimestamp()); @@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const } BucketChecksum -BucketContent::updateRollingChecksum(uint32_t entryChecksum) -{ +BucketContent::updateRollingChecksum(uint32_t entryChecksum) { uint32_t checksum = _info.getChecksum(); checksum ^= entryChecksum; if (checksum == 0) { @@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum) return BucketChecksum(checksum); } -const BucketInfo& -BucketContent::getBucketInfo() const -{ +const BucketInfo & +BucketContent::getBucketInfo() const { if (!_outdatedInfo) { return _info; } @@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const uint32_t totalSize = 0; uint32_t checksum = 0; - for (const BucketEntry & bucketEntry : _entries) { - const DocEntry& entry(*bucketEntry.entry); - const GlobalId& gid(bucketEntry.gid); + for (const BucketEntry &bucketEntry: _entries) { + const DocEntry &entry(*bucketEntry.entry); + const GlobalId &gid(bucketEntry.gid); GidMapType::const_iterator gidIt(_gidMap.find(gid)); assert(gidIt != _gidMap.end()); @@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const namespace { struct TimestampLess { - bool operator()(const BucketEntry &bucketEntry, Timestamp t) - { return bucketEntry.entry->getTimestamp() < t; } - bool operator()(Timestamp t, const BucketEntry &bucketEntry) - { return t < bucketEntry.entry->getTimestamp(); } + bool operator()(const BucketEntry &bucketEntry, Timestamp t) { + return bucketEntry.entry->getTimestamp() < t; + } + + bool operator()(Timestamp t, const BucketEntry &bucketEntry) { + return t < bucketEntry.entry->getTimestamp(); + } }; } // namespace bool -BucketContent::hasTimestamp(Timestamp t) const -{ +BucketContent::hasTimestamp(Timestamp t) const { if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) { return false; } @@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const */ void -BucketContent::insert(DocEntry::SP e) -{ +BucketContent::insert(DocEntry::SP e) { LOG(spam, "insert(%s)", e->toString().c_str()); - const DocumentId* docId(e->getDocumentId()); + const DocumentId *docId(e->getDocumentId()); assert(docId != 0); GlobalId gid(docId->getGlobalId()); GidMapType::iterator gidIt(_gidMap.find(gid)); @@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e) _entries.back().entry->getTimestamp() < e->getTimestamp()) { _entries.push_back(BucketEntry(e, gid)); } else { - std::vector<BucketEntry>::iterator it = - lower_bound(_entries.begin(), - _entries.end(), - e->getTimestamp(), - TimestampLess()); + auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess()); if (it != _entries.end()) { if (it->entry->getTimestamp() == e->getTimestamp()) { if (*it->entry.get() == *e) { - LOG(debug, "Ignoring duplicate put entry %s", - e->toString().c_str()); + LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str()); return; } else { - LOG(error, "Entry %s was already present." - "Was trying to insert %s.", - it->entry->toString().c_str(), - e->toString().c_str()); + LOG(error, "Entry %s was already present. Was trying to insert %s.", + it->entry->toString().c_str(), e->toString().c_str()); LOG_ABORT("should not reach here"); } } @@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e) // newer versions of a document etc. by XORing away old checksum. gidIt->second = e; } else { - LOG(spam, - "Newly inserted entry %s was older than existing entry %s; " - "not updating GID mapping", - e->toString().c_str(), - gidIt->second->toString().c_str()); + LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping", + e->toString().c_str(), gidIt->second->toString().c_str()); } _outdatedInfo = true; } else { @@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e) _info.getActive()); } - LOG(spam, - "After cheap bucketinfo update, state is %s (inserted %s)", - _info.toString().c_str(), - e->toString().c_str()); + LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)", + _info.toString().c_str(), e->toString().c_str()); } } @@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e) } DocEntry::SP -BucketContent::getEntry(const DocumentId& did) const -{ - GidMapType::const_iterator it(_gidMap.find(did.getGlobalId())); +BucketContent::getEntry(const DocumentId &did) const { + auto it(_gidMap.find(did.getGlobalId())); if (it != _gidMap.end()) { return it->second; } @@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const } DocEntry::SP -BucketContent::getEntry(Timestamp t) const -{ - std::vector<BucketEntry>::const_iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::getEntry(Timestamp t) const { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter == _entries.end() || iter->entry->getTimestamp() != t) { return DocEntry::SP(); @@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const } void -BucketContent::eraseEntry(Timestamp t) -{ - std::vector<BucketEntry>::iterator iter = - lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); +BucketContent::eraseEntry(Timestamp t) { + auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess()); if (iter != _entries.end() && iter->entry->getTimestamp() == t) { assert(iter->entry->getDocumentId() != 0); - GidMapType::iterator gidIt( - _gidMap.find(iter->entry->getDocumentId()->getGlobalId())); + GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId()); assert(gidIt != _gidMap.end()); _entries.erase(iter); if (gidIt->second->getTimestamp() == t) { @@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t) } } -DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo) +DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo) : _initialized(false), _repo(repo), _content(), @@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen DummyPersistence::~DummyPersistence() = default; document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) -{ +DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) { document::select::Node::UP ret; try { document::select::Parser parser(*_repo, document::BucketIdFactory()); ret = parser.parse(documentSelection); - } catch (document::select::ParsingFailedException& e) { + } catch (document::select::ParsingFailedException &e) { return document::select::Node::UP(); } if (ret->isLeafNode() && !allowLeaf) { @@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a } Result -DummyPersistence::initialize() -{ +DummyPersistence::initialize() { assert(!_initialized); _initialized = true; return Result(); } #define DUMMYPERSISTENCE_VERIFY_INITIALIZED \ - if (!_initialized) throw vespalib::IllegalStateException( \ - "initialize() must always be called first in order to " \ - "trigger lazy initialization.", VESPA_STRLOC) - + if (!_initialized) { \ + LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \ + abort(); \ + } BucketIdListResult DummyPersistence::listBuckets(BucketSpace bucketSpace) const @@ -714,8 +693,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) return Result(); } -Result -DummyPersistence::createBucket(const Bucket& b, Context&) +void +DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "createBucket(%s)", b.toString().c_str()); @@ -727,11 +706,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); LOG(debug, "%s already existed", b.toString().c_str()); } - return Result(); + onComplete->onComplete(std::make_unique<Result>()); } void -DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 99d6ba717b7..a25bf6b8a8e 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -168,8 +168,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket&, Context&) override; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e287bdc5252..3b59f20ca96 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider { public: Result initialize() override { return Result(); }; - Result createBucket(const Bucket&, Context&) override { return Result(); } Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h index 02c626ea23e..7b04498205d 100644 --- a/persistence/src/vespa/persistence/spi/catchresult.h +++ b/persistence/src/vespa/persistence/spi/catchresult.h @@ -19,4 +19,9 @@ private: const ResultHandler *_resulthandler; }; +class NoopOperationComplete : public OperationComplete { + void onComplete(std::unique_ptr<spi::Result>) noexcept override { } + void addResultHandler(const spi::ResultHandler *) override { } +}; + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 3ea476c33fc..31db08a6f4f 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat } Result +PersistenceProvider::createBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + createBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 83eb042d855..269175f7d26 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -58,6 +58,7 @@ struct PersistenceProvider virtual ~PersistenceProvider(); // TODO Move to utility class for use in tests only + Result createBucket(const Bucket&, Context&); Result deleteBucket(const Bucket&, Context&); Result put(const Bucket&, Timestamp, DocumentSP, Context&); Result setActiveState(const Bucket&, BucketInfo::ActiveState); @@ -336,14 +337,14 @@ struct PersistenceProvider * Tells the provider that the given bucket has been created in the * service layer. There is no requirement to do anything here. */ - virtual Result createBucket(const Bucket&, Context&) = 0; + virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * Deletes the given bucket and all entries contained in that bucket. * After this operation has succeeded, a restart of the provider should * not yield the bucket in getBucketList(). */ - virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; + virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0; /** * This function is called continuously by the service layer. It allows the diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 114292d055d..2e1fc74037c 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) } -Result -PersistenceEngine::createBucket(const Bucket &b, Context &) +void +PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleCreateBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleCreateBucket(feedtoken::make(transportContext), b); + } else { + handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } void -PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 94331ac2cd6..fe564d01459 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -114,8 +114,8 @@ public: IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override; Result destroyIterator(IteratorId, Context&) override; - Result createBucket(const Bucket &bucketId, Context &) override ; - void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; + void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index b3bd1c6a253..02b43a32df3 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -24,7 +24,7 @@ #define CHECK_ERROR_ASYNC(className, failType, onError) \ { \ - Guard guard(_lock); \ + Guard guard(_lock); \ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ return; \ @@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const return _spi.listBuckets(bucketSpace); } -spi::Result -PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { LOG_SPI("createBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET); - return _spi.createBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); + return _spi.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketInfoResult @@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, void PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, - spi::OperationComplete::UP operationComplete) + spi::OperationComplete::UP operationComplete) noexcept { LOG_SPI("deleteBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index c6628814dba..cfc7002a643 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -96,7 +96,7 @@ public: void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, spi::OperationComplete::UP up) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; @@ -111,7 +111,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 07d2b24d536..a3f0182ba30 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -62,13 +62,13 @@ public: return PersistenceProviderWrapper::getBucketInfo(bucket); } - spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { ++_createBucketInvocations; - return PersistenceProviderWrapper::createBucket(bucket, ctx); + PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete)); } void - deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { + deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { ++_deleteBucketInvocations; PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 017b8ce2b92..75e85fb4b6f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, @@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { - { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" }, { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" }, { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 47b5e4f5f27..bc6e67578c0 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -5,6 +5,7 @@ #include "testandsethelper.h" #include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> @@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.createBuckets); + LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); + } + spi::Bucket bucket(cmd.getBucket()); + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + tracker->sendReply(); + }); + + if (cmd.getActive()) { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } else { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task))); + } + + return tracker; +} + +MessageTracker::UP AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.deleteBuckets); diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 4f5c242570c..db5a77bfb59 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -30,6 +30,7 @@ public: MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 77e7762ec9a..4cd0b181155 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -6,13 +6,12 @@ #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> #include <algorithm> -#include <future> #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); @@ -51,20 +50,6 @@ constexpr int getDeleteFlag() { return 2; } -/** - * Throws std::runtime_error if result has an error. - */ -void -checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op) -{ - if (result.hasError()) { - vespalib::asciistream ss; - ss << "Failed " << op << " in " << bucket << ": " << result.toString(); - throw std::runtime_error(ss.str()); - } -} - - class IteratorGuard { spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; @@ -663,25 +648,28 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const } namespace { - void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, - uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) - { - for (const auto& entry : status.diff) { - uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); - if ((entry_has_mask == 0u) || - (constrictHasMask && (entry_has_mask != hasMask))) { - continue; - } - cmd.getDiff().emplace_back(entry); - if (constrictHasMask) { - cmd.getDiff().back()._entry._hasMask = newHasMask; - } else { - cmd.getDiff().back()._entry._hasMask = entry_has_mask; - } + +void +findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, + uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) +{ + for (const auto& entry : status.diff) { + uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask); + if ((entry_has_mask == 0u) || + (constrictHasMask && (entry_has_mask != hasMask))) { + continue; + } + cmd.getDiff().emplace_back(entry); + if (constrictHasMask) { + cmd.getDiff().back()._entry._hasMask = newHasMask; + } else { + cmd.getDiff().back()._entry._hasMask = entry_has_mask; } } } +} + api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, spi::Context& context, @@ -898,7 +886,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker->fail(api::ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); @@ -938,141 +927,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP namespace { - uint8_t findOwnIndex( - const std::vector<api::MergeBucketCommand::Node>& nodeList, - uint16_t us) - { - for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { - if (nodeList[i].index == us) return i; - } - throw vespalib::IllegalStateException( - "Got GetBucketDiff cmd on node not in nodelist in command", - VESPA_STRLOC); +uint8_t findOwnIndex( + const std::vector<api::MergeBucketCommand::Node>& nodeList, + uint16_t us) +{ + for (uint32_t i=0, n=nodeList.size(); i<n; ++i) { + if (nodeList[i].index == us) return i; } + throw vespalib::IllegalStateException( + "Got GetBucketDiff cmd on node not in nodelist in command", + VESPA_STRLOC); +} - struct DiffEntryTimestampOrder - : public std::binary_function<api::GetBucketDiffCommand::Entry, - api::GetBucketDiffCommand::Entry, bool> - { - bool operator()(const api::GetBucketDiffCommand::Entry& x, - const api::GetBucketDiffCommand::Entry& y) const { - return (x._timestamp < y._timestamp); - } - }; - - /** - * Merges list A and list B together and puts the result in result. - * Result is swapped in as last step to keep function exception safe. Thus - * result can be listA or listB if wanted. - * - * listA and listB are assumed to be in the order found in the slotfile, or - * in the order given by a previous call to this function. (In both cases - * this will be sorted by timestamp) - * - * @return false if any suspect entries was found. - */ - bool mergeLists( - const std::vector<api::GetBucketDiffCommand::Entry>& listA, - const std::vector<api::GetBucketDiffCommand::Entry>& listB, - std::vector<api::GetBucketDiffCommand::Entry>& finalResult) - { - bool suspect = false; - std::vector<api::GetBucketDiffCommand::Entry> result; - uint32_t i = 0, j = 0; - while (i < listA.size() && j < listB.size()) { - const api::GetBucketDiffCommand::Entry& a(listA[i]); - const api::GetBucketDiffCommand::Entry& b(listB[j]); - if (a._timestamp < b._timestamp) { - result.push_back(a); - ++i; - } else if (a._timestamp > b._timestamp) { - result.push_back(b); - ++j; - } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. - if (!(a == b)) { - if (a._gid == b._gid && a._flags == b._flags) { - if ((a._flags & getDeleteFlag()) != 0 && - (b._flags & getDeleteFlag()) != 0) - { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. - LOG(debug, "Found entries with equal timestamps of " - "the same gid who both are remove " - "entries: %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } else { - LOG(error, "Found entries with equal timestamps of " - "the same gid. This is likely same " - "document where size of document varies:" - " %s <-> %s.", - a.toString(true).c_str(), - b.toString(true).c_str()); - } - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; - } else if ((a._flags & getDeleteFlag()) - != (b._flags & getDeleteFlag())) +/** + * Merges list A and list B together and puts the result in result. + * Result is swapped in as last step to keep function exception safe. Thus + * result can be listA or listB if wanted. + * + * listA and listB are assumed to be in the order found in the slotfile, or + * in the order given by a previous call to this function. (In both cases + * this will be sorted by timestamp) + * + * @return false if any suspect entries was found. + */ +bool mergeLists( + const std::vector<api::GetBucketDiffCommand::Entry>& listA, + const std::vector<api::GetBucketDiffCommand::Entry>& listB, + std::vector<api::GetBucketDiffCommand::Entry>& finalResult) +{ + bool suspect = false; + std::vector<api::GetBucketDiffCommand::Entry> result; + uint32_t i = 0, j = 0; + while (i < listA.size() && j < listB.size()) { + const api::GetBucketDiffCommand::Entry& a(listA[i]); + const api::GetBucketDiffCommand::Entry& b(listB[j]); + if (a._timestamp < b._timestamp) { + result.push_back(a); + ++i; + } else if (a._timestamp > b._timestamp) { + result.push_back(b); + ++j; + } else { + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. + if (!(a == b)) { + if (a._gid == b._gid && a._flags == b._flags) { + if ((a._flags & getDeleteFlag()) != 0 && + (b._flags & getDeleteFlag()) != 0) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. - const api::GetBucketDiffCommand::Entry& deletedEntry( - (a._flags & getDeleteFlag()) != 0 ? a : b); - result.push_back(deletedEntry); - LOG(debug, - "Found put and remove on same timestamp. Keeping" - "remove as it is likely caused by remove with " - "copies unavailable at the time: %s, %s.", - a.toString().c_str(), b.toString().c_str()); + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. + LOG(debug, "Found entries with equal timestamps of " + "the same gid who both are remove " + "entries: %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } else { - LOG(error, "Found entries with equal timestamps that " - "weren't the same entry: %s, %s.", - a.toString().c_str(), b.toString().c_str()); - result.push_back(a); - result.back()._hasMask |= b._hasMask; - suspect = true; + LOG(error, "Found entries with equal timestamps of " + "the same gid. This is likely same " + "document where size of document varies:" + " %s <-> %s.", + a.toString(true).c_str(), + b.toString(true).c_str()); } + result.push_back(a); + result.back()._hasMask |= b._hasMask; + suspect = true; + } else if ((a._flags & getDeleteFlag()) + != (b._flags & getDeleteFlag())) + { + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. + const api::GetBucketDiffCommand::Entry& deletedEntry( + (a._flags & getDeleteFlag()) != 0 ? a : b); + result.push_back(deletedEntry); + LOG(debug, + "Found put and remove on same timestamp. Keeping" + "remove as it is likely caused by remove with " + "copies unavailable at the time: %s, %s.", + a.toString().c_str(), b.toString().c_str()); } else { + LOG(error, "Found entries with equal timestamps that " + "weren't the same entry: %s, %s.", + a.toString().c_str(), b.toString().c_str()); result.push_back(a); result.back()._hasMask |= b._hasMask; + suspect = true; } - ++i; - ++j; + } else { + result.push_back(a); + result.back()._hasMask |= b._hasMask; } + ++i; + ++j; } - if (i < listA.size()) { - assert(j >= listB.size()); - for (uint32_t n = listA.size(); i<n; ++i) { - result.push_back(listA[i]); - } - } else if (j < listB.size()) { - assert(i >= listA.size()); - for (uint32_t n = listB.size(); j<n; ++j) { - result.push_back(listB[j]); - } + } + if (i < listA.size()) { + assert(j >= listB.size()); + for (uint32_t n = listA.size(); i<n; ++i) { + result.push_back(listA[i]); + } + } else if (j < listB.size()) { + assert(i >= listA.size()); + for (uint32_t n = listB.size(); j<n; ++j) { + result.push_back(listB[j]); } - result.swap(finalResult); - return !suspect; } + result.swap(finalResult); + return !suspect; +} } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const -{ +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>()); + return handleGetBucketDiffStage2(cmd, std::move(tracker)); +} +MessageTracker::UP +MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const +{ + spi::Bucket bucket(cmd.getBucket()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17cfb847d2c..e6b4d047209 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -86,6 +86,7 @@ private: const uint32_t _commonMergeChainOptimalizationMinimumSize; std::atomic<bool> _async_apply_bucket_diff; + MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index aa1a9c136fd..761021a9612 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::REVERT_ID: return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ce424f0ce83..9ccd901744b 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& return checkResult(_impl.destroyIterator(iteratorId, context)); } -spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { - return checkResult(_impl.createBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index c9d2411e372..14d20cf8a52 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -49,7 +49,6 @@ public: spi::Context &context) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -63,7 +62,8 @@ public: void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; + void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4fe207e2e5..9a7a451b906 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t } MessageTracker::UP -SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.createBuckets); - LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); - } - spi::Bucket spiBucket(cmd.getBucket()); - _spi.createBucket(spiBucket, tracker->context()); - if (cmd.getActive()) { - _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); - } - return tracker; -} - -MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.visit); diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 2cfbc7016c0..009fd6dff52 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -22,7 +22,6 @@ public: SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&); MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; |