summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-07-17 10:48:54 +0200
committerHenning Baldersheim <balder@oath.com>2018-07-17 10:50:24 +0200
commite197b746a131564eeb45370a88b337b2ac0dd191 (patch)
tree05a74e2391833982efdda9019e1d31f76048ad6e
parent980885223d1953260f0f2ea6f608fa5b77413d0f (diff)
- Use std::make_unique/shared.
- Unify alignment - deduplicate code. - Remove unused code. - NULL -> nullptr.
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp117
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h20
2 files changed, 47 insertions, 90 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 95b3008985b..474227ccd2a 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -22,8 +22,7 @@ namespace proton {
namespace {
search::SerialNum
-findOldestFlushedSerial(const IFlushTarget::List &lst,
- const IFlushHandler &handler)
+findOldestFlushedSerial(const IFlushTarget::List &lst, const IFlushHandler &handler)
{
search::SerialNum ret(handler.getCurrentSerialNumber());
for (const IFlushTarget::SP & target : lst) {
@@ -33,36 +32,40 @@ findOldestFlushedSerial(const IFlushTarget::List &lst,
return ret;
}
+void
+logTarget(const char * text, const FlushContext & ctx) {
+ LOG(debug, "Target '%s' %s flush of transactions %" PRIu64 " through %" PRIu64 ".",
+ ctx.getName().c_str(), text,
+ ctx.getTarget()->getFlushedSerialNum() + 1,
+ ctx.getHandler()->getCurrentSerialNumber());
+}
+
}
-FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) :
- _name(name),
- _start(start),
- _id(id)
+FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id)
+ : _name(name),
+ _start(start),
+ _id(id)
{ }
-FlushEngine::FlushMeta::~FlushMeta() { }
+FlushEngine::FlushMeta::~FlushMeta() = default;
-FlushEngine::FlushInfo::FlushInfo() :
- FlushMeta("", fastos::ClockSystem::now(), 0),
- _target()
+FlushEngine::FlushInfo::FlushInfo()
+ : FlushMeta("", fastos::ClockSystem::now(), 0),
+ _target()
{
}
-FlushEngine::FlushInfo::~FlushInfo() { }
+FlushEngine::FlushInfo::~FlushInfo() = default;
-FlushEngine::FlushInfo::FlushInfo(uint32_t taskId,
- const IFlushTarget::SP &target,
- const vespalib::string & destination) :
- FlushMeta(destination, fastos::ClockSystem::now(), taskId),
- _target(target)
+FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string & destination)
+ : FlushMeta(destination, fastos::ClockSystem::now(), taskId),
+ _target(target)
{
}
-FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
- tlsStatsFactory,
- IFlushStrategy::SP strategy, uint32_t numThreads,
- uint32_t idleIntervalMS)
+FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory,
+ IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS)
: _closed(false),
_maxConcurrent(numThreads),
_idleIntervalMS(idleIntervalMS),
@@ -80,9 +83,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
_strategyCond(),
_tlsStatsFactory(tlsStatsFactory),
_pendingPrune()
-{
- // empty
-}
+{ }
FlushEngine::~FlushEngine()
{
@@ -92,7 +93,7 @@ FlushEngine::~FlushEngine()
FlushEngine &
FlushEngine::start()
{
- if (_threadPool.NewThread(this) == NULL) {
+ if (_threadPool.NewThread(this) == nullptr) {
throw vespalib::IllegalStateException("Failed to start engine thread.");
}
return *this;
@@ -148,10 +149,8 @@ FlushEngine::wait(size_t minimumWaitTimeIfReady)
}
void
-FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
+FlushEngine::Run(FastOS_ThreadInterface *, void *)
{
- (void)thread;
- (void)arg;
bool shouldIdle = false;
vespalib::string prevFlushName;
while (wait(shouldIdle ? _idleIntervalMS : 0)) {
@@ -167,7 +166,8 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
} else {
shouldIdle = true;
}
- LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
+ LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'",
+ shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
}
_executor.sync();
prune();
@@ -211,18 +211,16 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
for (const auto & it : _handlers) {
IFlushHandler & handler(*it.second);
search::SerialNum serial(handler.getCurrentSerialNumber());
- LOG(spam, "Checking FlushHandler '%s' current serial = %ld",
- handler.getName().c_str(), serial);
+ LOG(spam, "Checking FlushHandler '%s' current serial = %ld", handler.getName().c_str(), serial);
IFlushTarget::List lst = handler.getFlushTargets();
for (const IFlushTarget::SP & target : lst) {
- LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", target->getName().c_str(), target->getFlushedSerialNum());
+ LOG(spam, "Checking target '%s' with flushedSerialNum = %ld",
+ target->getName().c_str(), target->getFlushedSerialNum());
if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) {
- ret.push_back(FlushContext::SP(new FlushContext(it.second,
- IFlushTarget::SP(new CachedFlushTarget(target)),
- serial)));
+ ret.push_back(std::make_shared<FlushContext>(it.second, std::make_shared<CachedFlushTarget>(target), serial));
} else {
LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.",
- target->getName().c_str(), target->getFlushedSerialNum(), serial);
+ target->getName().c_str(), target->getFlushedSerialNum(), serial);
}
}
}
@@ -258,17 +256,12 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
break;
}
}
- if (ctx.get() != NULL) {
- LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
+ if (ctx) {
+ logTarget("initiated", *ctx);
}
return ctx;
}
-
-
void
FlushEngine::flushAll(const FlushContext::List &lst)
{
@@ -276,19 +269,12 @@ FlushEngine::flushAll(const FlushContext::List &lst)
for (const FlushContext::SP & ctx : lst) {
if (wait(0)) {
if (ctx->initFlush()) {
- LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
+ logTarget("initiated", *ctx);
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
} else {
- LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".",
- ctx->getName().c_str(),
- ctx->getTarget()->getFlushedSerialNum() + 1,
- ctx->getHandler()->getCurrentSerialNumber());
+ logTarget("failed to initiate", *ctx);
}
}
-
}
}
@@ -311,7 +297,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name)
return "";
}
FlushContext::SP ctx = initNextFlush(lst.first);
- if (ctx.get() == NULL) {
+ if ( ! ctx) {
LOG(debug, "All targets refused to flush.");
return "";
}
@@ -321,7 +307,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name)
name.c_str(), lst.first.size());
FastOS_Thread::Sleep(1000);
}
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
return ctx->getName();
}
@@ -330,12 +316,8 @@ FlushEngine::initFlush(const FlushContext &ctx)
{
if (LOG_WOULD_LOG(event)) {
IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain());
- EventLogger::flushStart(ctx.getName(),
- mgain.getBefore(),
- mgain.getAfter(),
- mgain.gain(),
- ctx.getTarget()->getFlushedSerialNum() + 1,
- ctx.getHandler()->getCurrentSerialNumber());
+ EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(),
+ ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber());
}
return initFlush(ctx.getHandler(), ctx.getTarget());
}
@@ -350,10 +332,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
if (LOG_WOULD_LOG(event)) {
FlushStats stats = ctx.getTarget()->getLastFlushStats();
- EventLogger::flushComplete(ctx.getName(),
- duration.ms(),
- stats.getPath(),
- stats.getPathElementsToLog());
+ EventLogger::flushComplete(ctx.getName(), duration.ms(), stats.getPath(), stats.getPathElementsToLog());
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
std::lock_guard<std::mutex> guard(_lock);
@@ -366,8 +345,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
IFlushHandler::SP
-FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
- const IFlushHandler::SP &flushHandler)
+FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler)
{
std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler));
@@ -379,13 +357,6 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
}
IFlushHandler::SP
-FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const
-{
- std::lock_guard<std::mutex> guard(_lock);
- return _handlers.getHandler(docTypeName);
-}
-
-IFlushHandler::SP
FlushEngine::removeFlushHandler(const DocTypeName &docTypeName)
{
std::lock_guard<std::mutex> guard(_lock);
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 19175f9ce2a..b9ab3f802cb 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -96,8 +96,7 @@ public:
* @param numThreads The number of worker threads to use.
* @param idleInterval The interval between when flushes are checked whne there are no one progressing.
*/
- FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
- tlsStatsFactory,
+ FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory,
IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS);
/**
@@ -145,19 +144,8 @@ public:
* @param flushHandler The handler to register.
* @return The replaced handler, if any.
*/
- IFlushHandler::SP
- putFlushHandler(const DocTypeName &docTypeName,
- const IFlushHandler::SP &flushHandler);
+ IFlushHandler::SP putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler);
- /**
- * Returns the flush handler for the given document type. If no handler was
- * registered, this method returns an empty shared pointer.
- *
- * @param docType The document type whose handler to return.
- * @return The registered handler, if any.
- */
- IFlushHandler::SP
- getFlushHandler(const DocTypeName &docTypeName) const;
/**
* Removes and returns the flush handler for the given document type. If no
@@ -166,10 +154,8 @@ public:
* @param docType The document type whose handler to remove.
* @return The removed handler, if any.
*/
- IFlushHandler::SP
- removeFlushHandler(const DocTypeName &docTypeName);
+ IFlushHandler::SP removeFlushHandler(const DocTypeName &docTypeName);
- // Implements FastOS_Runnable.
void Run(FastOS_ThreadInterface *thread, void *arg) override;
FlushMetaSet getCurrentlyFlushingSet() const;