diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-01 12:48:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-01 12:48:53 +0100 |
commit | 84251d52e3e0d03cc9508895474bfc860f2d6ecd (patch) | |
tree | f13334e44dd7f5c264d29e92db1ba4a5e231d634 | |
parent | 7cff9f240f16e5c02d562a691bc81a3dba830218 (diff) | |
parent | 645c655f61cfa8eaff3cc1517cfdd4a39bb84bdf (diff) |
Merge pull request #20305 from vespa-engine/balder/remove-sync_all_executors
sync_all_executors is now gone.
3 files changed, 63 insertions, 60 deletions
diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp index 1c2d903fead..b1d7ee1d0a8 100644 --- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp @@ -40,12 +40,12 @@ namespace { class MySummaryAdapter : public test::MockSummaryAdapter { private: - int &_rmCount; - int &_putCount; - int &_heartbeatCount; + std::atomic<int> &_rmCount; + std::atomic<int> &_putCount; + std::atomic<int> &_heartbeatCount; public: - MySummaryAdapter(int &removeCount, int &putCount, int &heartbeatCount) noexcept + MySummaryAdapter(std::atomic<int> &removeCount, std::atomic<int> &putCount, std::atomic<int> &heartbeatCount) noexcept : _rmCount(removeCount), _putCount(putCount), _heartbeatCount(heartbeatCount) { @@ -78,18 +78,18 @@ struct MyMinimalFeedViewBase struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedView { using UP = std::unique_ptr<MyMinimalFeedView>; - int removeMultiAttributesCount; - int removeMultiIndexFieldsCount; - int heartBeatAttributesCount; - int heartBeatIndexedFieldsCount; - int &outstandingMoveOps; + std::atomic<int> removeMultiAttributesCount; + std::atomic<int> removeMultiIndexFieldsCount; + std::atomic<int> heartBeatAttributesCount; + std::atomic<int> heartBeatIndexedFieldsCount; + std::atomic<int> &outstandingMoveOps; MyMinimalFeedView(const ISummaryAdapter::SP &summaryAdapter, const DocumentMetaStore::SP &metaStore, searchcorespi::index::IThreadingService &writeService, const PersistentParams ¶ms, std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit, - int &outstandingMoveOps_) : + std::atomic<int> &outstandingMoveOps_) : MyMinimalFeedViewBase(), StoreOnlyFeedView(StoreOnlyFeedView::Context(summaryAdapter, search::index::Schema::SP(), @@ -127,17 +127,18 @@ struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedVie struct MoveOperationFeedView : public MyMinimalFeedView { using UP = std::unique_ptr<MoveOperationFeedView>; - int putAttributesCount; - int putIndexFieldsCount; - int removeAttributesCount; - int removeIndexFieldsCount; + std::atomic<int> putAttributesCount; + std::atomic<int> putIndexFieldsCount; + std::atomic<int> removeAttributesCount; + std::atomic<int> removeIndexFieldsCount; std::vector<IDestructorCallback::SP> onWriteDoneContexts; + std::mutex _mutex; MoveOperationFeedView(const ISummaryAdapter::SP &summaryAdapter, const DocumentMetaStore::SP &metaStore, searchcorespi::index::IThreadingService &writeService, const PersistentParams ¶ms, std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit, - int &outstandingMoveOps_) : + std::atomic<int> &outstandingMoveOps_) : MyMinimalFeedView(summaryAdapter, metaStore, writeService, params, std::move(pendingLidsForCommit), outstandingMoveOps_), putAttributesCount(0), @@ -149,30 +150,50 @@ struct MoveOperationFeedView : public MyMinimalFeedView { void putAttributes(SerialNum, search::DocumentIdT, const document::Document &, OnPutDoneType onWriteDone) override { ++putAttributesCount; EXPECT_EQUAL(1, outstandingMoveOps); + std::lock_guard guard(_mutex); onWriteDoneContexts.push_back(onWriteDone); } void putIndexedFields(SerialNum, search::DocumentIdT, const document::Document::SP &, OnOperationDoneType onWriteDone) override { ++putIndexFieldsCount; EXPECT_EQUAL(1, outstandingMoveOps); + std::lock_guard guard(_mutex); onWriteDoneContexts.push_back(onWriteDone); } void removeAttributes(SerialNum, search::DocumentIdT, OnRemoveDoneType onWriteDone) override { ++removeAttributesCount; EXPECT_EQUAL(1, outstandingMoveOps); + std::lock_guard guard(_mutex); onWriteDoneContexts.push_back(onWriteDone); } void removeIndexedFields(SerialNum, search::DocumentIdT, OnRemoveDoneType onWriteDone) override { ++removeIndexFieldsCount; EXPECT_EQUAL(1, outstandingMoveOps); + std::lock_guard guard(_mutex); onWriteDoneContexts.push_back(onWriteDone); } - void clearWriteDoneContexts() { onWriteDoneContexts.clear(); } + void clearWriteDoneContexts() { + std::lock_guard guard(_mutex); + onWriteDoneContexts.clear(); + } + void waitFor(uint32_t expected) { + while (true) { + std::lock_guard guard(_mutex); + if (expected == onWriteDoneContexts.size()) { + bool ok = true; + for (uint32_t i(0); ok && i < expected; i++) { + // One for attributes, and one for indexes + ok = (onWriteDoneContexts[i].use_count() == 2); + } + if (ok) return; + } + } + } }; struct MoveOperationCallback : public IDestructorCallback { - int &outstandingMoveOps; - explicit MoveOperationCallback(int &outstandingMoveOps_) noexcept : outstandingMoveOps(outstandingMoveOps_) { + std::atomic<int> &outstandingMoveOps; + explicit MoveOperationCallback(std::atomic<int> &outstandingMoveOps_) noexcept : outstandingMoveOps(outstandingMoveOps_) { ++outstandingMoveOps; } ~MoveOperationCallback() override { @@ -185,10 +206,10 @@ const uint32_t subdb_id = 0; template <typename FeedViewType> struct FixtureBase { - int removeCount; - int putCount; - int heartbeatCount; - int outstandingMoveOps; + std::atomic<int> removeCount; + std::atomic<int> putCount; + std::atomic<int> heartbeatCount; + std::atomic<int> outstandingMoveOps; DocumentMetaStore::SP metaStore; vespalib::ThreadStackExecutor sharedExecutor; ExecutorThreadingService writeService; @@ -241,11 +262,6 @@ struct FixtureBase { } template <typename FunctionType> - void runInMasterAndSyncAll(FunctionType func) { - test::runInMaster(writeService, func); - writeService.sync_all_executors(); - } - template <typename FunctionType> void runInMasterAndSync(FunctionType func) { test::runInMasterAndSync(writeService, func); } @@ -288,11 +304,25 @@ struct MoveFixture : public FixtureBase<MoveOperationFeedView> { feedview->clearWriteDoneContexts(); EXPECT_EQUAL(0, outstandingMoveOps); } + + void handleMove(const MoveOperation & op, long expected) { + auto ctx = beginMoveOp(); + runInMasterAndSync([&, ctx]() { + feedview->handleMove(op, std::move(ctx)); + }); + // First we wait for everything propagated to MinimalFeedView + while (ctx.use_count() > (expected + 1)) { + LOG(info, "use_count = %ld", ctx.use_count()); + std::this_thread::sleep_for(1s); + } + // And then we must wait for everyone else to finish up too. + feedview->waitFor(expected*2); + } }; TEST_F("require that prepareMove sets target db document id", Fixture) { - Document::SP doc(new Document); + auto doc = std::make_shared<Document>(); MoveOperation op(BucketId(20, 42), Timestamp(10), doc, 1, subdb_id + 1); f.runInMasterAndSync([&]() { f.feedview->prepareMove(op); }); @@ -323,7 +353,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so MoveOperation::UP op = makeMoveOp(DbDocumentId(subdb_id + 1, 1), subdb_id); TEST_DO(f.assertPutCount(0)); f.runInMasterAndSync([&]() { f.feedview->prepareMove(*op); }); - f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); }); + f.handleMove(*op, 1); TEST_DO(f.assertPutCount(1)); TEST_DO(f.assertAndClearMoveOp()); lid = op->getDbDocumentId().getLid(); @@ -335,7 +365,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so MoveOperation::UP op = makeMoveOp(DbDocumentId(subdb_id, 1), subdb_id + 1); op->setDbDocumentId(DbDocumentId(subdb_id + 1, 1)); TEST_DO(f.assertRemoveCount(0)); - f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); }); + f.handleMove(*op, 1); EXPECT_FALSE(f.metaStore->validLid(lid)); TEST_DO(f.assertRemoveCount(1)); TEST_DO(f.assertAndClearMoveOp()); @@ -344,7 +374,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so TEST_F("require that handleMove() handles move within same subdb and propagates destructor callback", MoveFixture) { - Document::SP doc(new Document); + auto doc = std::make_shared<Document>(); DocumentId doc1id("id:test:foo:g=foo:1"); uint32_t docSize = 1; f.runInMasterAndSync([&]() { @@ -363,7 +393,7 @@ TEST_F("require that handleMove() handles move within same subdb and propagates op->setTargetLid(1); TEST_DO(f.assertPutCount(0)); TEST_DO(f.assertRemoveCount(0)); - f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); }); + f.handleMove(*op, 2); TEST_DO(f.assertPutCount(1)); TEST_DO(f.assertRemoveCount(1)); TEST_DO(f.assertAndClearMoveOp()); @@ -377,7 +407,7 @@ TEST_F("require that prune removed documents removes documents", { f.addDocsToMetaStore(3); - LidVectorContext::SP lids(new LidVectorContext(4)); + auto lids = std::make_shared<LidVectorContext>(4); lids->addLid(1); lids->addLid(3); PruneRemovedDocumentsOperation op(lids->getDocIdLimit(), subdb_id); diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index bca8e89d69e..1e3fd5ee158 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -111,14 +111,6 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha ExecutorThreadingService::~ExecutorThreadingService() = default; void -ExecutorThreadingService::sync_all_executors() { - // We have multiple patterns where task A posts to B which post back to A - for (size_t i = 0; i < 2; i++) { - syncOnce(); - } -} - -void ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task) { uint32_t limit = master_task_limit(); @@ -129,22 +121,6 @@ ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP t } void -ExecutorThreadingService::syncOnce() { - bool isMasterThread = _masterService.isCurrentThread(); - if (!isMasterThread) { - _masterExecutor.sync(); - } - _attribute_field_writer_ptr->sync_all(); - _indexExecutor->sync(); - _summaryExecutor->sync(); - _index_field_inverter_ptr->sync_all(); - _index_field_writer_ptr->sync_all(); - if (!isMasterThread) { - _masterExecutor.sync(); - } -} - -void ExecutorThreadingService::shutdown() { _masterExecutor.shutdown().sync(); diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index e55e95c6745..8040433dbde 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -36,7 +36,6 @@ private: vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr; std::vector<Registration> _invokeRegistrations; - void syncOnce(); public: using OptimizeFor = vespalib::Executor::OptimizeFor; /** @@ -51,8 +50,6 @@ public: uint32_t stackSize = 128 * 1024); ~ExecutorThreadingService() override; - void sync_all_executors(); - void blocking_master_execute(vespalib::Executor::Task::UP task) override; void shutdown(); |