From 79cbce284155baf5b85fc1ae257fe8f2b24a6743 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 21 Oct 2021 20:08:00 +0000 Subject: No need to wait for the result of async createBucket as the result is ignored anyway, and no later operations depends on the outcome. In addition they are sequenced as the master executor conducts all operation in fifo order. --- .../src/vespa/storage/persistence/asynchandler.cpp | 26 ++++++++++++---------- .../src/vespa/storage/persistence/mergehandler.cpp | 25 ++++++++++++--------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index b3e9d596c76..5da2b65b7ff 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -91,6 +91,11 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; +struct Noop : public spi::OperationComplete { + void onComplete(std::unique_ptr) override { } + void addResultHandler(const spi::ResultHandler *) override { } +}; + bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { // Don't check document sizes, as background moving of documents in Proton @@ -162,21 +167,18 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker:: LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); } spi::Bucket bucket(cmd.getBucket()); - auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable { + auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable { // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric (void) ignored; - if (active) { - auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) { - // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric - (void) nestedIgnored; - tracker->sendReply(); - }); - _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique(_sequencedExecutor, bucket, std::move(nestedTask))); - } else { - tracker->sendReply(); - } + tracker->sendReply(); }); - _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, bucket, std::move(task))); + + if (cmd.getActive()) { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique()); + _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique(_sequencedExecutor, bucket, std::move(task))); + } else { + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, bucket, std::move(task))); + } return tracker; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 2f19b8695a4..7727a591af7 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -662,6 +662,7 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const } namespace { + void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask, uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd) @@ -680,6 +681,17 @@ findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHa } } } + +struct CheckResult : public spi::OperationComplete { + spi::Bucket _bucket; + const char *_msg; + CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { } + void onComplete(std::unique_ptr result) override { + checkResult(*result, _bucket, _msg); + } + void addResultHandler(const spi::ResultHandler *) override { } +}; + } api::StorageReply::SP @@ -898,7 +910,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker->fail(api::ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); + _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(bucket, "create bucket")); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); auto s = std::make_shared(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); @@ -1053,16 +1066,6 @@ bool mergeLists( return !suspect; } -struct CheckResult : public spi::OperationComplete { - spi::Bucket _bucket; - const char *_msg; - CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { } - void onComplete(std::unique_ptr result) override { - checkResult(*result, _bucket, _msg); - } - void addResultHandler(const spi::ResultHandler *) override { } -}; - } MessageTracker::UP -- cgit v1.2.3