aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-05-12 16:07:44 +0200
committerGitHub <noreply@github.com>2022-05-12 16:07:44 +0200
commit6f7d407ebcfb430136c6335dd731e79af56965d7 (patch)
tree15d31c684758c573a954e913fd4460fd79d02761
parent2f1beb1a191580fde26e1b855a1fb7426c789e30 (diff)
parent742d5d49f147ff652d78f5b8d270c81008d96283 (diff)
Merge pull request #22571 from vespa-engine/vekterli/improve-handling-of-preempted-bucket-move
Don't attempt to execute document moves from a cancelled bucket mover [run-systemtest]
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp51
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h15
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h2
6 files changed, 106 insertions, 6 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index 060215c4521..953cfcf733f 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors)
: _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)),
_lock(),
_cond(),
- _inFlight()
+ _inFlight(),
+ _defer_tasks(false),
+ _deferred_tasks()
{
}
@@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() {
void
DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
+ if (!_defer_tasks) {
+ internal_execute_no_defer(bucket, std::move(task));
+ } else {
+ _deferred_tasks.emplace_back(bucket, std::move(task));
+ }
+}
+
+void
+DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) {
auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
{
std::unique_lock guard(_lock);
@@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
}
void
+DummyBucketExecutor::defer_new_tasks() {
+ std::lock_guard guard(_lock);
+ _defer_tasks = true;
+}
+
+void
+DummyBucketExecutor::schedule_all_deferred_tasks() {
+ DeferredTasks to_run;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ _deferred_tasks.swap(to_run);
+ }
+ for (auto& bucket_and_task : to_run) {
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+ }
+}
+
+size_t
+DummyBucketExecutor::num_deferred_tasks() const noexcept {
+ std::lock_guard guard(_lock);
+ return _deferred_tasks.size();
+}
+
+void
+DummyBucketExecutor::schedule_single_deferred_task() {
+ std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ assert(!_deferred_tasks.empty());
+ bucket_and_task = std::move(_deferred_tasks.front());
+ _deferred_tasks.pop_front();
+ }
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+}
+
+void
DummyBucketExecutor::sync() {
_executor->sync();
}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index 86b497437fe..3e1432f7f54 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -4,6 +4,7 @@
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/threadexecutor.h>
+#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_set>
@@ -19,11 +20,21 @@ public:
~DummyBucketExecutor() override;
void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
void sync();
+ void defer_new_tasks();
+ [[nodiscard]] size_t num_deferred_tasks() const noexcept;
+ void schedule_single_deferred_task();
+ void schedule_all_deferred_tasks();
private:
+ void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task);
+
+ using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>;
+
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::mutex _lock;
+ mutable std::mutex _lock;
std::condition_variable _cond;
- std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ bool _defer_tasks;
+ DeferredTasks _deferred_tasks;
};
}
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 9bc374b8386..2db34e45140 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -294,7 +294,7 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
EXPECT_EQ(0, numPending());
EXPECT_EQ(7u, docsMoved().size());
EXPECT_EQ(3u, bucketsModified().size());
- EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+ ASSERT_EQ(_ready.bucket(2), bucketsModified()[0]);
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]);
EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]);
}
@@ -481,6 +481,36 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
}
+TEST_F(ControllerFixture, bucket_change_notification_is_not_lost_with_concurrent_bucket_movers)
+{
+ addReady(_ready.bucket(1));
+ _bmj->recompute(); // Bucket 1 should be (and is) ready, bucket 2 is ready (but should not be).
+ _bucketExecutor.defer_new_tasks(); // Don't execute immediately, we need to force multiple pending moves
+ masterExecute([this]() {
+ deactivateBucket(_ready.bucket(2));
+ _bmj->scanAndMove(4, 3);
+ // New deactivation received from above prior to completion of scan. This can happen since
+ // moves are asynchronous and the distributor can send new (de-)activations before the old move is done.
+ // In our case, we've enforced that another move is already pending in the bucket executor.
+ deactivateBucket(_ready.bucket(2));
+ _bmj->scanAndMove(4, 3);
+ });
+ sync();
+ ASSERT_EQ(_bucketExecutor.num_deferred_tasks(), 2u);
+ _bucketExecutor.schedule_single_deferred_task();
+ sync();
+ // We have to fake that moving a document marks it as not found in the source sub DB.
+ // This doesn't automatically happen when using mocks. The most important part is that
+ // we ensure that moving isn't erroneously tested as if it were idempotent.
+ for (const auto& move : docsMoved()) {
+ failRetrieveForLid(move.getPrevLid());
+ }
+ _bucketExecutor.schedule_single_deferred_task();
+ sync();
+ EXPECT_TRUE(_bmj->done());
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index e010240f5f8..45cac1d02e9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -159,7 +159,7 @@ BucketMoveJob::needMove(const ScanIterator &itr) const {
return noMove;
}
const bool wantReady = (shouldBeReady == Trinary::True) || isActive;
- LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)",
+ LOG(spam, "needMove(): bucket(%s), shouldBeReady(%s), active(%s)",
itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive));
if (wantReady) {
if (!hasNotReadyDocs) {
@@ -241,6 +241,11 @@ BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::Move
void
BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) {
BucketMover & mover = ops.mover();
+ if (mover.cancelled()) {
+ LOG(spam, "completeMove(%s, mover@%p): mover already cancelled, not processing it further",
+ mover.getBucket().toString().c_str(), &mover);
+ return;
+ }
mover.moveDocuments(std::move(ops.success()), std::move(onDone));
ops.failed().clear();
if (checkIfMoverComplete(mover)) {
@@ -280,6 +285,7 @@ void
BucketMoveJob::cancelBucket(BucketId bucket) {
auto inFlight = _bucketsInFlight.find(bucket);
if (inFlight != _bucketsInFlight.end()) {
+ LOG(spam, "cancelBucket(%s): cancelling existing mover %p", bucket.toString().c_str(), inFlight->second.get());
inFlight->second->cancel();
checkIfMoverComplete(*inFlight->second);
}
@@ -329,7 +335,7 @@ std::shared_ptr<BucketMover>
BucketMoveJob::createMover(BucketId bucket, bool wantReady) {
const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready);
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
- LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
+ LOG(debug, "createMover(): BucketMover::create(%s, source:%u, target:%u)",
bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index e7977c7380b..7a1826fe8ff 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -79,6 +79,7 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB
_started(0),
_completed(0),
_needReschedule(false),
+ _cancelled(false),
_allScheduled(false),
_lastGidValid(false),
_lastGid()
@@ -138,6 +139,7 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba
void
BucketMover::cancel() {
+ _cancelled = true;
setAllScheduled();
_needReschedule.store(true, std::memory_order_relaxed);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index b8f1f7c732c..f908d019ca7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -123,6 +123,7 @@ public:
const document::BucketId &getBucket() const { return _bucket; }
void cancel();
+ [[nodiscard]] bool cancelled() const noexcept { return _cancelled; }
void setAllScheduled() { _allScheduled = true; }
/// Signals all documents have been scheduled for move
bool allScheduled() const { return _allScheduled; }
@@ -147,6 +148,7 @@ private:
std::atomic<uint32_t> _started;
std::atomic<uint32_t> _completed;
std::atomic<bool> _needReschedule;
+ bool _cancelled;
bool _allScheduled; // All moves started, or operation has been cancelled
bool _lastGidValid;
document::GlobalId _lastGid;