summaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.h
blob: 039a56fbc9b98bdd2c1b3f29598d09ca0518a0e8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
// Copyright (C) 1998-2003 Fast Search & Transfer ASA
// Copyright (C) 2003 Overture Services Norway AS

#pragma once

#include <vespa/fnet/fnet.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/searchcore/fdispatch/common/search.h>
#include <vespa/searchlib/common/sortdata.h>
#include <vespa/searchcore/grouping/mergingmanager.h>
#include <vespa/searchcore/fdispatch/search/search_path.h>
#include <vespa/searchcore/fdispatch/search/querycacheutil.h>
#include <vespa/searchcore/fdispatch/search/fnet_engine.h>

class FastS_FNET_Engine;
class FastS_FNET_Search;

using search::fs4transport::FS4Packet_QUERYRESULTX;
using search::fs4transport::FS4Packet_GETDOCSUMSX;
using search::fs4transport::FS4Packet_DOCSUM;
using search::fs4transport::FS4Packet_TRACEREPLY;

//-----------------------------------------------------------------

class FastS_FNET_SearchNode : public FNET_IPacketHandler
{
public:
    class ExtraDocsumNodesIter;
    typedef std::unique_ptr<FastS_FNET_SearchNode> UP;
private:
    friend class ExtraDocsumNodesIter;

    FastS_FNET_Search       *_search;   // we are part of this search
    FastS_FNET_Engine       *_engine;   // we use this search engine
    FNET_Channel            *_channel;  // connection with search engine
    uint32_t                 _subds;    // engine sub dataset
    uint32_t                 _partid;   // engine partition id
    uint32_t                 _rowid;    // engine row id
    uint32_t                 _stamp;    // engine timestamp

public:

    FS4Packet_QUERYRESULTX *_qresult; // query result packet
    double                  _queryTime;
    struct Flags {
        Flags() :
            _pendingQuery(false),
            _docsumMld(false),
            _queryTimeout(false),
            _docsumTimeout(false),
            _needSubCost(false)
        { }
        bool  _pendingQuery;   // is query pending ?
        bool  _docsumMld;
        bool  _queryTimeout;
        bool  _docsumTimeout;
        bool  _needSubCost;
    };

    Flags       _flags;

// Docsum related stuff.
    uint32_t    _docidCnt;
    uint32_t    _pendingDocsums; // how many docsums pending ?
    uint32_t    _docsumRow;
    uint32_t    _docsumStamp;
    uint32_t    _docsum_offsets_idx;
    double      _docsumTime;

    FS4Packet_GETDOCSUMSX  *_gdx;
    std::vector<uint32_t>   _docsum_offsets;
private:
    std::vector<FastS_FNET_SearchNode::UP> _extraDocsumNodes;
    FastS_FNET_SearchNode *_nextExtraDocsumNode;
    FastS_FNET_SearchNode *_prevExtraDocsumNode;
public:

// Query processing stuff.
    FS4Packet_QUERYRESULTX::FS4_hit *_hit_beg; // hit array start
    FS4Packet_QUERYRESULTX::FS4_hit *_hit_cur; // current hit
    FS4Packet_QUERYRESULTX::FS4_hit *_hit_end; // end boundary

    search::common::SortDataIterator _sortDataIterator;

public:
    FastS_FNET_SearchNode(FastS_FNET_Search *search, uint32_t partid);
    // These objects are referenced everywhere and must never be either copied nor moved,
    // but std::vector requires this to exist. If called it will assert.
    FastS_FNET_SearchNode(FastS_FNET_SearchNode && rhs);
    FastS_FNET_SearchNode(const FastS_FNET_SearchNode &) = delete;
    FastS_FNET_SearchNode& operator=(const FastS_FNET_SearchNode &) = delete;

    virtual ~FastS_FNET_SearchNode();

    // Methods needed by mergehits
    bool NT_InitMerge(uint32_t *numDocs, uint64_t *totalHits, search::HitRank *maxRank, uint32_t *sortDataDocs);
    search::common::SortDataIterator *NT_GetSortDataIterator() { return &_sortDataIterator; }
    FS4Packet_QUERYRESULTX::FS4_hit *NT_GetHit() const { return _hit_cur; }
    uint32_t NT_GetNumHitsUsed() const { return (_hit_cur - _hit_beg); }
    uint32_t NT_GetNumHitsLeft() const { return (_hit_end - _hit_cur); }
    uint64_t NT_GetTotalHits() const { return (_qresult != NULL) ? _qresult->_totNumDocs : 0; }
    uint32_t NT_GetNumHits() const { return (_hit_end - _hit_beg); }
    void NT_NextHit() { _hit_cur++; }

    uint32_t getPartID() const     { return _partid; }
    uint32_t GetRowID() const     { return _rowid; }
    uint32_t GetTimeStamp() const { return _stamp; }

    FastS_FNET_SearchNode * allocExtraDocsumNode(bool mld, uint32_t rowid, uint32_t rowbits);

    FastS_FNET_Engine *GetEngine() const { return _engine; }

    bool IsConnected() const { return _channel != NULL; }
    void Connect(FastS_FNET_Engine *engine);
    void Connect_HasDSLock(FastS_FNET_Engine *engine);
    FastS_EngineBase * getPartition(const FastOS_Mutex & dsMutex, bool userow, FastS_FNET_DataSet *dataset);
    void allocGDX(search::docsummary::GetDocsumArgs *args, const search::engine::PropertiesMap &properties);
    void postGDX(uint32_t *pendingDocsums, uint32_t *pendingDocsumNodes);
    vespalib::string toString() const;

    const char *getHostName() const {
        return (_engine == NULL ? "localhost" : _engine->getHostName());
    }
    int getPortNumber() const {
        return (_engine == NULL ? 0 : _engine->getPortNumber());
    }

    void dropCost() {
        if (_engine != NULL && _flags._needSubCost) {
            _engine->SubCost();
            _flags._needSubCost = false;
        }
    }


    void Disconnect()
    {
        if (_channel != NULL) {
            _channel->CloseAndFree();
            _channel = NULL;
        }
        if (_engine != NULL) {
            if (_flags._needSubCost) {
                _engine->SubCost();
                _flags._needSubCost = false;
            }
            _engine = NULL;
        }
    }


    bool PostPacket(FNET_Packet *packet) {
        return (_channel == NULL) ?  packet->Free(), false : _channel->Send(packet);
    }

    virtual HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;
};


class FastS_FNET_SearchNode::ExtraDocsumNodesIter
{
private:
    ExtraDocsumNodesIter(const ExtraDocsumNodesIter &other);
    ExtraDocsumNodesIter& operator=(const ExtraDocsumNodesIter &other);

    FastS_FNET_SearchNode       *_cur;
    const FastS_FNET_SearchNode *_head;

public:
    ExtraDocsumNodesIter(const FastS_FNET_SearchNode *head)
        : _cur(head->_nextExtraDocsumNode),
          _head(head)
    {
    }

    ExtraDocsumNodesIter & operator++() {
        _cur = _cur->_nextExtraDocsumNode;
        return *this;
    }

    bool valid() const { return _cur != _head; }
    FastS_FNET_SearchNode *operator*() const { return _cur; }
};


//-----------------------------------------------------------------

class FastS_FNET_Search : public FastS_AsyncSearch
{
private:
    FastS_FNET_Search(const FastS_FNET_Search &);
    FastS_FNET_Search& operator=(const FastS_FNET_Search &);

public:

    class Timeout : public FNET_Task
    {
    private:
        Timeout(const Timeout &);
        Timeout& operator=(const Timeout &);

        FastS_FNET_Search *_search;

    public:
        Timeout(FNET_Scheduler *scheduler, FastS_FNET_Search *search)
            : FNET_Task(scheduler),
              _search(search) {}
        void PerformTask() override;
    };

    enum FNETMode {
        FNET_NONE    = 0x00,
        FNET_QUERY   = 0x01,
        FNET_DOCSUMS = 0x02
    };

private:
    FastOS_Mutex             _lock;
    FastS_TimeKeeper        *_timeKeeper;
    double                   _startTime;
    Timeout                  _timeout;
    FastS_QueryCacheUtil     _util;
    std::unique_ptr<search::grouping::MergingManager> _groupMerger;
    FastS_DataSetCollection *_dsc;	// owner keeps this alive
    FastS_FNET_DataSet      *_dataset;
    bool		     _datasetActiveCostRef;
    std::vector<FastS_FNET_SearchNode> _nodes;
    bool                     _nodesConnected;

    uint32_t                 _estParts;
    uint32_t                 _estPartCutoff;

    FNETMode                 _FNET_mode;

    uint32_t                 _pendingQueries;	// # nodes with query left
    uint32_t		     _goodQueries;	// # queries good
    uint32_t                 _pendingDocsums;	// # docsums left
    uint32_t                 _pendingDocsumNodes; // # nodes with docsums left
    uint32_t		     _requestedDocsums; // # docsums requested
    uint32_t                 _goodDocsums;	// # docsums good
    uint32_t                 _queryNodes;	  // #nodes with query
    uint32_t                 _queryNodesTimedOut;  // #nodes with query timeout
    uint32_t                 _docsumNodes;	  // #nodes with docsums
    uint32_t                 _docsumNodesTimedOut; // #nodes with docsum timeout
    uint32_t                 _docsumsTimedOut;
    bool                     _queryTimeout;
    bool                     _docsumTimeout;

    double		     _queryStartTime;
    double                   _queryMinWait;
    double                   _queryMaxWait;
    bool                     _queryWaitCalculated;
    double		     _adjustedQueryTimeOut;
    double		     _docSumStartTime;
    double		     _adjustedDocSumTimeOut;
    uint32_t                 _fixedRow;

    std::vector<FastS_fullresult>  _resbuf;

    void dropDatasetActiveCostRef();

    typedef std::vector<std::pair<FastS_EngineBase *, FastS_FNET_SearchNode *>> EngineNodeMap;
    void connectNodes(const EngineNodeMap & engines);
    void AllocNodes();
    void ConnectQueryNodes();
    void ConnectEstimateNodes();
    void connectSearchPath(const vespalib::string &spec);
    void connectSearchPath(const fdispatch::SearchPath::Element &elem,
                           const vespalib::string &spec,
                           uint32_t dispatchLevel);
    void ConnectDocsumNodes(bool ignoreRow);
    uint32_t getNextFixedRow();
    uint32_t getFixedRowCandidate();
    uint32_t getHashedRow() const;

    void Lock()   { _lock.Lock();   }
    void Unlock() { _lock.Unlock(); }

    bool BeginFNETWork();
    void EndFNETWork();

    void EncodePartIDs(uint32_t partid, uint32_t rowid, bool mld,
                       FS4Packet_QUERYRESULTX::FS4_hit *pt,
                       FS4Packet_QUERYRESULTX::FS4_hit *end);

    FastS_TimeKeeper *GetTimeKeeper() const { return _timeKeeper; }

    FastS_FNET_SearchNode * getNode(size_t i) { return &_nodes[i]; }
public:
    FastS_FNET_Search(FastS_DataSetCollection *dsc,
                      FastS_FNET_DataSet *dataset,
                      FastS_TimeKeeper *timeKeeper);
    virtual ~FastS_FNET_Search();

    void GotQueryResult(FastS_FNET_SearchNode *node, FS4Packet_QUERYRESULTX *qrx);
    void GotDocsum(FastS_FNET_SearchNode *node, FS4Packet_DOCSUM *docsum);
    void LostSearchNode(FastS_FNET_SearchNode *node);
    void GotEOL(FastS_FNET_SearchNode *node);
    void GotError(FastS_FNET_SearchNode *node, search::fs4transport::FS4Packet_ERROR *error);

    void HandleTimeout();

    bool ShouldLimitHitsPerNode() const;
    void MergeHits();
    void CheckCoverage();
    void CheckQueryTimes();
    void CheckDocsumTimes();
    void CheckQueryTimeout();
    void CheckDocsumTimeout();

    // *** API methods -- BEGIN ***

    virtual FastS_SearchInfo *GetSearchInfo() override { return _util.GetSearchInfo(); }

    virtual RetCode Search(uint32_t searchOffset, uint32_t maxhits, uint32_t minhits = 0) override;
    virtual RetCode ProcessQueryDone() override;
    virtual RetCode GetDocsums(const FastS_hitresult *hits, uint32_t hitcnt) override;
    virtual RetCode ProcessDocsumsDone() override;

    // *** API methods -- END ***

    // Hit merging methods

    FastS_FNET_SearchNode *ST_GetNode(size_t i) { return getNode(i); }
    uint32_t ST_GetNumNodes() const { return _nodes.size(); }
    bool ST_IsEstimate() const { return _util.IsEstimate(); }
    uint32_t ST_GetEstParts() const { return _estParts; }
    uint32_t ST_GetEstPartCutoff() const { return _estPartCutoff; }
    bool ST_ShouldDropSortData() const { return _util.ShouldDropSortData(); }
    bool ST_ShouldLimitHitsPerNode() const { return ShouldLimitHitsPerNode(); }
    void ST_SetNumHits(uint32_t numHits) {
        _util.SetAlignedHitCount(numHits);
        _util.CalcHitCount();
        _util.AllocAlignedHitBuf();
    }
    uint32_t ST_GetAlignedSearchOffset() const { return _util.GetAlignedSearchOffset(); }
    uint32_t ST_GetAlignedMaxHits() const { return _util.GetAlignedMaxHits(); }
    uint32_t ST_GetAlignedHitCount() const { return _util.GetAlignedHitCount(); }
    FastS_hitresult *ST_GetAlignedHitBuf() { return _util.GetAlignedHitBuf(); }
    FastS_hitresult *ST_GetAlignedHitBufEnd() { return _util.GetAlignedHitBufEnd(); }
    void ST_AllocSortData(uint32_t len) { _util.AllocSortData(len); }
    uint32_t *ST_GetSortIndex() { return _util.GetSortIndex(); }
    char *ST_GetSortData() { return _util.GetSortData(); }
    FastS_QueryResult *ST_GetQueryResult() { return _util.GetQueryResult(); }

    void adjustQueryTimeout();
    void adjustDocsumTimeout();
    uint32_t getGoodQueries() const { return _goodQueries; }
    uint32_t getRequestedQueries() const { return _queryNodes; }
    uint32_t getPendingQueries() const { return _pendingQueries; }
    uint32_t getDoneQueries() const {
        return getRequestedQueries() - getPendingQueries();
    }
    uint32_t getBadQueries() const {
        return getDoneQueries() - getGoodQueries();
    }
    uint32_t getGoodDocsums() const { return _goodDocsums; }
    uint32_t getRequestedDocsums() const { return _requestedDocsums; }
    uint32_t getPendingDocsums() const { return _pendingDocsums; }
    uint32_t getDoneDocsums() const {
        return getRequestedDocsums() - getPendingDocsums();
    }
    uint32_t getBadDocsums() const {
        return getDoneDocsums() - getGoodDocsums();
    }

    FNET_Packet::UP
    setupQueryPacket(uint32_t hitsPerNode, uint32_t qflags,
                     const search::engine::PropertiesMap &properties);
};

//-----------------------------------------------------------------------------

class FastS_Sync_FNET_Search : public FastS_SyncSearchAdapter
{
private:
    FastS_FNET_Search _search;

public:
    FastS_Sync_FNET_Search(FastS_DataSetCollection *dsc,
                           FastS_FNET_DataSet *dataset,
                           FastS_TimeKeeper *timeKeeper) :
        FastS_SyncSearchAdapter(&_search),
        _search(dsc, dataset, timeKeeper)
    {
        _search.SetAsyncArgs(this, FastS_SearchContext());
    }
    virtual ~FastS_Sync_FNET_Search();
    virtual void Free() override { delete this; }
};

//-----------------------------------------------------------------