summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2017-10-26 21:22:53 +0200
committerTor Egge <Tor.Egge@oath.com>2017-10-27 08:50:13 +0000
commitb5650cbdce83b661b92efb1797b8396371273671 (patch)
tree09f94b83f49162b7b1e87da0d947b8b143ec3a58 /searchcore
parentfaef2c4997d45789a43eb3b8c8ed002fce20288a (diff)
Use std::unique_lock<std::mutex> to guard fnet search.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp152
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h9
2 files changed, 85 insertions, 76 deletions
diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp
index 6beaf22a238..0cfbdc8b69a 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp
+++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp
@@ -597,7 +597,8 @@ void
FastS_FNET_Search::GotQueryResult(FastS_FNET_SearchNode *node,
FS4Packet_QUERYRESULTX *qrx)
{
- if (!BeginFNETWork()) {
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
qrx->Free();
return;
}
@@ -621,14 +622,15 @@ FastS_FNET_Search::GotQueryResult(FastS_FNET_SearchNode *node,
} else {
qrx->Free();
}
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
void
FastS_FNET_Search::GotDocsum(FastS_FNET_SearchNode *node,
FS4Packet_DOCSUM *docsum)
{
- if (!BeginFNETWork()) {
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
docsum->Free();
return;
}
@@ -649,14 +651,16 @@ FastS_FNET_Search::GotDocsum(FastS_FNET_SearchNode *node,
adjustDocsumTimeout();
}
docsum->Free();
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
void
FastS_FNET_Search::LostSearchNode(FastS_FNET_SearchNode *node)
{
- if (!BeginFNETWork())
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
return;
+ }
if (_FNET_mode == FNET_QUERY && node->_flags._pendingQuery) {
FastS_assert(_pendingQueries > 0);
@@ -672,15 +676,17 @@ FastS_FNET_Search::LostSearchNode(FastS_FNET_SearchNode *node)
_pendingDocsumNodes--;
adjustDocsumTimeout();
}
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
void
FastS_FNET_Search::GotEOL(FastS_FNET_SearchNode *node)
{
- if (!BeginFNETWork())
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
return;
+ }
LOG(spam, "Got EOL from row(%d), part(%d) = pendingQ(%d) pendingDocsum(%d)", node->GetRowID(), node->getPartID(), node->_flags._pendingQuery, node->_pendingDocsums);
if (_FNET_mode == FNET_QUERY && node->_flags._pendingQuery) {
@@ -697,7 +703,7 @@ FastS_FNET_Search::GotEOL(FastS_FNET_SearchNode *node)
_pendingDocsumNodes--;
adjustDocsumTimeout();
}
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
@@ -705,7 +711,8 @@ void
FastS_FNET_Search::GotError(FastS_FNET_SearchNode *node,
FS4Packet_ERROR *error)
{
- if (!BeginFNETWork()) {
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
error->Free();
return;
}
@@ -740,15 +747,17 @@ FastS_FNET_Search::GotError(FastS_FNET_SearchNode *node,
adjustDocsumTimeout();
}
error->Free();
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
void
FastS_FNET_Search::HandleTimeout()
{
- if (!BeginFNETWork())
+ auto searchGuard(BeginFNETWork());
+ if (!searchGuard) {
return;
+ }
if (_FNET_mode == FNET_QUERY) {
for (FastS_FNET_SearchNode & node : _nodes) {
@@ -792,32 +801,30 @@ FastS_FNET_Search::HandleTimeout()
}
_docsumTimeout = true;
}
- EndFNETWork();
+ EndFNETWork(std::move(searchGuard));
}
-bool
+std::unique_lock<std::mutex>
FastS_FNET_Search::BeginFNETWork()
{
- Lock();
- if (_FNET_mode != FNET_NONE)
- return true;
- Unlock();
- return false;
+ std::unique_lock<std::mutex> searchGuard(_lock);
+ if (_FNET_mode == FNET_NONE) {
+ searchGuard.unlock();
+ }
+ return searchGuard;
}
void
-FastS_FNET_Search::EndFNETWork()
+FastS_FNET_Search::EndFNETWork(std::unique_lock<std::mutex> searchGuard)
{
if (_FNET_mode == FNET_QUERY && _pendingQueries == 0) {
_FNET_mode = FNET_NONE;
- Unlock();
+ searchGuard.unlock();
_searchOwner->DoneQuery(this, _searchContext);
} else if (_FNET_mode == FNET_DOCSUMS && _pendingDocsums == 0) {
_FNET_mode = FNET_NONE;
- Unlock();
+ searchGuard.unlock();
_searchOwner->DoneDocsums(this, _searchContext);
- } else {
- Unlock();
}
}
@@ -1069,13 +1076,14 @@ FastS_FNET_Search::Search(uint32_t searchOffset,
std::vector<uint32_t> send_failed; // partitions where packet send failed
// allow FNET responses while requests are being sent
- Lock();
- ++_pendingQueries; // add Elephant query node to avoid early query done
- ++_queryNodes; // add Elephant query node to avoid early query done
- _FNET_mode = FNET_QUERY;
- _queryStartTime = GetTimeKeeper()->GetTime();
- _timeout.Schedule(_adjustedQueryTimeOut);
- Unlock();
+ {
+ std::unique_lock<std::mutex> searchGuard(_lock);
+ ++_pendingQueries; // add Elephant query node to avoid early query done
+ ++_queryNodes; // add Elephant query node to avoid early query done
+ _FNET_mode = FNET_QUERY;
+ _queryStartTime = GetTimeKeeper()->GetTime();
+ _timeout.Schedule(_adjustedQueryTimeOut);
+ }
FNET_Packet::SP shared(new FS4Packet_PreSerialized(*setupQueryPacket(hitsPerNode, qflags, _queryArgs->propertiesMap)));
for (uint32_t i = 0; i < _nodes.size(); i++) {
FastS_FNET_SearchNode & node = _nodes[i];
@@ -1092,30 +1100,32 @@ FastS_FNET_Search::Search(uint32_t searchOffset,
}
// finalize setup and check if query is still in progress
- Lock();
- assert(_queryNodes >= _pendingQueries);
- for (uint32_t i: send_failed) {
- // conditional revert of state for failed nodes
- if (_nodes[i]._flags._pendingQuery) {
- _nodes[i]._flags._pendingQuery = false;
- assert(_pendingQueries > 0);
- --_pendingQueries;
- --_queryNodes;
+ bool done;
+ {
+ std::unique_lock<std::mutex> searchGuard(_lock);
+ assert(_queryNodes >= _pendingQueries);
+ for (uint32_t i: send_failed) {
+ // conditional revert of state for failed nodes
+ if (_nodes[i]._flags._pendingQuery) {
+ _nodes[i]._flags._pendingQuery = false;
+ assert(_pendingQueries > 0);
+ --_pendingQueries;
+ --_queryNodes;
+ }
}
- }
- // revert Elephant query node to allow search to complete
- assert(_pendingQueries > 0);
- --_pendingQueries;
- --_queryNodes;
- bool done = (_pendingQueries == 0);
- bool all_down = (num_send_ok == 0);
- if (done) {
- _FNET_mode = FNET_NONE;
- if (all_down) {
- SetError(search::engine::ECODE_ALL_PARTITIONS_DOWN, NULL);
+ // revert Elephant query node to allow search to complete
+ assert(_pendingQueries > 0);
+ --_pendingQueries;
+ --_queryNodes;
+ done = (_pendingQueries == 0);
+ bool all_down = (num_send_ok == 0);
+ if (done) {
+ _FNET_mode = FNET_NONE;
+ if (all_down) {
+ SetError(search::engine::ECODE_ALL_PARTITIONS_DOWN, NULL);
+ }
}
}
- Unlock();
return (done) ? RET_OK : RET_INPROGRESS;
}
@@ -1386,31 +1396,33 @@ FastS_FNET_Search::GetDocsums(const FastS_hitresult *hits, uint32_t hitcnt)
FastS_assert(p == hits + hitcnt);
ConnectDocsumNodes(ignoreRow);
- Lock();
+ bool done;
+ {
+ std::unique_lock<std::mutex> searchGuard(_lock);
- // patch in engine dependent features and send docsum requests
+ // patch in engine dependent features and send docsum requests
- for (FastS_FNET_SearchNode & node : _nodes) {
- if (node._gdx != NULL)
- node.postGDX(&_pendingDocsums, &_docsumNodes);
- for (FastS_FNET_SearchNode::ExtraDocsumNodesIter iter(&node); iter.valid(); ++iter) {
- FastS_FNET_SearchNode *eNode = *iter;
- if (eNode->_gdx != NULL)
- eNode->postGDX(&_pendingDocsums, &_docsumNodes);
+ for (FastS_FNET_SearchNode & node : _nodes) {
+ if (node._gdx != NULL)
+ node.postGDX(&_pendingDocsums, &_docsumNodes);
+ for (FastS_FNET_SearchNode::ExtraDocsumNodesIter iter(&node); iter.valid(); ++iter) {
+ FastS_FNET_SearchNode *eNode = *iter;
+ if (eNode->_gdx != NULL)
+ eNode->postGDX(&_pendingDocsums, &_docsumNodes);
+ }
}
- }
- _pendingDocsumNodes = _docsumNodes;
- _requestedDocsums = _pendingDocsums;
+ _pendingDocsumNodes = _docsumNodes;
+ _requestedDocsums = _pendingDocsums;
- bool done = (_pendingDocsums == 0);
- if (!done) {
- _FNET_mode = FNET_DOCSUMS; // FNET; do your thing
+ done = (_pendingDocsums == 0);
+ if (!done) {
+ _FNET_mode = FNET_DOCSUMS; // FNET; do your thing
- _adjustedDocSumTimeOut = args->getTimeout().sec();
- _docSumStartTime = GetTimeKeeper()->GetTime();
- _timeout.Schedule(_adjustedDocSumTimeOut);
+ _adjustedDocSumTimeOut = args->getTimeout().sec();
+ _docSumStartTime = GetTimeKeeper()->GetTime();
+ _timeout.Schedule(_adjustedDocSumTimeOut);
+ }
}
- Unlock();
return (done) ? RET_OK : RET_INPROGRESS;
}
diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h
index de913bda46f..a19dcff025d 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h
+++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h
@@ -214,7 +214,7 @@ public:
};
private:
- FastOS_Mutex _lock;
+ std::mutex _lock;
FastS_TimeKeeper *_timeKeeper;
double _startTime;
Timeout _timeout;
@@ -272,11 +272,8 @@ private:
uint32_t getFixedRowCandidate();
uint32_t getHashedRow() const;
- void Lock() { _lock.Lock(); }
- void Unlock() { _lock.Unlock(); }
-
- bool BeginFNETWork();
- void EndFNETWork();
+ std::unique_lock<std::mutex> BeginFNETWork();
+ void EndFNETWork(std::unique_lock<std::mutex> searchGuard);
void EncodePartIDs(uint32_t partid, uint32_t rowid, bool mld,
FS4Packet_QUERYRESULTX::FS4_hit *pt,