summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-03-05 13:34:05 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-03-05 13:34:05 +0000
commitda7b6f74bc8149b02b777710ab24520196840e29 (patch)
tree256b411cf58151b65ee8dd9872a3c5dedb1bc577 /searchcore
parent1eeeb4e17bf1ea5ce2e3f2cb74ee9474dd4a396e (diff)
If a document was removed or moved between th emove started and the document was retrieved
there would be a racewith state change or othe bucket changes requiring the bucket to be reconsidered. The mover would appear in sync prior to completing the move in the master thread. This should now be accounted by accounting the failed state in the mover.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h15
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp30
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h13
6 files changed, 93 insertions, 17 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
index e955f017d67..f70a4bfad11 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
@@ -64,8 +64,13 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
using DocumentVector = std::vector<Document::SP>;
std::shared_ptr<const DocumentTypeRepo> _repo;
DocumentVector _docs;
+ uint32_t _lid2Fail;
- MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) : _repo(std::move(repo)), _docs() {
+ MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo)
+ : _repo(std::move(repo)),
+ _docs(),
+ _lid2Fail(0)
+ {
_docs.push_back(Document::UP()); // lid 0 invalid
}
@@ -76,9 +81,11 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
DocumentMetaData getDocumentMetaData(const DocumentId &) const override { return DocumentMetaData(); }
Document::UP getFullDocument(DocumentIdT lid) const override {
- return Document::UP(_docs[lid]->clone());
+ return (lid != _lid2Fail) ? Document::UP(_docs[lid]->clone()) : Document::UP();
}
+ void failRetrieveForLid(uint32_t lid) { _lid2Fail = lid; }
+
CachedSelect::SP parseSelect(const vespalib::string &) const override {
return {};
}
@@ -115,6 +122,10 @@ struct MySubDb {
void insertDocs(const UserDocuments &docs_);
+ void failRetrieveForLid(uint32_t lid) {
+ _realRetriever->failRetrieveForLid(lid);
+ }
+
BucketId bucket(uint32_t userId) const {
return _docs.getBucket(userId);
}
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
index 99692ec53bd..d3f4b2fbedd 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -70,6 +70,14 @@ struct ControllerFixtureBase : public ::testing::Test
_bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE);
return *this;
}
+ void failRetrieveForLid(uint32_t lid) {
+ _ready.failRetrieveForLid(lid);
+ _notReady.failRetrieveForLid(lid);
+ }
+ void fixRetriever() {
+ _ready.failRetrieveForLid(0);
+ _notReady.failRetrieveForLid(0);
+ }
const MoveOperationVector &docsMoved() const {
return _moveHandler._moves;
}
@@ -200,6 +208,28 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc
EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
}
+TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error)
+{
+ // bucket 2 should be moved
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ failRetrieveForLid(5);
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_FALSE(_bmj.done());
+ fixRetriever();
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]);
+ assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]);
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
+
TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
{
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
index f49d806b6d4..143d8f290c6 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
@@ -35,6 +35,7 @@ struct DocumentMoverTest : ::testing::Test
test::UserDocumentsBuilder _builder;
std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB;
MyMoveOperationLimiter _limiter;
+ //TODO When we retire old bucket move job me must make rewrite this test to use the BucketMover directly.
DocumentBucketMover _mover;
MySubDbTwoBuckets _source;
bucketdb::BucketDBOwner _bucketDb;
@@ -71,8 +72,10 @@ TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done)
MyMoveOperationLimiter limiter;
DocumentBucketMover mover(limiter, _bucketDb);
EXPECT_TRUE(mover.bucketDone());
+ EXPECT_FALSE(mover.needReschedule());
mover.moveDocuments(2);
EXPECT_TRUE(mover.bucketDone());
+ EXPECT_FALSE(mover.needReschedule());
}
TEST_F(DocumentMoverTest, require_that_we_can_move_all_documents)
@@ -136,4 +139,16 @@ TEST_F(DocumentMoverTest, require_that_we_can_move_documents_in_several_steps)
EXPECT_EQ(5u, _handler._moves.size());
}
+TEST_F(DocumentMoverTest, require_that_cancel_signal_rescheduling_need) {
+ setupForBucket(_source.bucket(1), 6, 9);
+ EXPECT_FALSE(_mover.bucketDone());
+ EXPECT_FALSE(_mover.needReschedule());
+ EXPECT_TRUE(moveDocuments(2));
+ EXPECT_FALSE(_mover.bucketDone());
+ EXPECT_FALSE(_mover.needReschedule());
+ _mover.cancel();
+ EXPECT_TRUE(_mover.bucketDone());
+ EXPECT_TRUE(_mover.needReschedule());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 84b693db4ba..cef73d2975c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -216,7 +216,7 @@ void
BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
if (done) {
- mover->setBucketDone();
+ mover->setAllScheduled();
}
if (keys.empty()) return;
if (_stopped.load(std::memory_order_relaxed)) return;
@@ -247,12 +247,20 @@ BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> op
void
BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
- if (mover->bucketDone() && mover->inSync()) {
+ bool bucketMoveComplete = mover->allScheduled() && mover->inSync();
+ if (bucketMoveComplete || mover->needReschedule()) {
BucketId bucket = mover->getBucket();
- assert(_bucketsInFlight.contains(bucket));
- _modifiedHandler.notifyBucketModified(bucket);
+ assert(mover->needReschedule() || _bucketsInFlight.contains(bucket));
_bucketsInFlight.erase(bucket);
- updatePending();
+ if (mover->needReschedule()) {
+ reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
+ if (!_buckets2Move.contains(bucket)) {
+ // It failed, but all was moved anyway
+ _modifiedHandler.notifyBucketModified(bucket);
+ }
+ } else {
+ _modifiedHandler.notifyBucketModified(bucket);
+ }
if (_postponedUntilSafe.contains(bucket)) {
_postponedUntilSafe.erase(bucket);
reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
@@ -261,6 +269,7 @@ BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
_postponedUntilSafe.erase(RECOMPUTE_TOKEN);
recompute();
}
+ updatePending();
}
}
@@ -348,9 +357,9 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
const auto & mover = _movers[index];
//Move, or reduce movers as we are tailing off
- if (!mover->bucketDone()) {
+ if (!mover->allScheduled()) {
startMove(mover, maxDocsToMove);
- if (mover->bucketDone()) {
+ if (mover->allScheduled()) {
_movers.erase(_movers.begin() + index);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index 662e9e2e920..e33513df3f1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -60,7 +60,8 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB
_targetSubDbId(targetSubDbId),
_started(0),
_completed(0),
- _bucketDone(false),
+ _needReschedule(false),
+ _allScheduled(false),
_lastGidValid(false),
_lastGid()
{ }
@@ -95,6 +96,7 @@ BucketMover::createMoveOperations(std::vector<MoveKey> toMove) {
for (MoveKey &key : toMove) {
auto moveOp = createMoveOperation(key);
if (!moveOp.first) {
+ _needReschedule.store(true, std::memory_order_relaxed);
break;
}
successfulReads.push_back(std::move(moveOp));
@@ -109,6 +111,12 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba
}
}
+void
+BucketMover::cancel() {
+ setAllScheduled();
+ _needReschedule.store(true, std::memory_order_relaxed);
+}
+
}
namespace proton {
@@ -137,7 +145,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove) {
bool
DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
{
- if (_impl->bucketDone()) {
+ if (_impl->allScheduled()) {
return true;
}
auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
@@ -145,7 +153,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &
auto moveOps = _impl->createMoveOperations(std::move(keys));
bool allOk = (numKeys == moveOps.size());
if (done && allOk) {
- _impl->setBucketDone();
+ _impl->setAllScheduled();
}
if (moveOps.empty()) return allOk;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index 807151b0769..05ceb4c17e6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -82,10 +82,11 @@ public:
void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
const document::BucketId &getBucket() const { return _bucket; }
- void cancel() { setBucketDone(); }
- void setBucketDone() { _bucketDone = true; }
+ void cancel();
+ void setAllScheduled() { _allScheduled = true; }
/// Signals all documents have been scheduled for move
- bool bucketDone() const { return _bucketDone; }
+ bool allScheduled() const { return _allScheduled; }
+ bool needReschedule() const { return _needReschedule.load(std::memory_order_relaxed); }
const MaintenanceDocumentSubDB * getSource() const { return _source; }
/// Must be called in master thread
void updateLastValidGid(const document::GlobalId &gid) {
@@ -103,7 +104,8 @@ private:
std::atomic<uint32_t> _started;
std::atomic<uint32_t> _completed;
- bool _bucketDone; // All moves started, or operation has been cancelled
+ std::atomic<bool> _needReschedule;
+ bool _allScheduled; // All moves started, or operation has been cancelled
bool _lastGidValid;
document::GlobalId _lastGid;
GuardedMoveOp createMoveOperation(MoveKey & key);
@@ -139,8 +141,9 @@ public:
const document::BucketId &getBucket() const { return _impl->getBucket(); }
bool moveDocuments(size_t maxDocsToMove);
void cancel() { _impl->cancel(); }
+ bool needReschedule() { return _impl && _impl->needReschedule(); }
bool bucketDone() const {
- return !_impl || _impl->bucketDone();
+ return !_impl || _impl->allScheduled();
}
const MaintenanceDocumentSubDB * getSource() const { return _impl->getSource(); }
};