aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp
blob: 211fa36c3054f0e9e03a7089696d7aef5386a1d4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "fusionrunner.h"
#include "eventlogger.h"
#include "fusionspec.h"
#include <vespa/searchlib/common/serialnumfileheadercontext.h>
#include <vespa/searchlib/attribute/fixedsourceselector.h>
#include <vespa/searchlib/queryeval/isourceselector.h>

#include <vespa/log/log.h>
LOG_SETUP(".searchcorespi.index.fusionrunner");

using search::FixedSourceSelector;
using search::TuneFileAttributes;
using search::TuneFileIndexing;
using search::common::FileHeaderContext;
using search::common::SerialNumFileHeaderContext;
using search::index::Schema;
using search::queryeval::ISourceSelector;
using search::diskindex::SelectorArray;
using search::SerialNum;
using std::vector;
using vespalib::string;

namespace searchcorespi::index {

/**
 * Creates a fusion runner.
 *
 * @param base_dir           used to construct the disk layout member
 *                           (_diskLayout) that names flush and fusion dirs.
 * @param schema             initializes _schema; passed on to runFusion().
 * @param tuneFileAttributes initializes _tuneFileAttributes; used when saving
 *                           the fusion source selector.
 * @param fileHeaderContext  initializes _fileHeaderContext; wrapped together
 *                           with a serial number when saving the selector.
 */
FusionRunner::FusionRunner(const string &base_dir,
                           const Schema &schema,
                           const TuneFileAttributes &tuneFileAttributes,
                           const FileHeaderContext &fileHeaderContext)
    : _diskLayout(base_dir),
      _schema(schema),
      _tuneFileAttributes(tuneFileAttributes),
      _fileHeaderContext(fileHeaderContext)
{ }

// Compiler-generated destructor, defined out of line in this translation unit.
FusionRunner::~FusionRunner() = default;

namespace {

void readSelectorArray(const string &selector_name, SelectorArray &selector_array,
                       const vector<uint8_t> &id_map, uint32_t base_id, uint32_t fusion_id) {
    FixedSourceSelector::UP selector =
        FixedSourceSelector::load(selector_name, fusion_id);
    if (base_id != selector->getBaseId()) {
        selector = selector->cloneAndSubtract("tmp_for_fusion", base_id - selector->getBaseId());
    }

    const uint32_t num_docs = selector->getDocIdLimit();
    selector_array.reserve(num_docs);
    auto it = selector->createIterator();
    for (uint32_t i = 0; i < num_docs; ++i) {
        search::queryeval::Source source = it->getSource(i);
        // Workaround for source selector corruption.
        // Treat out of range source as last source.
        if (source >= id_map.size()) {
            source = id_map.size() - 1;
        }
        assert(source < id_map.size());
        selector_array.push_back(id_map[source]);
    }
}

bool
writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id,
                    uint32_t highest_doc_id,
                    const TuneFileAttributes &tuneFileAttributes,
                    const FileHeaderContext &fileHeaderContext)
{
    const search::queryeval::Source default_source = 0;
    FixedSourceSelector fusion_selector(default_source, "fusion_selector");
    fusion_selector.setSource(highest_doc_id, default_source);
    fusion_selector.setBaseId(fusion_id);
    string selector_name = IndexDiskLayout::getSelectorFileName(diskLayout.getFusionDir(fusion_id));
    if (!fusion_selector.extractSaveInfo(selector_name)->save(tuneFileAttributes, fileHeaderContext)) {
        LOG(warning, "Unable to write source selector data for fusion.%u.", fusion_id);
        return false;
    }
    return true;
}
}  // namespace

uint32_t
FusionRunner::fuse(const FusionSpec &fusion_spec,
                   SerialNum lastSerialNum,
                   IIndexMaintainerOperations &operations,
                   std::shared_ptr<search::IFlushToken> flush_token)
{
    const vector<uint32_t> &ids = fusion_spec.flush_ids;
    if (ids.empty()) {
        return 0;
    }
    const uint32_t fusion_id = ids.back();
    const string fusion_dir = _diskLayout.getFusionDir(fusion_id);

    vector<string> sources;
    vector<uint8_t> id_map(fusion_id + 1);
    if (fusion_spec.last_fusion_id != 0) {
        id_map[0] = sources.size();
        sources.push_back(_diskLayout.getFusionDir(fusion_spec.last_fusion_id));
    }
    for (uint32_t id : ids) {
        id_map[id - fusion_spec.last_fusion_id] = sources.size();
        sources.push_back(_diskLayout.getFlushDir(id));
    }

    if (LOG_WOULD_LOG(event)) {
        EventLogger::diskFusionStart(sources, fusion_dir);
    }
    vespalib::Timer timer;

    const string selector_name = IndexDiskLayout::getSelectorFileName(_diskLayout.getFlushDir(fusion_id));
    SelectorArray selector_array;
    readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id);

    if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) {
        return 0;
    }

    const uint32_t highest_doc_id = selector_array.size() - 1;
    SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, lastSerialNum);
    if (!writeFusionSelector(_diskLayout, fusion_id, highest_doc_id, _tuneFileAttributes, fileHeaderContext)) {
        return 0;
    }

    if (LOG_WOULD_LOG(event)) {
        EventLogger::diskFusionComplete(fusion_dir, vespalib::count_ms(timer.elapsed()));
    }
    return fusion_id;
}

}