summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-26 14:53:37 +0100
committerGitHub <noreply@github.com>2021-11-26 14:53:37 +0100
commit6d296cbc0320e5eac9c8a1e31819f6753ac7fce8 (patch)
tree65d107a4f03ec1b35fd6fafeb780b2290ef70c2f
parente7fc5ae35312770c01784b1ea1b7a2d6cf3e70bc (diff)
parent13641d108f2022d3885c5aa3e0ce19a8d764e085 (diff)
Merge pull request #20243 from vespa-engine/balder/reduce-use-of-syncable
Remove the need for Syncable
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp29
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.h23
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp14
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h3
13 files changed, 57 insertions, 44 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 15b5bf81670..ed2ce3d638e 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -425,8 +425,7 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown();
- _executor->sync();
+ _executor->shutdown().sync();
}
void
diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
index e3e6ac6321e..b80cd08ae8e 100644
--- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
@@ -234,8 +234,7 @@ MySearchableContext::MySearchableContext(IThreadingService &writeService,
IBucketDBHandlerInitializer & bucketDBHandlerInitializer)
: _fastUpdCtx(writeService, bucketDB, bucketDBHandlerInitializer),
_queryLimiter(), _clock(),
- _ctx(_fastUpdCtx._ctx, _queryLimiter,
- _clock, dynamic_cast<vespalib::SyncableThreadExecutor &>(writeService.shared()))
+ _ctx(_fastUpdCtx._ctx, _queryLimiter, _clock, writeService.shared())
{}
MySearchableContext::~MySearchableContext() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 4eaa722e0ba..632f4482654 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -119,7 +119,7 @@ public:
/**
* Returns the underlying executor. Only used for state explorers.
*/
- const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; }
+ const vespalib::ThreadExecutor& get_executor() const { return _executor; }
/**
* Starts the scheduling thread of this manager.
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
index 7cc0c97048b..3d3be775a4a 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
@@ -69,7 +69,7 @@ public:
/**
* Returns the underlying executor. Only used for state explorers.
*/
- const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; }
+ const vespalib::ThreadExecutor& get_executor() const { return _executor; }
/**
* Closes the request handler interface. This will prevent any more data
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
index 0d6bb07b173..704b54dc566 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
@@ -6,8 +6,6 @@
#include <vespa/vespalib/stllike/string.h>
#include <memory>
-namespace vespalib { class ThreadStackExecutorBase; }
-
namespace proton {
class DocumentDBConfigOwner;
@@ -19,7 +17,7 @@ class DocumentDBConfigOwner;
class IProtonConfigurerOwner
{
public:
- using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>;
virtual ~IProtonConfigurerOwner() { }
virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
document::BucketSpace bucketSpace,
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index c57d3e26d8b..91635dc7497 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -59,8 +59,7 @@ private:
using MonitorReply = search::engine::MonitorReply;
using MonitorClient = search::engine::MonitorClient;
using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>;
- using ProtonConfigSP = BootstrapConfig::ProtonConfigSP;
- using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>;
using BucketSpace = document::BucketSpace;
class ProtonFileHeaderContext : public search::common::FileHeaderContext
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
index 7c998ceca7c..2c891927fa3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
@@ -12,6 +12,7 @@
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/config-bucketspaces.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <future>
@@ -42,7 +43,7 @@ getBucketSpace(const BootstrapConfig &bootstrapConfig, const DocTypeName &name)
}
-ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
+ProtonConfigurer::ProtonConfigurer(vespalib::ThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout)
: IProtonConfigurer(),
@@ -58,9 +59,22 @@ ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
{
}
-ProtonConfigurer::~ProtonConfigurer()
-{
-}
+class ProtonConfigurer::ReconfigureTask : public vespalib::Executor::Task {
+public:
+ ReconfigureTask(ProtonConfigurer & configurer)
+ : _configurer(configurer),
+ _retainGuard(configurer._pendingReconfigureTasks)
+ {}
+
+ void run() override {
+ _configurer.performReconfigure();
+ }
+private:
+ ProtonConfigurer & _configurer;
+ vespalib::RetainGuard _retainGuard;
+};
+
+ProtonConfigurer::~ProtonConfigurer() = default;
void
ProtonConfigurer::setAllowReconfig(bool allowReconfig)
@@ -72,11 +86,12 @@ ProtonConfigurer::setAllowReconfig(bool allowReconfig)
_allowReconfig = allowReconfig;
if (allowReconfig) {
// Ensure that pending config is applied
- _executor.execute(makeLambdaTask([this]() { performReconfigure(); }));
+ _executor.execute(std::make_unique<ReconfigureTask>(*this));
}
}
if (!allowReconfig) {
- _executor.sync(); // drain queued performReconfigure tasks
+ // drain queued performReconfigure tasks
+ _pendingReconfigureTasks.waitForZeroRefCount();
}
}
@@ -102,7 +117,7 @@ ProtonConfigurer::reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapsh
std::lock_guard<std::mutex> guard(_mutex);
_pendingConfigSnapshot = configSnapshot;
if (_allowReconfig) {
- _executor.execute(makeLambdaTask([&]() { performReconfigure(); }));
+ _executor.execute(std::make_unique<ReconfigureTask>(*this));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
index 3ebfbc378d7..ddb9c1bed92 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
@@ -7,6 +7,7 @@
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/vespalib/net/simple_component_config_producer.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
#include <map>
#include <mutex>
@@ -25,17 +26,19 @@ class IProtonDiskLayout;
class ProtonConfigurer : public IProtonConfigurer
{
using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>;
- using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>;
+ class ReconfigureTask;
- SyncableExecutorThreadService _executor;
- IProtonConfigurerOwner &_owner;
- DocumentDBs _documentDBs;
- std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot;
- std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot;
- mutable std::mutex _mutex;
- bool _allowReconfig;
- vespalib::SimpleComponentConfigProducer _componentConfig;
+ ExecutorThreadService _executor;
+ IProtonConfigurerOwner &_owner;
+ DocumentDBs _documentDBs;
+ std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot;
+ std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot;
+ mutable std::mutex _mutex;
+ bool _allowReconfig;
+ vespalib::SimpleComponentConfigProducer _componentConfig;
const std::unique_ptr<IProtonDiskLayout> &_diskLayout;
+ vespalib::MonitoredRefCount _pendingReconfigureTasks;
void performReconfigure();
bool skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig);
@@ -48,7 +51,7 @@ class ProtonConfigurer : public IProtonConfigurer
void pruneInitialDocumentDBDirs(const ProtonConfigSnapshot &configSnapshot);
public:
- ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
+ ProtonConfigurer(vespalib::ThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout);
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
index 7f6d9328491..c49649de1e3 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
@@ -72,7 +72,7 @@ public:
/**
* Returns the underlying executor. Only used for state explorers.
*/
- const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; }
+ const vespalib::ThreadExecutor& get_executor() const { return _executor; }
/**
* Starts the underlying threads. This will throw a vespalib::Exception if
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
index 0a24efd0e66..b9cbdab1c0a 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
@@ -40,7 +40,7 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig,
_warmupDone(warmupDone),
_warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()),
_handledTerms(std::make_unique<FieldTermMap>()),
- _pendingTasks(0)
+ _pendingTasks()
{
if (_next->valid()) {
setCurrentIndex(_next->getCurrentIndex());
@@ -81,7 +81,7 @@ WarmupIndexCollection::~WarmupIndexCollection()
if (_warmupEndTime != vespalib::steady_time()) {
LOG(info, "Warmup aborted due to new state change or application shutdown");
}
- assert(_pendingTasks == 0);
+ assert(_pendingTasks.has_zero_ref_count());
}
const ISourceSelector &
@@ -220,23 +220,19 @@ WarmupIndexCollection::getSearchableSP(uint32_t i) const
void
WarmupIndexCollection::drainPending() {
- while (_pendingTasks > 0) {
- std::this_thread::sleep_for(1ms);
- }
+ _pendingTasks.waitForZeroRefCount();
}
WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup)
: _warmup(std::move(warmup)),
+ _retainGuard(_warmup->_pendingTasks),
_matchData(std::move(md)),
_bluePrint(),
_requestContext()
{
- _warmup->_pendingTasks++;
}
-WarmupIndexCollection::WarmupTask::~WarmupTask() {
- _warmup->_pendingTasks--;
-}
+WarmupIndexCollection::WarmupTask::~WarmupTask() = default;
void
WarmupIndexCollection::WarmupTask::run()
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
index 5292fb36298..b0b2952bee8 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
@@ -5,6 +5,8 @@
#include "isearchableindexcollection.h"
#include "warmupconfig.h"
#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <vespa/searchlib/queryeval/fake_requestcontext.h>
namespace searchcorespi {
@@ -85,9 +87,10 @@ private:
private:
void run() override;
std::shared_ptr<WarmupIndexCollection> _warmup;
- std::unique_ptr<MatchData> _matchData;
- Blueprint::UP _bluePrint;
- FakeRequestContext _requestContext;
+ vespalib::RetainGuard _retainGuard;
+ std::unique_ptr<MatchData> _matchData;
+ Blueprint::UP _bluePrint;
+ FakeRequestContext _requestContext;
};
void fireWarmup(Task::UP task);
@@ -102,7 +105,7 @@ private:
vespalib::steady_time _warmupEndTime;
std::mutex _lock;
std::unique_ptr<FieldTermMap> _handledTerms;
- std::atomic<uint64_t> _pendingTasks;
+ vespalib::MonitoredRefCount _pendingTasks;
};
} // namespace searchcorespi
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 954a63978f3..7c4711b6802 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -167,7 +167,7 @@ SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const {
return ExecutorId(executorId);
}
-const vespalib::SyncableThreadExecutor*
+const vespalib::ThreadExecutor*
SequencedTaskExecutor::first_executor() const
{
if (_executors.empty()) {
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 7bb56424849..db0723d16c8 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -8,6 +8,7 @@
namespace vespalib {
+class ThreadExecutor;
class SyncableThreadExecutor;
/**
@@ -41,7 +42,7 @@ public:
*/
uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
- const vespalib::SyncableThreadExecutor* first_executor() const;
+ const vespalib::ThreadExecutor* first_executor() const;
private:
explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor);