aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@oath.com>2017-10-06 09:03:26 +0000
committerGeir Storli <geirst@oath.com>2017-10-06 10:52:20 +0000
commitbb999bf3807c6f46c22ad5725ba7e3f590489744 (patch)
tree95cef337f0ddbf285c689c9ceea8141369d7960a
parent9e832580cf0f8d6191d28d5df50e8a5fffa9eb95 (diff)
Integrate PersistenceHandlerMap in PersistenceEngine.
-rw-r--r--persistence/src/vespa/persistence/spi/bucket.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp89
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp2
4 files changed, 50 insertions, 74 deletions
diff --git a/persistence/src/vespa/persistence/spi/bucket.h b/persistence/src/vespa/persistence/spi/bucket.h
index 4e234e4dc49..dec185b6082 100644
--- a/persistence/src/vespa/persistence/spi/bucket.h
+++ b/persistence/src/vespa/persistence/spi/bucket.h
@@ -31,6 +31,7 @@ public:
const document::Bucket &getBucket() const { return _bucket; }
document::BucketId getBucketId() const { return _bucket.getBucketId(); }
+ document::BucketSpace getBucketSpace() const { return _bucket.getBucketSpace(); }
PartitionId getPartition() const { return _partition; }
/** Convert easily to a document bucket id to make class easy to use. */
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index b5a4872604f..72f2ea8c34d 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -171,39 +171,22 @@ PersistenceEngine::HandlerSnapshot::UP
PersistenceEngine::getHandlerSnapshot() const
{
LockGuard guard(_lock);
- return std::make_unique<HandlerSnapshot>(_handlers.snapshot(), _handlers.size());
+ return _handlers.getHandlerSnapshot();
}
-namespace {
-template <typename T>
-class SequenceOfOne : public Sequence<T> {
- bool _done;
- T _value;
-public:
- SequenceOfOne(const T &value) : _done(false), _value(value) {}
-
- virtual bool valid() const override { return !_done; }
- virtual T get() const override { return _value; }
- virtual void next() override { _done = true; }
-};
-
-template <typename T>
-typename Sequence<T>::UP make_sequence(const T &value) {
- return typename Sequence<T>::UP(new SequenceOfOne<T>(value));
+PersistenceEngine::HandlerSnapshot::UP
+PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const
+{
+ LockGuard guard(_lock);
+ return _handlers.getHandlerSnapshot(bucketSpace);
}
-} // namespace
PersistenceEngine::HandlerSnapshot::UP
-PersistenceEngine::getHandlerSnapshot(const DocumentId &id) const {
- if (!id.hasDocType()) {
- return getHandlerSnapshot();
- }
- IPersistenceHandler::SP handler = getHandler(DocTypeName(id.getDocType()));
- if (!handler.get()) {
- return HandlerSnapshot::UP();
- }
- return HandlerSnapshot::UP(
- new HandlerSnapshot(make_sequence(handler.get()), 1));
+PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace,
+ const DocumentId &id) const
+{
+ LockGuard guard(_lock);
+ return _handlers.getHandlerSnapshot(bucketSpace, id);
}
PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner,
@@ -237,17 +220,17 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace,
const DocTypeName &docType,
const IPersistenceHandler::SP &handler)
{
- (void) bucketSpace;
LockGuard guard(_lock);
- return _handlers.putHandler(docType, handler);
+ return _handlers.putHandler(bucketSpace, docType, handler);
}
IPersistenceHandler::SP
-PersistenceEngine::getHandler(const DocTypeName &docType) const
+PersistenceEngine::getHandler(document::BucketSpace bucketSpace,
+ const DocTypeName &docType) const
{
LockGuard guard(_lock);
- return _handlers.getHandler(docType);
+ return _handlers.getHandler(bucketSpace, docType);
}
@@ -255,10 +238,9 @@ IPersistenceHandler::SP
PersistenceEngine::removeHandler(document::BucketSpace bucketSpace,
const DocTypeName &docType)
{
- (void) bucketSpace;
// TODO: Grab bucket list and treat them as modified
LockGuard guard(_lock);
- return _handlers.removeHandler(docType);
+ return _handlers.removeHandler(bucketSpace, docType);
}
@@ -286,7 +268,7 @@ PersistenceEngine::getPartitionStates() const
BucketIdListResult
-PersistenceEngine::listBuckets(BucketSpace, PartitionId id) const
+PersistenceEngine::listBuckets(BucketSpace bucketSpace, PartitionId id) const
{
// Runs in SPI thread.
// No handover to write threads in persistence handlers.
@@ -295,7 +277,7 @@ PersistenceEngine::listBuckets(BucketSpace, PartitionId id) const
BucketIdListResult::List emptyList;
return BucketIdListResult(emptyList);
}
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace);
BucketIdListResultHandler resultHandler;
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -327,7 +309,7 @@ PersistenceEngine::setActiveState(const Bucket& bucket,
storage::spi::BucketInfo::ActiveState newState)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(bucket.getBucketSpace());
GenericResultHandler resultHandler(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -344,7 +326,7 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const
// Runs in SPI thread.
// No handover to write threads in persistence handlers.
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace());
BucketInfoResultHandler resultHandler;
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -378,7 +360,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S
"Old id scheme not supported in elastic mode (%s)",
doc->getId().toString().c_str()));
}
- IPersistenceHandler::SP handler = getHandler(docType);
+ IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
if (handler.get() == NULL) {
return Result(Result::PERMANENT_ERROR,
make_string("No handler for document type '%s'",
@@ -401,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C
b.toString().c_str(),
static_cast<uint64_t>(t.getValue()),
did.toString().c_str());
- HandlerSnapshot::UP snap = getHandlerSnapshot(did);
+ HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace(), did);
if (!snap.get()) {
return RemoveResult(false);
}
@@ -436,7 +418,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
docType.toString().c_str(),
upd->getId().toString().c_str(),
(upd->getCreateIfNonExistent() ? "true" : "false"));
- IPersistenceHandler::SP handler = getHandler(docType);
+ IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
TransportLatch latch(1);
if (handler.get() != NULL) {
FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply()));
@@ -457,7 +439,7 @@ PersistenceEngine::get(const Bucket& b,
Context& context) const
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
- HandlerSnapshot::UP snapshot = getHandlerSnapshot();
+ HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace());
for (PersistenceHandlerSequence & handlers = snapshot->handlers(); handlers.valid(); handlers.next()) {
BucketGuard::UP bucket_guard = handlers.get()->lockBucket(b);
@@ -490,7 +472,7 @@ PersistenceEngine::createIterator(const Bucket &bucket,
Context & context)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
- HandlerSnapshot::UP snapshot = getHandlerSnapshot();
+ HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace());
auto entry = std::make_unique<IteratorEntry>(context.getReadConsistency(), bucket, fields, selection,
versions, _defaultSerializedSize, _ignoreMaxBytes);
@@ -566,7 +548,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LOG(spam, "createBucket(%s)", b.toString().c_str());
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace());
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -583,7 +565,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LOG(spam, "deleteBucket(%s)", b.toString().c_str());
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(b.getBucketSpace());
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -596,7 +578,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&)
BucketIdListResult
-PersistenceEngine::getModifiedBuckets(BucketSpace) const
+PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
typedef BucketIdListResultV MBV;
@@ -605,7 +587,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace) const
LockGuard guard(_lock);
extraModifiedBuckets.swap(_extraModifiedBuckets);
}
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace);
SynchronizedBucketIdListResultHandler resultHandler(snap->size() + extraModifiedBuckets.size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -624,7 +606,9 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LOG(spam, "split(%s, %s, %s)", source.toString().c_str(), target1.toString().c_str(), target2.toString().c_str());
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ assert(source.getBucketSpace() == target1.getBucketSpace());
+ assert(source.getBucketSpace() == target2.getBucketSpace());
+ HandlerSnapshot::UP snap = getHandlerSnapshot(source.getBucketSpace());
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -641,7 +625,9 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LOG(spam, "join(%s, %s, %s)", source1.toString().c_str(), source2.toString().c_str(), target.toString().c_str());
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ assert(source1.getBucketSpace() == target.getBucketSpace());
+ assert(source2.getBucketSpace() == target.getBucketSpace());
+ HandlerSnapshot::UP snap = getHandlerSnapshot(target.getBucketSpace());
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
@@ -746,9 +732,10 @@ public:
};
void
-PersistenceEngine::populateInitialBucketDB(IPersistenceHandler &targetHandler)
+PersistenceEngine::populateInitialBucketDB(BucketSpace bucketSpace,
+ IPersistenceHandler &targetHandler)
{
- HandlerSnapshot::UP snap = getHandlerSnapshot();
+ HandlerSnapshot::UP snap = getHandlerSnapshot(bucketSpace);
size_t snapSize(snap->size());
size_t flawed = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 758b39f4417..d9a27f9e460 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -3,6 +3,7 @@
#include "document_iterator.h"
#include "i_resource_write_filter.h"
+#include "persistence_handler_map.h"
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/persistence/spi/abstractpersistenceprovider.h>
#include <vespa/searchcore/proton/common/handlermap.hpp>
@@ -17,7 +18,8 @@ class IPersistenceEngineOwner;
class PersistenceEngine : public storage::spi::AbstractPersistenceProvider {
private:
- typedef vespalib::Sequence<IPersistenceHandler *> PersistenceHandlerSequence;
+ using PersistenceHandlerSequence = vespalib::Sequence<IPersistenceHandler *>;
+ using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot;
using DocumentUpdate = document::DocumentUpdate;
using Bucket = storage::spi::Bucket;
using BucketIdListResult = storage::spi::BucketIdListResult;
@@ -39,23 +41,6 @@ private:
using Timestamp = storage::spi::Timestamp;
using TimestampList = storage::spi::TimestampList;
using UpdateResult = storage::spi::UpdateResult;
- class HandlerSnapshot {
- private:
- PersistenceHandlerSequence::UP _handlers;
- size_t _size;
- public:
- typedef std::unique_ptr<HandlerSnapshot> UP;
- HandlerSnapshot(PersistenceHandlerSequence::UP handlers_, size_t size_) :
- _handlers(handlers_.release()),
- _size(size_)
- {}
- HandlerSnapshot(const HandlerSnapshot &) = delete;
- HandlerSnapshot & operator = (const HandlerSnapshot &) = delete;
-
- size_t size() const { return _size; }
- PersistenceHandlerSequence &handlers() { return *_handlers; }
- static PersistenceHandlerSequence::UP release(HandlerSnapshot && rhs) { return std::move(rhs._handlers); }
- };
struct IteratorEntry {
PersistenceHandlerSequence::UP handler_sequence;
@@ -79,7 +64,7 @@ private:
const ssize_t _defaultSerializedSize;
const bool _ignoreMaxBytes;
- mutable HandlerMap<IPersistenceHandler> _handlers;
+ mutable PersistenceHandlerMap _handlers;
vespalib::Lock _lock;
Iterators _iterators;
vespalib::Lock _iterators_lock;
@@ -89,8 +74,12 @@ private:
mutable BucketIdListResultV _extraModifiedBuckets;
mutable std::shared_timed_mutex _rwMutex;
+ IPersistenceHandler::SP getHandler(document::BucketSpace bucketSpace,
+ const DocTypeName &docType) const;
HandlerSnapshot::UP getHandlerSnapshot() const;
- HandlerSnapshot::UP getHandlerSnapshot(const document::DocumentId &) const;
+ HandlerSnapshot::UP getHandlerSnapshot(document::BucketSpace bucketSpace) const;
+ HandlerSnapshot::UP getHandlerSnapshot(document::BucketSpace bucketSpace,
+ const document::DocumentId &docId) const;
void saveClusterState(const ClusterState &calc);
ClusterState::SP savedClusterState() const;
@@ -106,7 +95,6 @@ public:
IPersistenceHandler::SP putHandler(document::BucketSpace bucketSpace,
const DocTypeName &docType,
const IPersistenceHandler::SP &handler);
- IPersistenceHandler::SP getHandler(const DocTypeName &docType) const;
IPersistenceHandler::SP removeHandler(document::BucketSpace bucketSpace,
const DocTypeName &docType);
@@ -137,7 +125,7 @@ public:
void destroyIterators();
void propagateSavedClusterState(IPersistenceHandler &handler);
void grabExtraModifiedBuckets(IPersistenceHandler &handler);
- void populateInitialBucketDB(IPersistenceHandler &targetHandler);
+ void populateInitialBucketDB(BucketSpace bucketSpace, IPersistenceHandler &targetHandler);
std::unique_lock<std::shared_timed_mutex> getWLock() const;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 82db32adb7b..34cfaaeddb2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -585,7 +585,7 @@ Proton::addDocumentDB(const document::DocumentType &docType,
if (!_isInitializing) {
_persistenceEngine->
propagateSavedClusterState(*persistenceHandler);
- _persistenceEngine->populateInitialBucketDB(*persistenceHandler);
+ _persistenceEngine->populateInitialBucketDB(bucketSpace, *persistenceHandler);
}
// TODO: Fix race with new cluster state setting.
_persistenceEngine->putHandler(bucketSpace, docTypeName, persistenceHandler);