summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-11-15 11:16:15 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-11-15 11:16:15 +0000
commitdf3df03bdd7793f3a0974b11e7de84a361f178cd (patch)
tree2313005a572663048b7d596bb5f61adab046908a /searchcore/src
parent363603705abd5d0204531f1ac8e5eb84338c120f (diff)
Use ClockSteady.
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp41
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h12
4 files changed, 29 insertions, 55 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
index f3b41c745b6..eb3bec35118 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
@@ -19,9 +19,7 @@ ReprocessDocumentsTask(IReprocessingInitializer &initializer,
_visitorProgress(0.0),
_visitorCost(0.0),
_handler(docIdLimit),
- _startTime(0),
- _loggedProgress(0.0),
- _loggedTime(0)
+ _loggedProgress(0.0)
{
initializer.initialize(_handler);
if (_handler.hasProcessors()) {
@@ -30,55 +28,40 @@ ReprocessDocumentsTask(IReprocessingInitializer &initializer,
}
}
-
void
ReprocessDocumentsTask::run()
{
if (_handler.hasProcessors()) {
- EventLogger::reprocessDocumentsStart(_subDbName,
- _visitorCost);
- fastos::TimeStamp ts(fastos::ClockSystem::now());
- _startTime = ts.ms();
- _loggedTime = _startTime;
+ EventLogger::reprocessDocumentsStart(_subDbName,_visitorCost);
+ _stopWatch.start();
search::IDocumentStore &docstore = _sm->getBackingStore();
if (_handler.hasRewriters()) {
- docstore.accept(_handler.getRewriteVisitor(),
- *this,
- *_docTypeRepo);
+ docstore.accept(_handler.getRewriteVisitor(),*this,*_docTypeRepo);
} else {
- docstore.accept(_handler,
- *this,
- *_docTypeRepo);
+ docstore.accept(_handler,*this,*_docTypeRepo);
}
_handler.done();
- ts = fastos::ClockSystem::now();
- int64_t elapsedTime = ts.ms() - _startTime;
- EventLogger::reprocessDocumentsComplete(_subDbName,
- _visitorCost,
- elapsedTime);
+ _stopWatch.stop();
+ EventLogger::reprocessDocumentsComplete(_subDbName,_visitorCost, _stopWatch.elapsed().ms());
}
}
-
void
ReprocessDocumentsTask::updateProgress(double progress)
{
_visitorProgress = progress;
double deltaProgress = progress - _loggedProgress;
if (deltaProgress >= 0.01) {
- fastos::TimeStamp ts = fastos::ClockSystem::now();
- int64_t logDelayTime = ts.ms() - _loggedTime;
- if (logDelayTime >= 60000 || deltaProgress >= 0.10) {
- EventLogger::reprocessDocumentsProgress(_subDbName,
- progress,
- _visitorCost);
- _loggedTime = ts.ms();
+ fastos::StopWatch intermediate = _stopWatch;
+ fastos::TimeStamp logDelayTime = intermediate.stop().elapsed() - _stopWatch.elapsed();
+ if (logDelayTime.ms() >= 60000 || deltaProgress >= 0.10) {
+ EventLogger::reprocessDocumentsProgress(_subDbName, progress,_visitorCost);
+ _stopWatch.stop();
_loggedProgress = progress;
}
}
}
-
IReprocessingTask::Progress
ReprocessDocumentsTask::getProgress() const
{
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h
index 8d0f54965a8..f64eddd29ee 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.h
@@ -26,9 +26,8 @@ class ReprocessDocumentsTask : public IReprocessingTask,
double _visitorProgress;
double _visitorCost;
DocumentReprocessingHandler _handler;
- int64_t _startTime;
+ fastos::StopWatch _stopWatch;
double _loggedProgress;
- int64_t _loggedTime;
public:
ReprocessDocumentsTask(IReprocessingInitializer &initializer,
@@ -37,13 +36,9 @@ public:
const vespalib::string &subDbName,
uint32_t docIdLimit);
- virtual void
- run() override;
-
- virtual void
- updateProgress(double progress) override;
-
- virtual Progress getProgress() const override;
+ void run() override;
+ void updateProgress(double progress) override;
+ Progress getProgress() const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
index fca49a52553..84edd2b7512 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
@@ -13,6 +13,7 @@ LOG_SETUP(".proton.server.rtchooks");
using namespace vespalib;
using vespalib::compression::CompressionConfig;
+using fastos::TimeStamp;
namespace {
@@ -34,12 +35,12 @@ Pair::~Pair() = default;
namespace proton {
void
-RPCHooksBase::checkState(StateArg::UP arg)
+RPCHooksBase::checkState(std::unique_ptr<StateArg> arg)
{
- fastos::TimeStamp now(fastos::ClockSystem::now());
+ TimeStamp now(fastos::ClockSteady::now());
if (now < arg->_dueTime) {
std::unique_lock<std::mutex> guard(_stateLock);
- if (_stateCond.wait_for(guard, std::chrono::milliseconds(std::min(INT64_C(1000), (arg->_dueTime - now)/fastos::TimeStamp::MS))) == std::cv_status::no_timeout) {
+ if (_stateCond.wait_for(guard, std::chrono::milliseconds(std::min(INT64_C(1000), (arg->_dueTime - now)/TimeStamp::MS))) == std::cv_status::no_timeout) {
LOG(debug, "state has changed");
reportState(*arg->_session, arg->_req);
arg->_req->Return();
@@ -108,8 +109,7 @@ RPCHooksBase::reportState(Session & session, FRT_RPCRequest * req)
}
RPCHooksBase::Session::Session()
- : _createTime(fastos::ClockSystem::now()),
- _numDocs(0u),
+ : _numDocs(0u),
_delayedConfigs(),
_gen(-1),
_down(false)
@@ -284,8 +284,8 @@ RPCHooksBase::rpc_GetState(FRT_RPCRequest *req)
if (sharedSession->getGen() < 0 || sharedSession->getNumDocs() != numDocs) { // NB Should use something else to define generation.
reportState(*sharedSession, req);
} else {
- fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS);
- StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime));
+ TimeStamp dueTime(fastos::ClockSteady::now() + TimeStamp(timeoutMS * TimeStamp::MS));
+ auto stateArg = std::make_unique<StateArg>(sharedSession, req, dueTime);
if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) {
reportState(*sharedSession, req);
req->Return();
@@ -349,8 +349,8 @@ RPCHooksBase::rpc_getIncrementalState(FRT_RPCRequest *req)
if (sharedSession->getGen() < 0 || sharedSession->getNumDocs() != numDocs) { // NB Should use something else to define generation.
reportState(*sharedSession, req);
} else {
- fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS);
- StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime));
+ TimeStamp dueTime(fastos::ClockSteady::now() + TimeStamp(timeoutMS * TimeStamp::MS));
+ auto stateArg = std::make_unique<StateArg>(sharedSession, req, dueTime);
if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) {
reportState(*sharedSession, req);
req->Return();
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
index d8dc3d5f4db..c694647491b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
@@ -25,7 +25,6 @@ private:
class Session {
private:
- fastos::TimeStamp _createTime;
int64_t _numDocs;
vespalib::string _delayedConfigs;
int64_t _gen;
@@ -34,12 +33,10 @@ private:
typedef std::shared_ptr<Session> SP;
Session();
int64_t getGen() const { return _gen; }
- fastos::TimeStamp getCreateTime() const { return _createTime; }
Session & setGen(int64_t gen) { _gen = gen; return *this; }
int64_t getNumDocs() const { return _numDocs; }
void setNumDocs(int64_t numDocs) { _numDocs = numDocs; }
- bool getDown() const { return _down; }
void setDown() { _down = true; }
const vespalib::string & getDelayedConfigs() const {
@@ -52,14 +49,13 @@ private:
};
struct StateArg {
- typedef std::unique_ptr<StateArg> UP;
StateArg(Session::SP session, FRT_RPCRequest * req, fastos::TimeStamp dueTime) :
- _session(session),
+ _session(std::move(session)),
_req(req),
_dueTime(dueTime)
{ }
- Session::SP _session;
- FRT_RPCRequest * _req;
+ Session::SP _session;
+ FRT_RPCRequest * _req;
fastos::TimeStamp _dueTime;
};
@@ -78,7 +74,7 @@ private:
void triggerFlush(FRT_RPCRequest *req);
void prepareRestart(FRT_RPCRequest *req);
- void checkState(StateArg::UP arg);
+ void checkState(std::unique_ptr<StateArg> arg);
void reportState(Session & session, FRT_RPCRequest * req) __attribute__((noinline));
void getProtonStatus(FRT_RPCRequest * req);
void getDocsums(FRT_RPCRequest *req);