diff options
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 136d95a068b..114292d055d 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -306,20 +306,21 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & } -Result -PersistenceEngine::setActiveState(const Bucket& bucket, - storage::spi::BucketInfo::ActiveState newState) +void +PersistenceEngine::setActiveStateAsync(const Bucket & bucket, BucketInfo::ActiveState newState, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); - for (; snap.handlers().valid(); snap.handlers().next()) { + auto resultHandler = std::make_shared<GenericResultHandler>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleSetActiveState(bucket, newState, resultHandler); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleSetActiveState(bucket, newState, resultHandler); + } else { + handler->handleSetActiveState(bucket, newState, std::move(resultHandler)); + } } - return *futureResult.get(); } @@ -363,7 +364,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } @@ -383,7 +384,7 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); } @@ -435,7 +436,7 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP if (handler == nullptr) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } @@ -563,19 +564,23 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) } -Result -PersistenceEngine::deleteBucket(const Bucket& b, Context&) +void +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleDeleteBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleDeleteBucket(feedtoken::make(transportContext), b); + } else { + handler->handleDeleteBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } |