diff options
Diffstat (limited to 'searchcore/src')
4 files changed, 29 insertions, 55 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp index f3b41c745b6..eb3bec35118 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp @@ -19,9 +19,7 @@ ReprocessDocumentsTask(IReprocessingInitializer &initializer, _visitorProgress(0.0), _visitorCost(0.0), _handler(docIdLimit), - _startTime(0), - _loggedProgress(0.0), - _loggedTime(0) + _loggedProgress(0.0) { initializer.initialize(_handler); if (_handler.hasProcessors()) { @@ -30,55 +28,40 @@ ReprocessDocumentsTask(IReprocessingInitializer &initializer, } } - void ReprocessDocumentsTask::run() { if (_handler.hasProcessors()) { - EventLogger::reprocessDocumentsStart(_subDbName, - _visitorCost); - fastos::TimeStamp ts(fastos::ClockSystem::now()); - _startTime = ts.ms(); - _loggedTime = _startTime; + EventLogger::reprocessDocumentsStart(_subDbName,_visitorCost); + _stopWatch.start(); search::IDocumentStore &docstore = _sm->getBackingStore(); if (_handler.hasRewriters()) { - docstore.accept(_handler.getRewriteVisitor(), - *this, - *_docTypeRepo); + docstore.accept(_handler.getRewriteVisitor(),*this,*_docTypeRepo); } else { - docstore.accept(_handler, - *this, - *_docTypeRepo); + docstore.accept(_handler,*this,*_docTypeRepo); } _handler.done(); - ts = fastos::ClockSystem::now(); - int64_t elapsedTime = ts.ms() - _startTime; - EventLogger::reprocessDocumentsComplete(_subDbName, - _visitorCost, - elapsedTime); + _stopWatch.stop(); + EventLogger::reprocessDocumentsComplete(_subDbName,_visitorCost, _stopWatch.elapsed().ms()); } } - void ReprocessDocumentsTask::updateProgress(double progress) { _visitorProgress = progress; double deltaProgress = progress - _loggedProgress; if (deltaProgress >= 0.01) { - fastos::TimeStamp ts = fastos::ClockSystem::now(); - int64_t logDelayTime = ts.ms() - _loggedTime; - if (logDelayTime >= 60000 || deltaProgress >= 0.10) { - EventLogger::reprocessDocumentsProgress(_subDbName, - progress, - _visitorCost); - _loggedTime = ts.ms(); + fastos::StopWatch intermediate = _stopWatch; + fastos::TimeStamp logDelayTime = intermediate.stop().elapsed() - _stopWatch.elapsed(); + if (logDelayTime.ms() >= 60000 || deltaProgress >= 0.10) { + EventLogger::reprocessDocumentsProgress(_subDbName, progress,_visitorCost); + _stopWatch.stop(); _loggedProgress = progress; } } } - IReprocessingTask::Progress ReprocessDocumentsTask::getProgress() const { diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h index 8d0f54965a8..f64eddd29ee 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h @@ -26,9 +26,8 @@ class ReprocessDocumentsTask : public IReprocessingTask, double _visitorProgress; double _visitorCost; DocumentReprocessingHandler _handler; - int64_t _startTime; + fastos::StopWatch _stopWatch; double _loggedProgress; - int64_t _loggedTime; public: ReprocessDocumentsTask(IReprocessingInitializer &initializer, @@ -37,13 +36,9 @@ public: const vespalib::string &subDbName, uint32_t docIdLimit); - virtual void - run() override; - - virtual void - updateProgress(double progress) override; - - virtual Progress getProgress() const override; + void run() override; + void updateProgress(double progress) override; + Progress getProgress() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index fca49a52553..84edd2b7512 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -13,6 +13,7 @@ LOG_SETUP(".proton.server.rtchooks"); using namespace vespalib; using vespalib::compression::CompressionConfig; +using fastos::TimeStamp; namespace { @@ -34,12 +35,12 @@ Pair::~Pair() = default; namespace proton { void -RPCHooksBase::checkState(StateArg::UP arg) +RPCHooksBase::checkState(std::unique_ptr<StateArg> arg) { - fastos::TimeStamp now(fastos::ClockSystem::now()); + TimeStamp now(fastos::ClockSteady::now()); if (now < arg->_dueTime) { std::unique_lock<std::mutex> guard(_stateLock); - if (_stateCond.wait_for(guard, std::chrono::milliseconds(std::min(INT64_C(1000), (arg->_dueTime - now)/fastos::TimeStamp::MS))) == std::cv_status::no_timeout) { + if (_stateCond.wait_for(guard, std::chrono::milliseconds(std::min(INT64_C(1000), (arg->_dueTime - now)/TimeStamp::MS))) == std::cv_status::no_timeout) { LOG(debug, "state has changed"); reportState(*arg->_session, arg->_req); arg->_req->Return(); @@ -108,8 +109,7 @@ RPCHooksBase::reportState(Session & session, FRT_RPCRequest * req) } RPCHooksBase::Session::Session() - : _createTime(fastos::ClockSystem::now()), - _numDocs(0u), + : _numDocs(0u), _delayedConfigs(), _gen(-1), _down(false) @@ -284,8 +284,8 @@ RPCHooksBase::rpc_GetState(FRT_RPCRequest *req) if (sharedSession->getGen() < 0 || sharedSession->getNumDocs() != numDocs) { // NB Should use something else to define generation. reportState(*sharedSession, req); } else { - fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS); - StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime)); + TimeStamp dueTime(fastos::ClockSteady::now() + TimeStamp(timeoutMS * TimeStamp::MS)); + auto stateArg = std::make_unique<StateArg>(sharedSession, req, dueTime); if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) { reportState(*sharedSession, req); req->Return(); @@ -349,8 +349,8 @@ RPCHooksBase::rpc_getIncrementalState(FRT_RPCRequest *req) if (sharedSession->getGen() < 0 || sharedSession->getNumDocs() != numDocs) { // NB Should use something else to define generation. reportState(*sharedSession, req); } else { - fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS); - StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime)); + TimeStamp dueTime(fastos::ClockSteady::now() + TimeStamp(timeoutMS * TimeStamp::MS)); + auto stateArg = std::make_unique<StateArg>(sharedSession, req, dueTime); if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) { reportState(*sharedSession, req); req->Return(); diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h index d8dc3d5f4db..c694647491b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h @@ -25,7 +25,6 @@ private: class Session { private: - fastos::TimeStamp _createTime; int64_t _numDocs; vespalib::string _delayedConfigs; int64_t _gen; @@ -34,12 +33,10 @@ private: typedef std::shared_ptr<Session> SP; Session(); int64_t getGen() const { return _gen; } - fastos::TimeStamp getCreateTime() const { return _createTime; } Session & setGen(int64_t gen) { _gen = gen; return *this; } int64_t getNumDocs() const { return _numDocs; } void setNumDocs(int64_t numDocs) { _numDocs = numDocs; } - bool getDown() const { return _down; } void setDown() { _down = true; } const vespalib::string & getDelayedConfigs() const { @@ -52,14 +49,13 @@ private: }; struct StateArg { - typedef std::unique_ptr<StateArg> UP; StateArg(Session::SP session, FRT_RPCRequest * req, fastos::TimeStamp dueTime) : - _session(session), + _session(std::move(session)), _req(req), _dueTime(dueTime) { } - Session::SP _session; - FRT_RPCRequest * _req; + Session::SP _session; + FRT_RPCRequest * _req; fastos::TimeStamp _dueTime; }; @@ -78,7 +74,7 @@ private: void triggerFlush(FRT_RPCRequest *req); void prepareRestart(FRT_RPCRequest *req); - void checkState(StateArg::UP arg); + void checkState(std::unique_ptr<StateArg> arg); void reportState(Session & session, FRT_RPCRequest * req) __attribute__((noinline)); void getProtonStatus(FRT_RPCRequest * req); void getDocsums(FRT_RPCRequest *req); |