summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-03-05 22:41:14 +0100
committerGitHub <noreply@github.com>2021-03-05 22:41:14 +0100
commitf8adddfb8b1494f5b303279d6bedac2252f42068 (patch)
tree3607136cb7d36f390a962d2d0080fe29c7fb749f /searchcore/src
parent1e0331fad1ca9237f31564d5e716354cf676f70a (diff)
Revert "Revert "If a document was removed or moved between the move started and the d…""
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h15
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp30
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h13
6 files changed, 93 insertions, 17 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
index e955f017d67..f70a4bfad11 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
@@ -64,8 +64,13 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
using DocumentVector = std::vector<Document::SP>;
std::shared_ptr<const DocumentTypeRepo> _repo;
DocumentVector _docs;
+ uint32_t _lid2Fail;
- MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) : _repo(std::move(repo)), _docs() {
+ MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo)
+ : _repo(std::move(repo)),
+ _docs(),
+ _lid2Fail(0)
+ {
_docs.push_back(Document::UP()); // lid 0 invalid
}
@@ -76,9 +81,11 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
DocumentMetaData getDocumentMetaData(const DocumentId &) const override { return DocumentMetaData(); }
Document::UP getFullDocument(DocumentIdT lid) const override {
- return Document::UP(_docs[lid]->clone());
+ return (lid != _lid2Fail) ? Document::UP(_docs[lid]->clone()) : Document::UP();
}
+ void failRetrieveForLid(uint32_t lid) { _lid2Fail = lid; }
+
CachedSelect::SP parseSelect(const vespalib::string &) const override {
return {};
}
@@ -115,6 +122,10 @@ struct MySubDb {
void insertDocs(const UserDocuments &docs_);
+ void failRetrieveForLid(uint32_t lid) {
+ _realRetriever->failRetrieveForLid(lid);
+ }
+
BucketId bucket(uint32_t userId) const {
return _docs.getBucket(userId);
}
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
index 99692ec53bd..d3f4b2fbedd 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -70,6 +70,14 @@ struct ControllerFixtureBase : public ::testing::Test
_bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE);
return *this;
}
+ void failRetrieveForLid(uint32_t lid) {
+ _ready.failRetrieveForLid(lid);
+ _notReady.failRetrieveForLid(lid);
+ }
+ void fixRetriever() {
+ _ready.failRetrieveForLid(0);
+ _notReady.failRetrieveForLid(0);
+ }
const MoveOperationVector &docsMoved() const {
return _moveHandler._moves;
}
@@ -200,6 +208,28 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc
EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
}
+TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error)
+{
+ // bucket 2 should be moved
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ failRetrieveForLid(5);
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_FALSE(_bmj.done());
+ fixRetriever();
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]);
+ assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]);
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
+
TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
{
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
index f49d806b6d4..143d8f290c6 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
@@ -35,6 +35,7 @@ struct DocumentMoverTest : ::testing::Test
test::UserDocumentsBuilder _builder;
std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB;
MyMoveOperationLimiter _limiter;
+ //TODO When we retire the old bucket move job we must rewrite this test to use the BucketMover directly.
DocumentBucketMover _mover;
MySubDbTwoBuckets _source;
bucketdb::BucketDBOwner _bucketDb;
@@ -71,8 +72,10 @@ TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done)
MyMoveOperationLimiter limiter;
DocumentBucketMover mover(limiter, _bucketDb);
EXPECT_TRUE(mover.bucketDone());
+ EXPECT_FALSE(mover.needReschedule());
mover.moveDocuments(2);
EXPECT_TRUE(mover.bucketDone());
+ EXPECT_FALSE(mover.needReschedule());
}
TEST_F(DocumentMoverTest, require_that_we_can_move_all_documents)
@@ -136,4 +139,16 @@ TEST_F(DocumentMoverTest, require_that_we_can_move_documents_in_several_steps)
EXPECT_EQ(5u, _handler._moves.size());
}
+TEST_F(DocumentMoverTest, require_that_cancel_signal_rescheduling_need) {
+ setupForBucket(_source.bucket(1), 6, 9);
+ EXPECT_FALSE(_mover.bucketDone());
+ EXPECT_FALSE(_mover.needReschedule());
+ EXPECT_TRUE(moveDocuments(2));
+ EXPECT_FALSE(_mover.bucketDone());
+ EXPECT_FALSE(_mover.needReschedule());
+ _mover.cancel();
+ EXPECT_TRUE(_mover.bucketDone());
+ EXPECT_TRUE(_mover.needReschedule());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 84b693db4ba..cef73d2975c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -216,7 +216,7 @@ void
BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
if (done) {
- mover->setBucketDone();
+ mover->setAllScheduled();
}
if (keys.empty()) return;
if (_stopped.load(std::memory_order_relaxed)) return;
@@ -247,12 +247,20 @@ BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> op
void
BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
- if (mover->bucketDone() && mover->inSync()) {
+ bool bucketMoveComplete = mover->allScheduled() && mover->inSync();
+ if (bucketMoveComplete || mover->needReschedule()) {
BucketId bucket = mover->getBucket();
- assert(_bucketsInFlight.contains(bucket));
- _modifiedHandler.notifyBucketModified(bucket);
+ assert(mover->needReschedule() || _bucketsInFlight.contains(bucket));
_bucketsInFlight.erase(bucket);
- updatePending();
+ if (mover->needReschedule()) {
+ reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
+ if (!_buckets2Move.contains(bucket)) {
+ // It failed, but all was moved anyway
+ _modifiedHandler.notifyBucketModified(bucket);
+ }
+ } else {
+ _modifiedHandler.notifyBucketModified(bucket);
+ }
if (_postponedUntilSafe.contains(bucket)) {
_postponedUntilSafe.erase(bucket);
reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
@@ -261,6 +269,7 @@ BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
_postponedUntilSafe.erase(RECOMPUTE_TOKEN);
recompute();
}
+ updatePending();
}
}
@@ -348,9 +357,9 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
const auto & mover = _movers[index];
//Move, or reduce movers as we are tailing off
- if (!mover->bucketDone()) {
+ if (!mover->allScheduled()) {
startMove(mover, maxDocsToMove);
- if (mover->bucketDone()) {
+ if (mover->allScheduled()) {
_movers.erase(_movers.begin() + index);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index 662e9e2e920..e33513df3f1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -60,7 +60,8 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB
_targetSubDbId(targetSubDbId),
_started(0),
_completed(0),
- _bucketDone(false),
+ _needReschedule(false),
+ _allScheduled(false),
_lastGidValid(false),
_lastGid()
{ }
@@ -95,6 +96,7 @@ BucketMover::createMoveOperations(std::vector<MoveKey> toMove) {
for (MoveKey &key : toMove) {
auto moveOp = createMoveOperation(key);
if (!moveOp.first) {
+ _needReschedule.store(true, std::memory_order_relaxed);
break;
}
successfulReads.push_back(std::move(moveOp));
@@ -109,6 +111,12 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba
}
}
+void
+BucketMover::cancel() {
+ setAllScheduled();
+ _needReschedule.store(true, std::memory_order_relaxed);
+}
+
}
namespace proton {
@@ -137,7 +145,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove) {
bool
DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
{
- if (_impl->bucketDone()) {
+ if (_impl->allScheduled()) {
return true;
}
auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
@@ -145,7 +153,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &
auto moveOps = _impl->createMoveOperations(std::move(keys));
bool allOk = (numKeys == moveOps.size());
if (done && allOk) {
- _impl->setBucketDone();
+ _impl->setAllScheduled();
}
if (moveOps.empty()) return allOk;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index 807151b0769..05ceb4c17e6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -82,10 +82,11 @@ public:
void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
const document::BucketId &getBucket() const { return _bucket; }
- void cancel() { setBucketDone(); }
- void setBucketDone() { _bucketDone = true; }
+ void cancel();
+ void setAllScheduled() { _allScheduled = true; }
/// Signals all documents have been scheduled for move
- bool bucketDone() const { return _bucketDone; }
+ bool allScheduled() const { return _allScheduled; }
+ bool needReschedule() const { return _needReschedule.load(std::memory_order_relaxed); }
const MaintenanceDocumentSubDB * getSource() const { return _source; }
/// Must be called in master thread
void updateLastValidGid(const document::GlobalId &gid) {
@@ -103,7 +104,8 @@ private:
std::atomic<uint32_t> _started;
std::atomic<uint32_t> _completed;
- bool _bucketDone; // All moves started, or operation has been cancelled
+ std::atomic<bool> _needReschedule;
+ bool _allScheduled; // All moves started, or operation has been cancelled
bool _lastGidValid;
document::GlobalId _lastGid;
GuardedMoveOp createMoveOperation(MoveKey & key);
@@ -139,8 +141,9 @@ public:
const document::BucketId &getBucket() const { return _impl->getBucket(); }
bool moveDocuments(size_t maxDocsToMove);
void cancel() { _impl->cancel(); }
+ bool needReschedule() const { return _impl && _impl->needReschedule(); } // read-only query; false when no mover (_impl) is set
bool bucketDone() const {
- return !_impl || _impl->bucketDone();
+ return !_impl || _impl->allScheduled();
}
const MaintenanceDocumentSubDB * getSource() const { return _impl->getSource(); }
};