aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
blob: d47d2dd2877c459d0ce89140cf6b04dc7e31762b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "attribute_populator.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/searchlib/common/flush_token.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/searchlib/attribute/attributevector.h>
#include <cassert>

#include <vespa/log/log.h>
LOG_SETUP(".proton.attribute.attribute_populator");

using vespalib::IDestructorCallback;

namespace proton {

namespace {

/**
 * Destructor callback that holds a shared reference to a document.
 *
 * The document stays alive for as long as this context object exists;
 * the reference is dropped when the context is destroyed.
 */
class PopulateDoneContext : public IDestructorCallback
{
    std::shared_ptr<document::Document> _doc;
public:
    /**
     * @param doc document to keep alive; ownership is shared with the caller.
     *
     * Marked explicit so a document pointer is never silently converted
     * into a callback object.
     */
    explicit PopulateDoneContext(std::shared_ptr<document::Document> doc) noexcept
        : _doc(std::move(doc))
    {
    }
    ~PopulateDoneContext() override = default;
};

}

/**
 * Hands out the current serial number and advances the internal counter.
 *
 * Asserts that the number being handed out has not passed the config
 * serial number.
 *
 * @return the serial number that was current before the increment.
 */
search::SerialNum
AttributePopulator::nextSerialNum()
{
    assert(_currSerialNum <= _configSerialNum);
    const search::SerialNum allocated = _currSerialNum;
    ++_currSerialNum;
    return allocated;
}

std::vector<vespalib::string>
AttributePopulator::getNames() const
{
    std::vector<search::AttributeGuard> attrs;
    _writer.getAttributeManager()->getAttributeList(attrs);
    std::vector<vespalib::string> names;
    names.reserve(attrs.size());
    for (const search::AttributeGuard &attr : attrs) {
        names.push_back(_subDbName + ".attribute." + attr->getName());
    }
    return names;
}

/**
 * Sets up a populator writing to the attributes of the given manager.
 *
 * @param mgr             attribute manager handed to the internal writer.
 * @param initSerialNum   first serial number to hand out; also remembered
 *                        as the starting point for the populated count.
 * @param subDbName       name used as prefix when reporting attribute names.
 * @param configSerialNum upper bound for serial numbers handed out, and the
 *                        serial number used when flushing in done().
 *
 * Emits a "populate attribute start" event when event logging is enabled.
 */
AttributePopulator::AttributePopulator(const proton::IAttributeManager::SP &mgr,
                                       search::SerialNum initSerialNum,
                                       const vespalib::string &subDbName,
                                       search::SerialNum configSerialNum)
    : _writer(mgr),
      _initSerialNum(initSerialNum),
      _currSerialNum(initSerialNum),
      _configSerialNum(configSerialNum),
      _subDbName(subDbName)
{
    // Only gather the attribute names when the event will actually be logged.
    if (LOG_WOULD_LOG(event)) {
        const std::vector<vespalib::string> attributeNames = getNames();
        EventLogger::populateAttributeStart(attributeNames);
    }
}

/**
 * Emits a "populate attribute complete" event when event logging is enabled,
 * reporting the attribute names and how many serial numbers were handed out
 * since construction.
 */
AttributePopulator::~AttributePopulator()
{
    if (LOG_WOULD_LOG(event)) {
        const std::vector<vespalib::string> attributeNames = getNames();
        // One serial number is consumed per handled document.
        const search::SerialNum handledCount = _currSerialNum - _initSerialNum;
        EventLogger::populateAttributeComplete(attributeNames, handledCount);
    }
}

void
AttributePopulator::handleExisting(uint32_t lid, const std::shared_ptr<document::Document> &doc)
{
    search::SerialNum serialNum(nextSerialNum());
    _writer.put(serialNum, *doc, lid, std::make_shared<PopulateDoneContext>(doc));
    vespalib::Gate gate;
    _writer.forceCommit(serialNum, std::make_shared<vespalib::GateCallback>(gate));
    gate.await();
}

/**
 * Flushes every flush target of the attribute manager at the config serial
 * number.
 *
 * Each target is asserted to have a flushed serial number below the config
 * serial number before the flush and exactly equal to it afterwards.
 */
void
AttributePopulator::done()
{
    auto attributeManager = _writer.getAttributeManager();
    auto targets = attributeManager->getFlushTargets();
    for (const auto &target : targets) {
        assert(target->getFlushedSerialNum() < _configSerialNum);
        auto token = std::make_shared<search::FlushToken>();
        auto flushTask = target->initFlush(_configSerialNum, std::move(token));
        // A shrink target only returns a task when it is able to shrink.
        if (flushTask) {
            flushTask->run();
        }
        assert(target->getFlushedSerialNum() == _configSerialNum);
    }
}

} // namespace proton