diff options
author | Henning Baldersheim <balder@oath.com> | 2018-07-17 10:48:54 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-07-17 10:50:24 +0200 |
commit | e197b746a131564eeb45370a88b337b2ac0dd191 (patch) | |
tree | 05a74e2391833982efdda9019e1d31f76048ad6e /searchcore | |
parent | 980885223d1953260f0f2ea6f608fa5b77413d0f (diff) |
- Use std::make_unique/shared.
- Unify alignment
- deduplicate code.
- Remove unused code.
- NULL -> nullptr.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp | 117 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h | 20 |
2 files changed, 47 insertions, 90 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 95b3008985b..474227ccd2a 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -22,8 +22,7 @@ namespace proton { namespace { search::SerialNum -findOldestFlushedSerial(const IFlushTarget::List &lst, - const IFlushHandler &handler) +findOldestFlushedSerial(const IFlushTarget::List &lst, const IFlushHandler &handler) { search::SerialNum ret(handler.getCurrentSerialNumber()); for (const IFlushTarget::SP & target : lst) { @@ -33,36 +32,40 @@ findOldestFlushedSerial(const IFlushTarget::List &lst, return ret; } +void +logTarget(const char * text, const FlushContext & ctx) { + LOG(debug, "Target '%s' %s flush of transactions %" PRIu64 " through %" PRIu64 ".", + ctx.getName().c_str(), text, + ctx.getTarget()->getFlushedSerialNum() + 1, + ctx.getHandler()->getCurrentSerialNumber()); +} + } -FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) : - _name(name), - _start(start), - _id(id) +FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, fastos::TimeStamp start, uint32_t id) + : _name(name), + _start(start), + _id(id) { } -FlushEngine::FlushMeta::~FlushMeta() { } +FlushEngine::FlushMeta::~FlushMeta() = default; -FlushEngine::FlushInfo::FlushInfo() : - FlushMeta("", fastos::ClockSystem::now(), 0), - _target() +FlushEngine::FlushInfo::FlushInfo() + : FlushMeta("", fastos::ClockSystem::now(), 0), + _target() { } -FlushEngine::FlushInfo::~FlushInfo() { } +FlushEngine::FlushInfo::~FlushInfo() = default; -FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, - const IFlushTarget::SP &target, - const vespalib::string & destination) : - FlushMeta(destination, fastos::ClockSystem::now(), taskId), - _target(target) +FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const IFlushTarget::SP &target, const vespalib::string & destination) + : FlushMeta(destination, fastos::ClockSystem::now(), taskId), + _target(target) { } -FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> - tlsStatsFactory, - IFlushStrategy::SP strategy, uint32_t numThreads, - uint32_t idleIntervalMS) +FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, + IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS) : _closed(false), _maxConcurrent(numThreads), _idleIntervalMS(idleIntervalMS), @@ -80,9 +83,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> _strategyCond(), _tlsStatsFactory(tlsStatsFactory), _pendingPrune() -{ - // empty -} +{ } FlushEngine::~FlushEngine() { @@ -92,7 +93,7 @@ FlushEngine::~FlushEngine() FlushEngine & FlushEngine::start() { - if (_threadPool.NewThread(this) == NULL) { + if (_threadPool.NewThread(this) == nullptr) { throw vespalib::IllegalStateException("Failed to start engine thread."); } return *this; @@ -148,10 +149,8 @@ FlushEngine::wait(size_t minimumWaitTimeIfReady) } void -FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) +FlushEngine::Run(FastOS_ThreadInterface *, void *) { - (void)thread; - (void)arg; bool shouldIdle = false; vespalib::string prevFlushName; while (wait(shouldIdle ? _idleIntervalMS : 0)) { @@ -167,7 +166,8 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) } else { shouldIdle = true; } - LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); + LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", + shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); } _executor.sync(); prune(); @@ -211,18 +211,16 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const for (const auto & it : _handlers) { IFlushHandler & handler(*it.second); search::SerialNum serial(handler.getCurrentSerialNumber()); - LOG(spam, "Checking FlushHandler '%s' current serial = %ld", - handler.getName().c_str(), serial); + LOG(spam, "Checking FlushHandler '%s' current serial = %ld", handler.getName().c_str(), serial); IFlushTarget::List lst = handler.getFlushTargets(); for (const IFlushTarget::SP & target : lst) { - LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", target->getName().c_str(), target->getFlushedSerialNum()); + LOG(spam, "Checking target '%s' with flushedSerialNum = %ld", + target->getName().c_str(), target->getFlushedSerialNum()); if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) { - ret.push_back(FlushContext::SP(new FlushContext(it.second, - IFlushTarget::SP(new CachedFlushTarget(target)), - serial))); + ret.push_back(std::make_shared<FlushContext>(it.second, std::make_shared<CachedFlushTarget>(target), serial)); } else { LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.", - target->getName().c_str(), target->getFlushedSerialNum(), serial); + target->getName().c_str(), target->getFlushedSerialNum(), serial); } } } @@ -258,17 +256,12 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) break; } } - if (ctx.get() != NULL) { - LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); + if (ctx) { + logTarget("initiated", *ctx); } return ctx; } - - void FlushEngine::flushAll(const FlushContext::List &lst) { @@ -276,19 +269,12 @@ FlushEngine::flushAll(const FlushContext::List &lst) for (const FlushContext::SP & ctx : lst) { if (wait(0)) { if (ctx->initFlush()) { - LOG(debug, "Target '%s' initiated flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); + logTarget("initiated", *ctx); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); } else { - LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".", - ctx->getName().c_str(), - ctx->getTarget()->getFlushedSerialNum() + 1, - ctx->getHandler()->getCurrentSerialNumber()); + logTarget("failed to initiate", *ctx); } } - } } @@ -311,7 +297,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name) return ""; } FlushContext::SP ctx = initNextFlush(lst.first); - if (ctx.get() == NULL) { + if ( ! ctx) { LOG(debug, "All targets refused to flush."); return ""; } @@ -321,7 +307,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name) name.c_str(), lst.first.size()); FastOS_Thread::Sleep(1000); } - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); return ctx->getName(); } @@ -330,12 +316,8 @@ FlushEngine::initFlush(const FlushContext &ctx) { if (LOG_WOULD_LOG(event)) { IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain()); - EventLogger::flushStart(ctx.getName(), - mgain.getBefore(), - mgain.getAfter(), - mgain.gain(), - ctx.getTarget()->getFlushedSerialNum() + 1, - ctx.getHandler()->getCurrentSerialNumber()); + EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(), + ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber()); } return initFlush(ctx.getHandler(), ctx.getTarget()); } @@ -350,10 +332,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } if (LOG_WOULD_LOG(event)) { FlushStats stats = ctx.getTarget()->getLastFlushStats(); - EventLogger::flushComplete(ctx.getName(), - duration.ms(), - stats.getPath(), - stats.getPathElementsToLog()); + EventLogger::flushComplete(ctx.getName(), duration.ms(), stats.getPath(), stats.getPathElementsToLog()); } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec()); std::lock_guard<std::mutex> guard(_lock); @@ -366,8 +345,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } IFlushHandler::SP -FlushEngine::putFlushHandler(const DocTypeName &docTypeName, - const IFlushHandler::SP &flushHandler) +FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler) { std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler)); @@ -379,13 +357,6 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, } IFlushHandler::SP -FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const -{ - std::lock_guard<std::mutex> guard(_lock); - return _handlers.getHandler(docTypeName); -} - -IFlushHandler::SP FlushEngine::removeFlushHandler(const DocTypeName &docTypeName) { std::lock_guard<std::mutex> guard(_lock); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 19175f9ce2a..b9ab3f802cb 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -96,8 +96,7 @@ public: * @param numThreads The number of worker threads to use. * @param idleInterval The interval between when flushes are checked whne there are no one progressing. */ - FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> - tlsStatsFactory, + FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS); /** @@ -145,19 +144,8 @@ public: * @param flushHandler The handler to register. * @return The replaced handler, if any. */ - IFlushHandler::SP - putFlushHandler(const DocTypeName &docTypeName, - const IFlushHandler::SP &flushHandler); + IFlushHandler::SP putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler); - /** - * Returns the flush handler for the given document type. If no handler was - * registered, this method returns an empty shared pointer. - * - * @param docType The document type whose handler to return. - * @return The registered handler, if any. - */ - IFlushHandler::SP - getFlushHandler(const DocTypeName &docTypeName) const; /** * Removes and returns the flush handler for the given document type. If no @@ -166,10 +154,8 @@ public: * @param docType The document type whose handler to remove. * @return The removed handler, if any. */ - IFlushHandler::SP - removeFlushHandler(const DocTypeName &docTypeName); + IFlushHandler::SP removeFlushHandler(const DocTypeName &docTypeName); - // Implements FastOS_Runnable. void Run(FastOS_ThreadInterface *thread, void *arg) override; FlushMetaSet getCurrentlyFlushingSet() const; |