summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp26
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp25
2 files changed, 28 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index b3e9d596c76..5da2b65b7ff 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -91,6 +91,11 @@ private:
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
};
+struct Noop : public spi::OperationComplete {
+ void onComplete(std::unique_ptr<spi::Result>) override { }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
+
bool
bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
// Don't check document sizes, as background moving of documents in Proton
@@ -162,21 +167,18 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::
LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
}
spi::Bucket bucket(cmd.getBucket());
- auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable {
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
// TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
(void) ignored;
- if (active) {
- auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) {
- // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
- (void) nestedIgnored;
- tracker->sendReply();
- });
- _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(nestedTask)));
- } else {
- tracker->sendReply();
- }
+ tracker->sendReply();
});
- _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+
+ if (cmd.getActive()) {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<Noop>());
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ } else {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ }
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 2f19b8695a4..7727a591af7 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -662,6 +662,7 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
+
void
findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
@@ -680,6 +681,17 @@ findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHa
}
}
}
+
+struct CheckResult : public spi::OperationComplete {
+ spi::Bucket _bucket;
+ const char *_msg;
+ CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { }
+ void onComplete(std::unique_ptr<spi::Result> result) override {
+ checkResult(*result, _bucket, _msg);
+ }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
+
}
api::StorageReply::SP
@@ -898,7 +910,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
tracker->fail(api::ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<CheckResult>(bucket, "create bucket"));
+
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -1053,16 +1066,6 @@ bool mergeLists(
return !suspect;
}
-struct CheckResult : public spi::OperationComplete {
- spi::Bucket _bucket;
- const char *_msg;
- CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { }
- void onComplete(std::unique_ptr<spi::Result> result) override {
- checkResult(*result, _bucket, _msg);
- }
- void addResultHandler(const spi::ResultHandler *) override { }
-};
-
}
MessageTracker::UP