diff options
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp | 152 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h | 9 |
2 files changed, 85 insertions, 76 deletions
diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp index 6beaf22a238..0cfbdc8b69a 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp @@ -597,7 +597,8 @@ void FastS_FNET_Search::GotQueryResult(FastS_FNET_SearchNode *node, FS4Packet_QUERYRESULTX *qrx) { - if (!BeginFNETWork()) { + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { qrx->Free(); return; } @@ -621,14 +622,15 @@ FastS_FNET_Search::GotQueryResult(FastS_FNET_SearchNode *node, } else { qrx->Free(); } - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } void FastS_FNET_Search::GotDocsum(FastS_FNET_SearchNode *node, FS4Packet_DOCSUM *docsum) { - if (!BeginFNETWork()) { + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { docsum->Free(); return; } @@ -649,14 +651,16 @@ FastS_FNET_Search::GotDocsum(FastS_FNET_SearchNode *node, adjustDocsumTimeout(); } docsum->Free(); - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } void FastS_FNET_Search::LostSearchNode(FastS_FNET_SearchNode *node) { - if (!BeginFNETWork()) + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { return; + } if (_FNET_mode == FNET_QUERY && node->_flags._pendingQuery) { FastS_assert(_pendingQueries > 0); @@ -672,15 +676,17 @@ FastS_FNET_Search::LostSearchNode(FastS_FNET_SearchNode *node) _pendingDocsumNodes--; adjustDocsumTimeout(); } - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } void FastS_FNET_Search::GotEOL(FastS_FNET_SearchNode *node) { - if (!BeginFNETWork()) + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { return; + } LOG(spam, "Got EOL from row(%d), part(%d) = pendingQ(%d) pendingDocsum(%d)", node->GetRowID(), node->getPartID(), node->_flags._pendingQuery, node->_pendingDocsums); if (_FNET_mode == FNET_QUERY && node->_flags._pendingQuery) { @@ -697,7 +703,7 @@ FastS_FNET_Search::GotEOL(FastS_FNET_SearchNode *node) _pendingDocsumNodes--; adjustDocsumTimeout(); } - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } @@ -705,7 +711,8 @@ void FastS_FNET_Search::GotError(FastS_FNET_SearchNode *node, FS4Packet_ERROR *error) { - if (!BeginFNETWork()) { + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { error->Free(); return; } @@ -740,15 +747,17 @@ FastS_FNET_Search::GotError(FastS_FNET_SearchNode *node, adjustDocsumTimeout(); } error->Free(); - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } void FastS_FNET_Search::HandleTimeout() { - if (!BeginFNETWork()) + auto searchGuard(BeginFNETWork()); + if (!searchGuard) { return; + } if (_FNET_mode == FNET_QUERY) { for (FastS_FNET_SearchNode & node : _nodes) { @@ -792,32 +801,30 @@ FastS_FNET_Search::HandleTimeout() } _docsumTimeout = true; } - EndFNETWork(); + EndFNETWork(std::move(searchGuard)); } -bool +std::unique_lock<std::mutex> FastS_FNET_Search::BeginFNETWork() { - Lock(); - if (_FNET_mode != FNET_NONE) - return true; - Unlock(); - return false; + std::unique_lock<std::mutex> searchGuard(_lock); + if (_FNET_mode == FNET_NONE) { + searchGuard.unlock(); + } + return searchGuard; } void -FastS_FNET_Search::EndFNETWork() +FastS_FNET_Search::EndFNETWork(std::unique_lock<std::mutex> searchGuard) { if (_FNET_mode == FNET_QUERY && _pendingQueries == 0) { _FNET_mode = FNET_NONE; - Unlock(); + searchGuard.unlock(); _searchOwner->DoneQuery(this, _searchContext); } else if (_FNET_mode == FNET_DOCSUMS && _pendingDocsums == 0) { _FNET_mode = FNET_NONE; - Unlock(); + searchGuard.unlock(); _searchOwner->DoneDocsums(this, _searchContext); - } else { - Unlock(); } } @@ -1069,13 +1076,14 @@ FastS_FNET_Search::Search(uint32_t searchOffset, std::vector<uint32_t> send_failed; // partitions where packet send failed // allow FNET responses while requests are being sent - Lock(); - ++_pendingQueries; // add Elephant query node to avoid early query done - ++_queryNodes; // add Elephant query node to avoid early query done - _FNET_mode = FNET_QUERY; - _queryStartTime = GetTimeKeeper()->GetTime(); - _timeout.Schedule(_adjustedQueryTimeOut); - Unlock(); + { + std::unique_lock<std::mutex> searchGuard(_lock); + ++_pendingQueries; // add Elephant query node to avoid early query done + ++_queryNodes; // add Elephant query node to avoid early query done + _FNET_mode = FNET_QUERY; + _queryStartTime = GetTimeKeeper()->GetTime(); + _timeout.Schedule(_adjustedQueryTimeOut); + } FNET_Packet::SP shared(new FS4Packet_PreSerialized(*setupQueryPacket(hitsPerNode, qflags, _queryArgs->propertiesMap))); for (uint32_t i = 0; i < _nodes.size(); i++) { FastS_FNET_SearchNode & node = _nodes[i]; @@ -1092,30 +1100,32 @@ FastS_FNET_Search::Search(uint32_t searchOffset, } // finalize setup and check if query is still in progress - Lock(); - assert(_queryNodes >= _pendingQueries); - for (uint32_t i: send_failed) { - // conditional revert of state for failed nodes - if (_nodes[i]._flags._pendingQuery) { - _nodes[i]._flags._pendingQuery = false; - assert(_pendingQueries > 0); - --_pendingQueries; - --_queryNodes; + bool done; + { + std::unique_lock<std::mutex> searchGuard(_lock); + assert(_queryNodes >= _pendingQueries); + for (uint32_t i: send_failed) { + // conditional revert of state for failed nodes + if (_nodes[i]._flags._pendingQuery) { + _nodes[i]._flags._pendingQuery = false; + assert(_pendingQueries > 0); + --_pendingQueries; + --_queryNodes; + } } - } - // revert Elephant query node to allow search to complete - assert(_pendingQueries > 0); - --_pendingQueries; - --_queryNodes; - bool done = (_pendingQueries == 0); - bool all_down = (num_send_ok == 0); - if (done) { - _FNET_mode = FNET_NONE; - if (all_down) { - SetError(search::engine::ECODE_ALL_PARTITIONS_DOWN, NULL); + // revert Elephant query node to allow search to complete + assert(_pendingQueries > 0); + --_pendingQueries; + --_queryNodes; + done = (_pendingQueries == 0); + bool all_down = (num_send_ok == 0); + if (done) { + _FNET_mode = FNET_NONE; + if (all_down) { + SetError(search::engine::ECODE_ALL_PARTITIONS_DOWN, NULL); + } } } - Unlock(); return (done) ? RET_OK : RET_INPROGRESS; } @@ -1386,31 +1396,33 @@ FastS_FNET_Search::GetDocsums(const FastS_hitresult *hits, uint32_t hitcnt) FastS_assert(p == hits + hitcnt); ConnectDocsumNodes(ignoreRow); - Lock(); + bool done; + { + std::unique_lock<std::mutex> searchGuard(_lock); - // patch in engine dependent features and send docsum requests + // patch in engine dependent features and send docsum requests - for (FastS_FNET_SearchNode & node : _nodes) { - if (node._gdx != NULL) - node.postGDX(&_pendingDocsums, &_docsumNodes); - for (FastS_FNET_SearchNode::ExtraDocsumNodesIter iter(&node); iter.valid(); ++iter) { - FastS_FNET_SearchNode *eNode = *iter; - if (eNode->_gdx != NULL) - eNode->postGDX(&_pendingDocsums, &_docsumNodes); + for (FastS_FNET_SearchNode & node : _nodes) { + if (node._gdx != NULL) + node.postGDX(&_pendingDocsums, &_docsumNodes); + for (FastS_FNET_SearchNode::ExtraDocsumNodesIter iter(&node); iter.valid(); ++iter) { + FastS_FNET_SearchNode *eNode = *iter; + if (eNode->_gdx != NULL) + eNode->postGDX(&_pendingDocsums, &_docsumNodes); + } } - } - _pendingDocsumNodes = _docsumNodes; - _requestedDocsums = _pendingDocsums; + _pendingDocsumNodes = _docsumNodes; + _requestedDocsums = _pendingDocsums; - bool done = (_pendingDocsums == 0); - if (!done) { - _FNET_mode = FNET_DOCSUMS; // FNET; do your thing + done = (_pendingDocsums == 0); + if (!done) { + _FNET_mode = FNET_DOCSUMS; // FNET; do your thing - _adjustedDocSumTimeOut = args->getTimeout().sec(); - _docSumStartTime = GetTimeKeeper()->GetTime(); - _timeout.Schedule(_adjustedDocSumTimeOut); + _adjustedDocSumTimeOut = args->getTimeout().sec(); + _docSumStartTime = GetTimeKeeper()->GetTime(); + _timeout.Schedule(_adjustedDocSumTimeOut); + } } - Unlock(); return (done) ? RET_OK : RET_INPROGRESS; } diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h index de913bda46f..a19dcff025d 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h +++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h @@ -214,7 +214,7 @@ public: }; private: - FastOS_Mutex _lock; + std::mutex _lock; FastS_TimeKeeper *_timeKeeper; double _startTime; Timeout _timeout; @@ -272,11 +272,8 @@ private: uint32_t getFixedRowCandidate(); uint32_t getHashedRow() const; - void Lock() { _lock.Lock(); } - void Unlock() { _lock.Unlock(); } - - bool BeginFNETWork(); - void EndFNETWork(); + std::unique_lock<std::mutex> BeginFNETWork(); + void EndFNETWork(std::unique_lock<std::mutex> searchGuard); void EncodePartIDs(uint32_t partid, uint32_t rowid, bool mld, FS4Packet_QUERYRESULTX::FS4_hit *pt, |