aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-01 12:48:53 +0100
committerGitHub <noreply@github.com>2021-12-01 12:48:53 +0100
commit84251d52e3e0d03cc9508895474bfc860f2d6ecd (patch)
treef13334e44dd7f5c264d29e92db1ba4a5e231d634
parent7cff9f240f16e5c02d562a691bc81a3dba830218 (diff)
parent645c655f61cfa8eaff3cc1517cfdd4a39bb84bdf (diff)
Merge pull request #20305 from vespa-engine/balder/remove-sync_all_executors
sync_all_executors is now gone.
-rw-r--r--searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp96
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp24
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h3
3 files changed, 63 insertions, 60 deletions
diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
index 1c2d903fead..b1d7ee1d0a8 100644
--- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
@@ -40,12 +40,12 @@ namespace {
class MySummaryAdapter : public test::MockSummaryAdapter {
private:
- int &_rmCount;
- int &_putCount;
- int &_heartbeatCount;
+ std::atomic<int> &_rmCount;
+ std::atomic<int> &_putCount;
+ std::atomic<int> &_heartbeatCount;
public:
- MySummaryAdapter(int &removeCount, int &putCount, int &heartbeatCount) noexcept
+ MySummaryAdapter(std::atomic<int> &removeCount, std::atomic<int> &putCount, std::atomic<int> &heartbeatCount) noexcept
: _rmCount(removeCount),
_putCount(putCount),
_heartbeatCount(heartbeatCount) {
@@ -78,18 +78,18 @@ struct MyMinimalFeedViewBase
struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedView {
using UP = std::unique_ptr<MyMinimalFeedView>;
- int removeMultiAttributesCount;
- int removeMultiIndexFieldsCount;
- int heartBeatAttributesCount;
- int heartBeatIndexedFieldsCount;
- int &outstandingMoveOps;
+ std::atomic<int> removeMultiAttributesCount;
+ std::atomic<int> removeMultiIndexFieldsCount;
+ std::atomic<int> heartBeatAttributesCount;
+ std::atomic<int> heartBeatIndexedFieldsCount;
+ std::atomic<int> &outstandingMoveOps;
MyMinimalFeedView(const ISummaryAdapter::SP &summaryAdapter,
const DocumentMetaStore::SP &metaStore,
searchcorespi::index::IThreadingService &writeService,
const PersistentParams &params,
std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit,
- int &outstandingMoveOps_) :
+ std::atomic<int> &outstandingMoveOps_) :
MyMinimalFeedViewBase(),
StoreOnlyFeedView(StoreOnlyFeedView::Context(summaryAdapter,
search::index::Schema::SP(),
@@ -127,17 +127,18 @@ struct MyMinimalFeedView : public MyMinimalFeedViewBase, public StoreOnlyFeedVie
struct MoveOperationFeedView : public MyMinimalFeedView {
using UP = std::unique_ptr<MoveOperationFeedView>;
- int putAttributesCount;
- int putIndexFieldsCount;
- int removeAttributesCount;
- int removeIndexFieldsCount;
+ std::atomic<int> putAttributesCount;
+ std::atomic<int> putIndexFieldsCount;
+ std::atomic<int> removeAttributesCount;
+ std::atomic<int> removeIndexFieldsCount;
std::vector<IDestructorCallback::SP> onWriteDoneContexts;
+ std::mutex _mutex;
MoveOperationFeedView(const ISummaryAdapter::SP &summaryAdapter,
const DocumentMetaStore::SP &metaStore,
searchcorespi::index::IThreadingService &writeService,
const PersistentParams &params,
std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit,
- int &outstandingMoveOps_) :
+ std::atomic<int> &outstandingMoveOps_) :
MyMinimalFeedView(summaryAdapter, metaStore, writeService,
params, std::move(pendingLidsForCommit), outstandingMoveOps_),
putAttributesCount(0),
@@ -149,30 +150,50 @@ struct MoveOperationFeedView : public MyMinimalFeedView {
void putAttributes(SerialNum, search::DocumentIdT, const document::Document &, OnPutDoneType onWriteDone) override {
++putAttributesCount;
EXPECT_EQUAL(1, outstandingMoveOps);
+ std::lock_guard guard(_mutex);
onWriteDoneContexts.push_back(onWriteDone);
}
void putIndexedFields(SerialNum, search::DocumentIdT, const document::Document::SP &,
OnOperationDoneType onWriteDone) override {
++putIndexFieldsCount;
EXPECT_EQUAL(1, outstandingMoveOps);
+ std::lock_guard guard(_mutex);
onWriteDoneContexts.push_back(onWriteDone);
}
void removeAttributes(SerialNum, search::DocumentIdT, OnRemoveDoneType onWriteDone) override {
++removeAttributesCount;
EXPECT_EQUAL(1, outstandingMoveOps);
+ std::lock_guard guard(_mutex);
onWriteDoneContexts.push_back(onWriteDone);
}
void removeIndexedFields(SerialNum, search::DocumentIdT, OnRemoveDoneType onWriteDone) override {
++removeIndexFieldsCount;
EXPECT_EQUAL(1, outstandingMoveOps);
+ std::lock_guard guard(_mutex);
onWriteDoneContexts.push_back(onWriteDone);
}
- void clearWriteDoneContexts() { onWriteDoneContexts.clear(); }
+ void clearWriteDoneContexts() {
+ std::lock_guard guard(_mutex);
+ onWriteDoneContexts.clear();
+ }
+ void waitFor(uint32_t expected) {
+ while (true) {
+ std::lock_guard guard(_mutex);
+ if (expected == onWriteDoneContexts.size()) {
+ bool ok = true;
+ for (uint32_t i(0); ok && i < expected; i++) {
+ // One for attributes, and one for indexes
+ ok = (onWriteDoneContexts[i].use_count() == 2);
+ }
+ if (ok) return;
+ }
+ }
+ }
};
struct MoveOperationCallback : public IDestructorCallback {
- int &outstandingMoveOps;
- explicit MoveOperationCallback(int &outstandingMoveOps_) noexcept : outstandingMoveOps(outstandingMoveOps_) {
+ std::atomic<int> &outstandingMoveOps;
+ explicit MoveOperationCallback(std::atomic<int> &outstandingMoveOps_) noexcept : outstandingMoveOps(outstandingMoveOps_) {
++outstandingMoveOps;
}
~MoveOperationCallback() override {
@@ -185,10 +206,10 @@ const uint32_t subdb_id = 0;
template <typename FeedViewType>
struct FixtureBase {
- int removeCount;
- int putCount;
- int heartbeatCount;
- int outstandingMoveOps;
+ std::atomic<int> removeCount;
+ std::atomic<int> putCount;
+ std::atomic<int> heartbeatCount;
+ std::atomic<int> outstandingMoveOps;
DocumentMetaStore::SP metaStore;
vespalib::ThreadStackExecutor sharedExecutor;
ExecutorThreadingService writeService;
@@ -241,11 +262,6 @@ struct FixtureBase {
}
template <typename FunctionType>
- void runInMasterAndSyncAll(FunctionType func) {
- test::runInMaster(writeService, func);
- writeService.sync_all_executors();
- }
- template <typename FunctionType>
void runInMasterAndSync(FunctionType func) {
test::runInMasterAndSync(writeService, func);
}
@@ -288,11 +304,25 @@ struct MoveFixture : public FixtureBase<MoveOperationFeedView> {
feedview->clearWriteDoneContexts();
EXPECT_EQUAL(0, outstandingMoveOps);
}
+
+ void handleMove(const MoveOperation & op, long expected) {
+ auto ctx = beginMoveOp();
+ runInMasterAndSync([&, ctx]() {
+ feedview->handleMove(op, std::move(ctx));
+ });
+ // First we wait for everything propagated to MinimalFeedView
+ while (ctx.use_count() > (expected + 1)) {
+ LOG(info, "use_count = %ld", ctx.use_count());
+ std::this_thread::sleep_for(1s);
+ }
+ // And then we must wait for everyone else to finish up too.
+ feedview->waitFor(expected*2);
+ }
};
TEST_F("require that prepareMove sets target db document id", Fixture)
{
- Document::SP doc(new Document);
+ auto doc = std::make_shared<Document>();
MoveOperation op(BucketId(20, 42), Timestamp(10), doc, 1, subdb_id + 1);
f.runInMasterAndSync([&]() { f.feedview->prepareMove(op); });
@@ -323,7 +353,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so
MoveOperation::UP op = makeMoveOp(DbDocumentId(subdb_id + 1, 1), subdb_id);
TEST_DO(f.assertPutCount(0));
f.runInMasterAndSync([&]() { f.feedview->prepareMove(*op); });
- f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); });
+ f.handleMove(*op, 1);
TEST_DO(f.assertPutCount(1));
TEST_DO(f.assertAndClearMoveOp());
lid = op->getDbDocumentId().getLid();
@@ -335,7 +365,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so
MoveOperation::UP op = makeMoveOp(DbDocumentId(subdb_id, 1), subdb_id + 1);
op->setDbDocumentId(DbDocumentId(subdb_id + 1, 1));
TEST_DO(f.assertRemoveCount(0));
- f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); });
+ f.handleMove(*op, 1);
EXPECT_FALSE(f.metaStore->validLid(lid));
TEST_DO(f.assertRemoveCount(1));
TEST_DO(f.assertAndClearMoveOp());
@@ -344,7 +374,7 @@ TEST_F("require that handleMove() adds document to target and removes it from so
TEST_F("require that handleMove() handles move within same subdb and propagates destructor callback", MoveFixture)
{
- Document::SP doc(new Document);
+ auto doc = std::make_shared<Document>();
DocumentId doc1id("id:test:foo:g=foo:1");
uint32_t docSize = 1;
f.runInMasterAndSync([&]() {
@@ -363,7 +393,7 @@ TEST_F("require that handleMove() handles move within same subdb and propagates
op->setTargetLid(1);
TEST_DO(f.assertPutCount(0));
TEST_DO(f.assertRemoveCount(0));
- f.runInMasterAndSyncAll([&]() { f.feedview->handleMove(*op, f.beginMoveOp()); });
+ f.handleMove(*op, 2);
TEST_DO(f.assertPutCount(1));
TEST_DO(f.assertRemoveCount(1));
TEST_DO(f.assertAndClearMoveOp());
@@ -377,7 +407,7 @@ TEST_F("require that prune removed documents removes documents",
{
f.addDocsToMetaStore(3);
- LidVectorContext::SP lids(new LidVectorContext(4));
+ auto lids = std::make_shared<LidVectorContext>(4);
lids->addLid(1);
lids->addLid(3);
PruneRemovedDocumentsOperation op(lids->getDocIdLimit(), subdb_id);
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index bca8e89d69e..1e3fd5ee158 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -111,14 +111,6 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
ExecutorThreadingService::~ExecutorThreadingService() = default;
void
-ExecutorThreadingService::sync_all_executors() {
- // We have multiple patterns where task A posts to B which post back to A
- for (size_t i = 0; i < 2; i++) {
- syncOnce();
- }
-}
-
-void
ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task)
{
uint32_t limit = master_task_limit();
@@ -129,22 +121,6 @@ ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP t
}
void
-ExecutorThreadingService::syncOnce() {
- bool isMasterThread = _masterService.isCurrentThread();
- if (!isMasterThread) {
- _masterExecutor.sync();
- }
- _attribute_field_writer_ptr->sync_all();
- _indexExecutor->sync();
- _summaryExecutor->sync();
- _index_field_inverter_ptr->sync_all();
- _index_field_writer_ptr->sync_all();
- if (!isMasterThread) {
- _masterExecutor.sync();
- }
-}
-
-void
ExecutorThreadingService::shutdown()
{
_masterExecutor.shutdown().sync();
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index e55e95c6745..8040433dbde 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -36,7 +36,6 @@ private:
vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr;
std::vector<Registration> _invokeRegistrations;
- void syncOnce();
public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
/**
@@ -51,8 +50,6 @@ public:
uint32_t stackSize = 128 * 1024);
~ExecutorThreadingService() override;
- void sync_all_executors();
-
void blocking_master_execute(vespalib::Executor::Task::UP task) override;
void shutdown();