diff options
4 files changed, 50 insertions, 74 deletions
diff --git a/persistence/src/vespa/persistence/spi/bucket.h b/persistence/src/vespa/persistence/spi/bucket.h index 4e234e4dc49..dec185b6082 100644 --- a/persistence/src/vespa/persistence/spi/bucket.h +++ b/persistence/src/vespa/persistence/spi/bucket.h @@ -31,6 +31,7 @@ public: const document::Bucket &getBucket() const { return _bucket; } document::BucketId getBucketId() const { return _bucket.getBucketId(); } + document::BucketSpace getBucketSpace() const { return _bucket.getBucketSpace(); } PartitionId getPartition() const { return _partition; } /** Convert easily to a document bucket id to make class easy to use. */ diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index b5a4872604f..72f2ea8c34d 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -171,39 +171,22 @@ PersistenceEngine::HandlerSnapshot::UP PersistenceEngine::getHandlerSnapshot() const { LockGuard guard(_lock); - return std::make_unique<HandlerSnapshot>(_handlers.snapshot(), _handlers.size()); + return _handlers.getHandlerSnapshot(); } -namespace { -template <typename T> -class SequenceOfOne : public Sequence<T> { - bool _done; - T _value; -public: - SequenceOfOne(const T &value) : _done(false), _value(value) {} - - virtual bool valid() const override { return !_done; } - virtual T get() const override { return _value; } - virtual void next() override { _done = true; } -}; - -template <typename T> -typename Sequence<T>::UP make_sequence(const T &value) { - return typename Sequence<T>::UP(new SequenceOfOne<T>(value)); +PersistenceEngine::HandlerSnapshot::UP +PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const +{ + LockGuard guard(_lock); + return _handlers.getHandlerSnapshot(bucketSpace); } -} // namespace PersistenceEngine::HandlerSnapshot::UP -PersistenceEngine::getHandlerSnapshot(const DocumentId &id) const { - if (!id.hasDocType()) { - return getHandlerSnapshot(); - } - IPersistenceHandler::SP handler = getHandler(DocTypeName(id.getDocType())); - if (!handler.get()) { - return HandlerSnapshot::UP(); - } - return HandlerSnapshot::UP( - new HandlerSnapshot(make_sequence(handler.get()), 1)); +PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, + const DocumentId &id) const +{ + LockGuard guard(_lock); + return _handlers.getHandlerSnapshot(bucketSpace, id); } PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, @@ -237,17 +220,17 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType, const IPersistenceHandler::SP &handler) { - (void) bucketSpace; LockGuard guard(_lock); - return _handlers.putHandler(docType, handler); + return _handlers.putHandler(bucketSpace, docType, handler); } IPersistenceHandler::SP -PersistenceEngine::getHandler(const DocTypeName &docType) const +PersistenceEngine::getHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType) const { LockGuard guard(_lock); - return _handlers.getHandler(docType); + return _handlers.getHandler(bucketSpace, docType); } @@ -255,10 +238,9 @@ IPersistenceHandler::SP PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) { - (void) bucketSpace; // TODO: Grab bucket list and treat them as modified LockGuard guard(_lock); - return _handlers.removeHandler(docType); + return _handlers.removeHandler(bucketSpace, docType); } @@ -286,7 +268,7 @@ PersistenceEngine::getPartitionStates() const BucketIdListResult -PersistenceEngine::listBuckets(BucketSpace, PartitionId id) const +PersistenceEngine::listBuckets(BucketSpace bucketSpace, PartitionId id) const { // Runs in SPI thread. // No handover to write threads in persistence handlers. @@ -295,7 +277,7 @@ PersistenceEngine::listBuckets(BucketSpace, PartitionId id) const BucketIdListResult::List emptyList; return BucketIdListResult(emptyList); } - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace); BucketIdListResultHandler resultHandler; for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -327,7 +309,7 @@ PersistenceEngine::setActiveState(const Bucket& bucket, storage::spi::BucketInfo::ActiveState newState) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(bucket.getBucketSpace()); GenericResultHandler resultHandler(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -344,7 +326,7 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const // Runs in SPI thread. // No handover to write threads in persistence handlers. std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace()); BucketInfoResultHandler resultHandler; for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -378,7 +360,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S "Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())); } - IPersistenceHandler::SP handler = getHandler(docType); + IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); if (handler.get() == NULL) { return Result(Result::PERMANENT_ERROR, make_string("No handler for document type '%s'", @@ -401,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C b.toString().c_str(), static_cast<uint64_t>(t.getValue()), did.toString().c_str()); - HandlerSnapshot::UP snap = getHandlerSnapshot(did); + HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace(), did); if (!snap.get()) { return RemoveResult(false); } @@ -436,7 +418,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP docType.toString().c_str(), upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); - IPersistenceHandler::SP handler = getHandler(docType); + IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType); TransportLatch latch(1); if (handler.get() != NULL) { FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply())); @@ -457,7 +439,7 @@ PersistenceEngine::get(const Bucket& b, Context& context) const { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); - HandlerSnapshot::UP snapshot = getHandlerSnapshot(); + HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace()); for (PersistenceHandlerSequence & handlers = snapshot->handlers(); handlers.valid(); handlers.next()) { BucketGuard::UP bucket_guard = handlers.get()->lockBucket(b); @@ -490,7 +472,7 @@ PersistenceEngine::createIterator(const Bucket &bucket, Context & context) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); - HandlerSnapshot::UP snapshot = getHandlerSnapshot(); + HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace()); auto entry = std::make_unique<IteratorEntry>(context.getReadConsistency(), bucket, fields, selection, versions, _defaultSerializedSize, _ignoreMaxBytes); @@ -566,7 +548,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LOG(spam, "createBucket(%s)", b.toString().c_str()); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace()); TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -583,7 +565,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace()); TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -596,7 +578,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&) BucketIdListResult -PersistenceEngine::getModifiedBuckets(BucketSpace) const +PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); typedef BucketIdListResultV MBV; @@ -605,7 +587,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace) const LockGuard guard(_lock); extraModifiedBuckets.swap(_extraModifiedBuckets); } - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace); SynchronizedBucketIdListResultHandler resultHandler(snap->size() + extraModifiedBuckets.size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -624,7 +606,9 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LOG(spam, "split(%s, %s, %s)", source.toString().c_str(), target1.toString().c_str(), target2.toString().c_str()); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + assert(source.getBucketSpace() == target1.getBucketSpace()); + assert(source.getBucketSpace() == target2.getBucketSpace()); + HandlerSnapshot::UP snap = getHandlerSnapshot(source.getBucketSpace()); TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -641,7 +625,9 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LOG(spam, "join(%s, %s, %s)", source1.toString().c_str(), source2.toString().c_str(), target.toString().c_str()); - HandlerSnapshot::UP snap = getHandlerSnapshot(); + assert(source1.getBucketSpace() == target.getBucketSpace()); + assert(source2.getBucketSpace() == target.getBucketSpace()); + HandlerSnapshot::UP snap = getHandlerSnapshot(target.getBucketSpace()); TransportLatch latch(snap->size()); for (; snap->handlers().valid(); snap->handlers().next()) { IPersistenceHandler *handler = snap->handlers().get(); @@ -746,9 +732,10 @@ public: }; void -PersistenceEngine::populateInitialBucketDB(IPersistenceHandler &targetHandler) +PersistenceEngine::populateInitialBucketDB(BucketSpace bucketSpace, + IPersistenceHandler &targetHandler) { - HandlerSnapshot::UP snap = getHandlerSnapshot(); + HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace); size_t snapSize(snap->size()); size_t flawed = 0; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 758b39f4417..d9a27f9e460 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -3,6 +3,7 @@ #include "document_iterator.h" #include "i_resource_write_filter.h" +#include "persistence_handler_map.h" #include <vespa/document/bucket/bucketspace.h> #include <vespa/persistence/spi/abstractpersistenceprovider.h> #include <vespa/searchcore/proton/common/handlermap.hpp> @@ -17,7 +18,8 @@ class IPersistenceEngineOwner; class PersistenceEngine : public storage::spi::AbstractPersistenceProvider { private: - typedef vespalib::Sequence<IPersistenceHandler *> PersistenceHandlerSequence; + using PersistenceHandlerSequence = vespalib::Sequence<IPersistenceHandler *>; + using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot; using DocumentUpdate = document::DocumentUpdate; using Bucket = storage::spi::Bucket; using BucketIdListResult = storage::spi::BucketIdListResult; @@ -39,23 +41,6 @@ private: using Timestamp = storage::spi::Timestamp; using TimestampList = storage::spi::TimestampList; using UpdateResult = storage::spi::UpdateResult; - class HandlerSnapshot { - private: - PersistenceHandlerSequence::UP _handlers; - size_t _size; - public: - typedef std::unique_ptr<HandlerSnapshot> UP; - HandlerSnapshot(PersistenceHandlerSequence::UP handlers_, size_t size_) : - _handlers(handlers_.release()), - _size(size_) - {} - HandlerSnapshot(const HandlerSnapshot &) = delete; - HandlerSnapshot & operator = (const HandlerSnapshot &) = delete; - - size_t size() const { return _size; } - PersistenceHandlerSequence &handlers() { return *_handlers; } - static PersistenceHandlerSequence::UP release(HandlerSnapshot && rhs) { return std::move(rhs._handlers); } - }; struct IteratorEntry { PersistenceHandlerSequence::UP handler_sequence; @@ -79,7 +64,7 @@ private: const ssize_t _defaultSerializedSize; const bool _ignoreMaxBytes; - mutable HandlerMap<IPersistenceHandler> _handlers; + mutable PersistenceHandlerMap _handlers; vespalib::Lock _lock; Iterators _iterators; vespalib::Lock _iterators_lock; @@ -89,8 +74,12 @@ private: mutable BucketIdListResultV _extraModifiedBuckets; mutable std::shared_timed_mutex _rwMutex; + IPersistenceHandler::SP getHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType) const; HandlerSnapshot::UP getHandlerSnapshot() const; - HandlerSnapshot::UP getHandlerSnapshot(const document::DocumentId &) const; + HandlerSnapshot::UP getHandlerSnapshot(document::BucketSpace bucketSpace) const; + HandlerSnapshot::UP getHandlerSnapshot(document::BucketSpace bucketSpace, + const document::DocumentId &docId) const; void saveClusterState(const ClusterState &calc); ClusterState::SP savedClusterState() const; @@ -106,7 +95,6 @@ public: IPersistenceHandler::SP putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType, const IPersistenceHandler::SP &handler); - IPersistenceHandler::SP getHandler(const DocTypeName &docType) const; IPersistenceHandler::SP removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType); @@ -137,7 +125,7 @@ public: void destroyIterators(); void propagateSavedClusterState(IPersistenceHandler &handler); void grabExtraModifiedBuckets(IPersistenceHandler &handler); - void populateInitialBucketDB(IPersistenceHandler &targetHandler); + void populateInitialBucketDB(BucketSpace bucketSpace, IPersistenceHandler &targetHandler); std::unique_lock<std::shared_timed_mutex> getWLock() const; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 82db32adb7b..34cfaaeddb2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -585,7 +585,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, if (!_isInitializing) { _persistenceEngine-> propagateSavedClusterState(*persistenceHandler); - _persistenceEngine->populateInitialBucketDB(*persistenceHandler); + _persistenceEngine->populateInitialBucketDB(bucketSpace, *persistenceHandler); } // TODO: Fix race with new cluster state setting. _persistenceEngine->putHandler(bucketSpace, docTypeName, persistenceHandler); |