diff options
31 files changed, 103 insertions, 103 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 7ffb26cd922..2151556b89d 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -200,7 +200,7 @@ public: } SerialNum test_force_commit(AttributeVector &attr, SerialNum serialNum) { commit(serialNum); - _attributeFieldWriter->sync(); + _attributeFieldWriter->sync_all(); return attr.getStatus().getLastSyncToken(); } }; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index ffe720059c6..27574cb68df 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -253,7 +253,7 @@ public: uint64_t serialNum = _ddb->getFeedHandler().inc_serial_num(); _aw->put(serialNum, doc, lid, std::shared_ptr<IDestructorCallback>()); _aw->forceCommit(serialNum, std::shared_ptr<IDestructorCallback>()); - _ddb->getReadySubDB()->getAttributeManager()->getAttributeFieldWriter().sync(); + _ddb->getReadySubDB()->getAttributeManager()->getAttributeFieldWriter().sync_all(); _sa->put(serialNum, lid, doc); const GlobalId &gid = docId.getGlobalId(); BucketId bucketId(gid.convertToBucketId()); @@ -705,7 +705,7 @@ TEST("requireThatAttributesAreUsed") .add({{"x", "a"}, {"y", "b"}}, 4))); bjTensorAttr->commit(); }); - attributeFieldWriter.sync(); + attributeFieldWriter.sync_all(); DocsumReply::UP rep2 = dc._ddb->getDocsums(req); TEST_DO(assertTensor(make_tensor(TensorSpec("tensor(x{},y{})") diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp index ded0f624474..85c91884294 100644 --- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp +++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp @@ -164,7 +164,7 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { doc = buildDocument(doc_builder, doc_id2, word2); memory_index.insertDocument(doc_id2, *doc.get()); memory_index.commit(std::shared_ptr<vespalib::IDestructorCallback>()); - indexFieldWriter->sync(); + indexFieldWriter->sync_all(); testSearch(memory_index, word1, doc_id1); testSearch(memory_index, word2, doc_id2); diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index 5793e366126..ca10ff01a69 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -182,7 +182,7 @@ void Test::createIndex(const string &dir, uint32_t id, bool fusion) { addDocument(doc_builder, memory_index, *_selector, id, id + 1, "bar"); addDocument(doc_builder, memory_index, *_selector, id, id + 2, "baz"); addDocument(doc_builder, memory_index, *_selector, id, id + 3, "qux"); - _threadingService.indexFieldWriter().sync(); + _threadingService.indexFieldWriter().sync_all(); const uint32_t docIdLimit = std::min(memory_index.getDocIdLimit(), _selector->getDocIdLimit()); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 7021f04d845..be1e2c79f70 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -145,7 +145,7 @@ struct IndexManagerTest : public ::testing::Test { _index_manager->commit(serialNum, emptyDestructorCallback); }); - _writeService.indexFieldWriter().sync(); + _writeService.indexFieldWriter().sync_all(); } void removeDocument(uint32_t docId) { SerialNum serialNum = ++_serial_num; @@ -185,7 +185,7 @@ IndexManagerTest::addDocument(uint32_t id) runAsIndex([&]() { _index_manager->putDocument(id, *doc, serialNum); _index_manager->commit(serialNum, emptyDestructorCallback); }); - _writeService.indexFieldWriter().sync(); + _writeService.indexFieldWriter().sync_all(); return doc; } @@ -406,9 +406,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) Document::UP doc = addDocument(docid); inverter.invertDocument(docid, *doc); - invertThreads->sync(); + invertThreads->sync_all(); inverter.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); - pushThreads->sync(); + pushThreads->sync_all(); index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size; /// Must account for both docid 0 being reserved and the extra after. @@ -425,9 +425,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) inverter.invertDocument(docid + 10, *doc); doc = addDocument(docid + 100); inverter.invertDocument(docid + 100, *doc); - invertThreads->sync(); + invertThreads->sync_all(); inverter.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); - pushThreads->sync(); + pushThreads->sync_all(); index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size; /// Must account for both docid 0 being reserved and the extra after. selector_size = (docid + 100 + 1) * sizeof(Source); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 433d2a954a3..57ab149404d 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -652,7 +652,7 @@ void AttributeWriter::setupAttributeMapping() { AttributeWriter::~AttributeWriter() { - _attributeFieldWriter.sync(); + _attributeFieldWriter.sync_all(); } std::vector<search::AttributeVector *> @@ -806,7 +806,7 @@ AttributeWriter::onReplayDone(uint32_t docIdLimit) [docIdLimit, attr = entry.second.attribute]() { applyReplayDone(docIdLimit, *attr); }); } - _attributeFieldWriter.sync(); + _attributeFieldWriter.sync_all(); } @@ -818,7 +818,7 @@ AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum) [wantedLidLimit, serialNum, attr=entry.second.attribute]() { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); }); } - _attributeFieldWriter.sync(); + _attributeFieldWriter.sync_all(); } bool diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index 1c5fa0892de..4b6e67212e1 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -184,7 +184,7 @@ AttributeManager::transferExistingAttributes(const AttributeManager &currMgr, toBeAdded.push_back(aspec); } } - _attributeFieldWriter.sync(); + _attributeFieldWriter.sync_all(); } void diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index 427cbab2a14..be03b57e088 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -22,7 +22,7 @@ GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor GidToLidChangeListener::~GidToLidChangeListener() { - _attributeFieldWriter.sync(); + _attributeFieldWriter.sync_all(); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index c15d100dcfa..645c9b15f07 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -524,7 +524,7 @@ DocumentDB::performDropFeedView(IFeedView::SP feedView) // Called by executor task, delays when feed view is dropped. // Also called by DocumentDB::receive() method to keep feed view alive - _writeService.attributeFieldWriter().sync(); + _writeService.attributeFieldWriter().sync_all(); _writeService.summary().sync(); // Feed view is kept alive in the closure's shared ptr. @@ -536,8 +536,8 @@ void DocumentDB::performDropFeedView2(IFeedView::SP feedView) { // Called by executor task, delays when feed view is dropped. // Also called by DocumentDB::receive() method to keep feed view alive - _writeService.indexFieldInverter().sync(); - _writeService.indexFieldWriter().sync(); + _writeService.indexFieldInverter().sync_all(); + _writeService.indexFieldWriter().sync_all(); masterExecute([feedView]() { doNothing(feedView); }); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 558ad211e7d..cb4b396d63d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -74,11 +74,11 @@ ExecutorThreadingService::syncOnce() { if (!isMasterThread) { _masterExecutor.sync(); } - _attributeFieldWriter->sync(); + _attributeFieldWriter->sync_all(); _indexExecutor->sync(); _summaryExecutor->sync(); - _indexFieldInverter->sync(); - _indexFieldWriter->sync(); + _indexFieldInverter->sync_all(); + _indexFieldWriter->sync_all(); if (!isMasterThread) { _masterExecutor.sync(); } @@ -89,13 +89,13 @@ ExecutorThreadingService::shutdown() { _masterExecutor.shutdown(); _masterExecutor.sync(); - _attributeFieldWriter->sync(); + _attributeFieldWriter->sync_all(); _summaryExecutor->shutdown(); _summaryExecutor->sync(); _indexExecutor->shutdown(); _indexExecutor->sync(); - _indexFieldInverter->sync(); - _indexFieldWriter->sync(); + _indexFieldInverter->sync_all(); + _indexFieldWriter->sync_all(); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp index 01afcdfe8bc..5bd9ba64bae 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp @@ -314,7 +314,7 @@ FastAccessDocSubDB::onReprocessDone(SerialNum serialNum) IFeedView::SP feedView = _iFeedView.get(); IAttributeWriter::SP attrWriter = static_cast<FastAccessFeedView &>(*feedView).getAttributeWriter(); attrWriter->forceCommit(serialNum, std::shared_ptr<vespalib::IDestructorCallback>()); - _writeService.attributeFieldWriter().sync(); + _writeService.attributeFieldWriter().sync_all(); _writeService.summary().sync(); Parent::onReprocessDone(serialNum); } diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index 747f6200962..b8985a83ef1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -91,7 +91,7 @@ void FastAccessFeedView::sync() { Parent::sync(); - _writeService.attributeFieldWriter().sync(); + _writeService.attributeFieldWriter().sync_all(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index 829725b92e5..5ee3f17e127 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -44,8 +44,8 @@ SearchableFeedView::performSync() { // Called by index write thread, delays when sync() method on it completes. assert(_writeService.index().isCurrentThread()); - _writeService.indexFieldInverter().sync(); - _writeService.indexFieldWriter().sync(); + _writeService.indexFieldInverter().sync_all(); + _writeService.indexFieldWriter().sync_all(); } void diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 16657a373ff..ed661b7499f 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -326,9 +326,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire doc = make_doc10(b); inv.invertDocument(10, *doc); - invertThreads->sync(); + invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync(); + pushThreads->sync_all(); b.startDocument("id:ns:searchdocument::11"). startIndexField("f3"). @@ -336,9 +336,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(11, *doc); - invertThreads->sync(); + invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync(); + pushThreads->sync_all(); b.startDocument("id:ns:searchdocument::12"). startIndexField("f3"). @@ -346,9 +346,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(12, *doc); - invertThreads->sync(); + invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync(); + pushThreads->sync_all(); IndexBuilder ib(schema); vespalib::string dump2dir = prefix + "dump2"; @@ -465,9 +465,9 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic); inv.invertDocument(10, *make_doc10(b)); - invertThreads->sync(); + invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync(); + pushThreads->sync_all(); IndexBuilder ib(_schema); TuneFileIndexing tuneFileIndexing; diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp index a8f1af08158..3bcf75680cc 100644 --- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp +++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp @@ -155,14 +155,14 @@ struct DocumentInverterTest : public ::testing::Test { } void pushDocuments() { - _invertThreads->sync(); + _invertThreads->sync_all(); uint32_t fieldId = 0; for (auto &inverter : _inv.getInverters()) { _inserter.setFieldId(fieldId); inverter->pushDocuments(); ++fieldId; } - _pushThreads->sync(); + _pushThreads->sync_all(); } }; diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index bde78a6229f..7cf40a5be63 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -415,7 +415,7 @@ myremove(uint32_t docId, DocumentInverter &inv, ISequencedTaskExecutor &invertThreads) { inv.removeDocument(docId); - invertThreads.sync(); + invertThreads.sync_all(); inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); } @@ -480,7 +480,7 @@ myCommit(FieldIndexCollection &fieldIndexes, ISequencedTaskExecutor &pushThreads { fieldIndex->commit(); }); ++fieldId; } - pushThreads.sync(); + pushThreads.sync_all(); } void @@ -950,9 +950,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::20"); _b.startIndexField("f0"). @@ -960,9 +960,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(20, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::30"); _b.startIndexField("f0"). @@ -991,9 +991,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(30, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::40"); _b.startIndexField("f0"). @@ -1002,9 +1002,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(40, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::999"); _b.startIndexField("f0"). @@ -1032,12 +1032,12 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) doc = _b.endDocument(); for (uint32_t docId = 10000; docId < 20000; ++docId) { _inv.invertDocument(docId, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); } - _pushThreads->sync(); + _pushThreads->sync_all(); DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic); LOG(info, "Before feature compaction: allocElems=%zu, usedElems=%zu" @@ -1149,17 +1149,17 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo _b.startIndexField("f1").addStr("a").addStr("c").endField(); Document::UP doc1 = _b.endDocument(); _inv.invertDocument(1, *doc1.get()); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::2"); _b.startIndexField("f0").addStr("b").addStr("c").endField(); Document::UP doc2 = _b.endDocument(); _inv.invertDocument(2, *doc2.get()); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); EXPECT_TRUE(assertPostingList("[1]", find("a", 0))); EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0))); @@ -1168,7 +1168,7 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo EXPECT_TRUE(assertPostingList("[1]", find("c", 1))); myremove(1, _inv, *_invertThreads); - _pushThreads->sync(); + _pushThreads->sync_all(); EXPECT_TRUE(assertPostingList("[]", find("a", 0))); EXPECT_TRUE(assertPostingList("[2]", find("b", 0))); @@ -1318,10 +1318,10 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); SimpleMatchData match_data; { @@ -1394,10 +1394,10 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads->sync(); + _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync(); + _pushThreads->sync_all(); SimpleMatchData match_data; uint32_t fieldId = _schema.getIndexFieldId("f0"); @@ -1472,7 +1472,7 @@ struct RemoverTest : public FieldIndexCollectionTest { void remove(uint32_t docId) { DocumentInverter inv(schema, *_invertThreads, *_pushThreads, fic); myremove(docId, inv, *_invertThreads); - _pushThreads->sync(); + _pushThreads->sync_all(); EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover(). getStore().get(docId).valid()); } diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp index 525bc4306ec..4f03a5cb95f 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp @@ -77,8 +77,8 @@ DocumentInverter::DocumentInverter(const Schema &schema, DocumentInverter::~DocumentInverter() { - _invertThreads.sync(); - _pushThreads.sync(); + _invertThreads.sync_all(); + _pushThreads.sync_all(); } void diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp index 1574c94e164..177d8e612bd 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp @@ -73,8 +73,8 @@ MemoryIndex::MemoryIndex(const Schema& schema, MemoryIndex::~MemoryIndex() { - _invertThreads.sync(); - _pushThreads.sync(); + _invertThreads.sync_all(); + _pushThreads.sync_all(); } void @@ -109,8 +109,8 @@ MemoryIndex::removeDocument(uint32_t docId) void MemoryIndex::commit(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone) { - _invertThreads.sync(); // drain inverting into this inverter - _pushThreads.sync(); // drain use of other inverter + _invertThreads.sync_all(); // drain inverting into this inverter + _pushThreads.sync_all(); // drain use of other inverter _inverter->pushDocuments(onWriteDone); flipInverter(); } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp index 5c188c3204c..5fc6d2a69ae 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -72,7 +72,7 @@ TEST_F("testExecute", Fixture) { tv->wait(1); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -87,7 +87,7 @@ TEST_F("require that task with same component id are serialized", Fixture) tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -106,7 +106,7 @@ TEST_F("require that task with different component ids are not serialized", Fixt } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -125,7 +125,7 @@ TEST_F("require that task with same string component id are serialized", Fixture tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -146,7 +146,7 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -196,7 +196,7 @@ TEST_F("require that execute works with const lambda", Fixture) { res.push_back(i--); res.push_back(i--); }; f._threads.execute(0, lambda); f._threads.execute(0, lambda); - f._threads.sync(); + f._threads.sync_all(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -211,7 +211,7 @@ TEST_F("require that execute works with reference to lambda", Fixture) auto &lambdaref = lambda; f._threads.execute(0, lambdaref); f._threads.execute(0, lambdaref); - f._threads.sync(); + f._threads.sync_all(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -224,7 +224,7 @@ TEST_F("require that executeLambda works", Fixture) const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads.sync(); + f._threads.sync_all(); std::vector<int> exp({5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp index ae01452c46b..56fb570209c 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp @@ -72,7 +72,7 @@ TEST_F("testExecute", Fixture) { tv->wait(1); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -87,7 +87,7 @@ TEST_F("require that task with same id are serialized", Fixture) tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -106,7 +106,7 @@ TEST_F("require that task with different ids are serialized", Fixture) } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads.sync_all(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 705c7174c4a..7c90f96afdd 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -76,7 +76,7 @@ TEST_F("testExecute", Fixture) { tv->wait(1); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); + f._threads->sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -91,7 +91,7 @@ TEST_F("require that task with same component id are serialized", Fixture) tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); + f._threads->sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -110,7 +110,7 @@ TEST_F("require that task with different component ids are not serialized", Fixt } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads->sync(); + f._threads->sync_all(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -129,7 +129,7 @@ TEST_F("require that task with same string component id are serialized", Fixture tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); + f._threads->sync_all(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -150,7 +150,7 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads->sync(); + f._threads->sync_all(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -200,7 +200,7 @@ TEST_F("require that execute works with const lambda", Fixture) { res.push_back(i--); res.push_back(i--); }; f._threads->execute(0, lambda); f._threads->execute(0, lambda); - f._threads->sync(); + f._threads->sync_all(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -215,7 +215,7 @@ TEST_F("require that execute works with reference to lambda", Fixture) auto &lambdaref = lambda; f._threads->execute(0, lambdaref); f._threads->execute(0, lambdaref); - f._threads->sync(); + f._threads->sync_all(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -228,7 +228,7 @@ TEST_F("require that executeLambda works", Fixture) const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads->sync(); + f._threads->sync_all(); std::vector<int> exp({5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 7ffa396304c..4d08e14375c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -246,7 +246,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() { - sync(); + sync_all(); { auto guard = std::unique_lock(_mutex); assert(_self.state == Self::State::OPEN); @@ -305,7 +305,7 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) } void -AdaptiveSequencedExecutor::sync() +AdaptiveSequencedExecutor::sync_all() { BarrierCompletion barrierCompletion; { diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index 185345ebc9c..ccf6ab977f3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -147,7 +147,7 @@ public: ~AdaptiveSequencedExecutor() override; ExecutorId getExecutorId(uint64_t component) const override; void executeTask(ExecutorId id, Task::UP task) override; - void sync() override; + void sync_all() override; void setTaskLimit(uint32_t task_limit) override; ExecutorStats getStats() override; Config get_config() const; diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index 703256c5521..ce5237f41c9 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -27,7 +27,7 @@ ForegroundTaskExecutor::executeTask(ExecutorId id, Executor::Task::UP task) } void -ForegroundTaskExecutor::sync() +ForegroundTaskExecutor::sync_all() { } diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h index 9d351aca653..615bf62afe5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h @@ -23,7 +23,7 @@ public: ExecutorId getExecutorId(uint64_t componentId) const override; void executeTask(ExecutorId id, Executor::Task::UP task) override; - void sync() override; + void sync_all() override; void setTaskLimit(uint32_t taskLimit) override; ExecutorStats getStats() override; private: diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index a6aca98a65d..8268363d335 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -58,7 +58,7 @@ public: /** * Wrap lambda function into a task and schedule it to be run. * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. + * call sync_all before tearing down pointed to/referenced data. * * @param id which internal executor to use * @param function function to be wrapped in a task and later executed @@ -70,7 +70,7 @@ public: /** * Wait for all scheduled tasks to complete. */ - virtual void sync() = 0; + virtual void sync_all() = 0; virtual void setTaskLimit(uint32_t taskLimit) = 0; @@ -79,7 +79,7 @@ public: /** * Wrap lambda function into a task and schedule it to be run. * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. + * call sync_all before tearing down pointed to/referenced data. * * @param componentId component id * @param function function to be wrapped in a task and later executed @@ -93,7 +93,7 @@ public: /** * Wrap lambda function into a task and schedule it to be run. * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. + * call sync_all before tearing down pointed to/referenced data. * * @param id executor id * @param function function to be wrapped in a task and later executed diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 9e95bdaa3ab..038b9201724 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -51,7 +51,7 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre SequencedTaskExecutor::~SequencedTaskExecutor() { - sync(); + sync_all(); } SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) @@ -82,7 +82,7 @@ SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP t } void -SequencedTaskExecutor::sync() { +SequencedTaskExecutor::sync_all() { wakeup(); for (auto &executor : *_executors) { executor->sync(); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 7b49f7aac75..245d6d29780 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -24,7 +24,7 @@ public: void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; ExecutorId getExecutorId(uint64_t componentId) const override; - void sync() override; + void sync_all() override; ExecutorStats getStats() override; void wakeup() override; diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp index 4f4a2e6ab6d..5ae8e96b606 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -28,10 +28,10 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta } void -SequencedTaskExecutorObserver::sync() +SequencedTaskExecutorObserver::sync_all() { ++_syncCnt; - _executor.sync(); + _executor.sync_all(); } std::vector<uint32_t> diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h index 71717c051e7..7e2bf968952 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -25,7 +25,7 @@ public: ExecutorId getExecutorId(uint64_t componentId) const override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; - void sync() override; + void sync_all() override; void setTaskLimit(uint32_t taskLimit) override; vespalib::ExecutorStats getStats() override; diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index de238b9eeb4..d58ab776785 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -115,7 +115,7 @@ public: void TearDown() override { if (_sequenceTaskExecutor) { - _sequenceTaskExecutor->sync(); + _sequenceTaskExecutor->sync_all(); _sequenceTaskExecutor.reset(); } _env.reset(); |