summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahooinc.com> 2022-05-12 11:02:48 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com> 2022-05-12 13:18:11 +0000
commit 742d5d49f147ff652d78f5b8d270c81008d96283 (patch)
tree 68eac9c736e1b20350de6b76a5a1392a84da56d9
parent 32e26201ca86681150eb47661ae551a1c188c594 (diff)
Don't attempt to actually execute document moves from a cancelled bucket mover
This prevents the following race condition, in which the bucket mover logic fails to notify the content layer that the bucket sub DB status has changed for a particular bucket:

  1. Bucket state is changed over the SPI. A mover is created and registered, and a BucketTask is scheduled onto the persistence queues to actually do the document reads and finalize the move.
  2. Before the bucket task is executed, bucket state is changed again over the SPI. A new mover is created, the old one is cancelled (tagging the mover as not consistent) and another BucketTask is scheduled onto the persistence queues. Note: the old task still remains.
  3. The old bucket task is executed and performs the actual document moving despite being cancelled. No notification is sent to the content layer, since the mover was tagged as not being consistent.
  4. The new bucket task is executed and tries to move the same document set as the old mover. Since the documents are no longer present in the source document DB, the moves fail. This tags the mover as inconsistent and no notification is done. The bucket is automatically rechecked, but since all docs are already moved away there is nothing more to do and no subsequent mover is created.

This means the "should notify?" edge is not triggered and the content layer remains blissfully unaware of any sub DB changes.

This commit simply changes cancellation to actually inhibit document moves from taking place. This lets the preempting mover successfully complete its moves, thus triggering the notify-edge as expected.
-rw-r--r-- persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp 51
-rw-r--r-- persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h 15
-rw-r--r-- searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp 32
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp 10
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp 2
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h 2
6 files changed, 106 insertions, 6 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index 060215c4521..953cfcf733f 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors)
: _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)),
_lock(),
_cond(),
- _inFlight()
+ _inFlight(),
+ _defer_tasks(false),
+ _deferred_tasks()
{
}
@@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() {
void
DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
+ if (!_defer_tasks) {
+ internal_execute_no_defer(bucket, std::move(task));
+ } else {
+ _deferred_tasks.emplace_back(bucket, std::move(task));
+ }
+}
+
+void
+DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) {
auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
{
std::unique_lock guard(_lock);
@@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
}
void
+DummyBucketExecutor::defer_new_tasks() {
+ std::lock_guard guard(_lock);
+ _defer_tasks = true;
+}
+
+void
+DummyBucketExecutor::schedule_all_deferred_tasks() {
+ DeferredTasks to_run;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ _deferred_tasks.swap(to_run);
+ }
+ for (auto& bucket_and_task : to_run) {
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+ }
+}
+
+size_t
+DummyBucketExecutor::num_deferred_tasks() const noexcept {
+ std::lock_guard guard(_lock);
+ return _deferred_tasks.size();
+}
+
+void
+DummyBucketExecutor::schedule_single_deferred_task() {
+ std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ assert(!_deferred_tasks.empty());
+ bucket_and_task = std::move(_deferred_tasks.front());
+ _deferred_tasks.pop_front();
+ }
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+}
+
+void
DummyBucketExecutor::sync() {
_executor->sync();
}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index 86b497437fe..3e1432f7f54 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -4,6 +4,7 @@
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/threadexecutor.h>
+#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_set>
@@ -19,11 +20,21 @@ public:
~DummyBucketExecutor() override;
void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
void sync();
+ void defer_new_tasks();
+ [[nodiscard]] size_t num_deferred_tasks() const noexcept;
+ void schedule_single_deferred_task();
+ void schedule_all_deferred_tasks();
private:
+ void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task);
+
+ using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>;
+
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::mutex _lock;
+ mutable std::mutex _lock;
std::condition_variable _cond;
- std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ bool _defer_tasks;
+ DeferredTasks _deferred_tasks;
};
}
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 9bc374b8386..2db34e45140 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -294,7 +294,7 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
EXPECT_EQ(0, numPending());
EXPECT_EQ(7u, docsMoved().size());
EXPECT_EQ(3u, bucketsModified().size());
- EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+ ASSERT_EQ(_ready.bucket(2), bucketsModified()[0]);
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]);
EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]);
}
@@ -481,6 +481,36 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
}
+TEST_F(ControllerFixture, bucket_change_notification_is_not_lost_with_concurrent_bucket_movers)
+{
+ addReady(_ready.bucket(1));
+ _bmj->recompute(); // Bucket 1 should be (and is) ready, bucket 2 is ready (but should not be).
+ _bucketExecutor.defer_new_tasks(); // Don't execute immediately, we need to force multiple pending moves
+ masterExecute([this]() {
+ deactivateBucket(_ready.bucket(2));
+ _bmj->scanAndMove(4, 3);
+ // New deactivation received from above prior to completion of scan. This can happen since
+ // moves are asynchronous and the distributor can send new (de-)activations before the old move is done.
+ // In our case, we've enforced that another move is already pending in the bucket executor.
+ deactivateBucket(_ready.bucket(2));
+ _bmj->scanAndMove(4, 3);
+ });
+ sync();
+ ASSERT_EQ(_bucketExecutor.num_deferred_tasks(), 2u);
+ _bucketExecutor.schedule_single_deferred_task();
+ sync();
+ // We have to fake that moving a document marks it as not found in the source sub DB.
+ // This doesn't automatically happen when using mocks. The most important part is that
+ // we ensure that moving isn't erroneously tested as if it were idempotent.
+ for (const auto& move : docsMoved()) {
+ failRetrieveForLid(move.getPrevLid());
+ }
+ _bucketExecutor.schedule_single_deferred_task();
+ sync();
+ EXPECT_TRUE(_bmj->done());
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index e010240f5f8..45cac1d02e9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -159,7 +159,7 @@ BucketMoveJob::needMove(const ScanIterator &itr) const {
return noMove;
}
const bool wantReady = (shouldBeReady == Trinary::True) || isActive;
- LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)",
+ LOG(spam, "needMove(): bucket(%s), shouldBeReady(%s), active(%s)",
itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive));
if (wantReady) {
if (!hasNotReadyDocs) {
@@ -241,6 +241,11 @@ BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::Move
void
BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) {
BucketMover & mover = ops.mover();
+ if (mover.cancelled()) {
+ LOG(spam, "completeMove(%s, mover@%p): mover already cancelled, not processing it further",
+ mover.getBucket().toString().c_str(), &mover);
+ return;
+ }
mover.moveDocuments(std::move(ops.success()), std::move(onDone));
ops.failed().clear();
if (checkIfMoverComplete(mover)) {
@@ -280,6 +285,7 @@ void
BucketMoveJob::cancelBucket(BucketId bucket) {
auto inFlight = _bucketsInFlight.find(bucket);
if (inFlight != _bucketsInFlight.end()) {
+ LOG(spam, "cancelBucket(%s): cancelling existing mover %p", bucket.toString().c_str(), inFlight->second.get());
inFlight->second->cancel();
checkIfMoverComplete(*inFlight->second);
}
@@ -329,7 +335,7 @@ std::shared_ptr<BucketMover>
BucketMoveJob::createMover(BucketId bucket, bool wantReady) {
const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready);
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
- LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
+ LOG(debug, "createMover(): BucketMover::create(%s, source:%u, target:%u)",
bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index e7977c7380b..7a1826fe8ff 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -79,6 +79,7 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB
_started(0),
_completed(0),
_needReschedule(false),
+ _cancelled(false),
_allScheduled(false),
_lastGidValid(false),
_lastGid()
@@ -138,6 +139,7 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba
void
BucketMover::cancel() {
+ _cancelled = true;
setAllScheduled();
_needReschedule.store(true, std::memory_order_relaxed);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index b8f1f7c732c..f908d019ca7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -123,6 +123,7 @@ public:
const document::BucketId &getBucket() const { return _bucket; }
void cancel();
+ [[nodiscard]] bool cancelled() const noexcept { return _cancelled; }
void setAllScheduled() { _allScheduled = true; }
/// Signals all documents have been scheduled for move
bool allScheduled() const { return _allScheduled; }
@@ -147,6 +148,7 @@ private:
std::atomic<uint32_t> _started;
std::atomic<uint32_t> _completed;
std::atomic<bool> _needReschedule;
+ bool _cancelled;
bool _allScheduled; // All moves started, or operation has been cancelled
bool _lastGidValid;
document::GlobalId _lastGid;