aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
blob: 61bcffe6f00db7344e2fd1509cb7ee3920a8e3ac (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "externpolicy.h"
#include "mirror_with_all.h"
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/documentapi/messagebus/documentprotocol.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>

#include <vespa/log/log.h>
LOG_SETUP(".externpolicy");

using slobrok::api::IMirrorAPI;
using slobrok::api::MirrorAPI;

namespace documentapi {

/**
 * Builds the policy from a parameter on the form '<spec>;<pattern>'.
 *
 * '<spec>' is a comma-separated list of slobrok connection specs and
 * '<pattern>' is a lookup pattern on the form '<service>/<session>'.
 * Any parse failure is recorded in _error (and later reported by select())
 * instead of being thrown; _error is cleared only when everything succeeded.
 *
 * @param param The raw policy parameter string.
 */
ExternPolicy::ExternPolicy(const string &param) :
    _lock(),
    _mirrorWithAll(),
    _pattern(),
    _session(),
    _error("Not initialized."),
    _offset(0),
    _gen(0),
    _recipients()
{
    // Reject an empty parameter up front.
    if (param.empty()) {
        _error = "Expected parameter, got empty string.";
        return;
    }

    // The ';' separator must exist and must have text on both sides of it.
    const size_t separator = param.find(';');
    const bool hasSeparator = (separator != string::npos);
    if (!hasSeparator || separator == 0 || separator + 1 == param.size()) {
        _error = vespalib::make_string("Expected parameter on the form '<spec>;<pattern>', got '%s'.", param.c_str());
        return;
    }

    // Split the part before ';' into the individual slobrok specs.
    vespalib::string slobrokSpecs = param.substr(0, separator);
    vespalib::StringTokenizer tokens(slobrokSpecs, ",");
    MirrorAPI::StringList specList;
    for (uint32_t idx = 0; idx < tokens.size(); ++idx) {
        specList.push_back(tokens[idx]);
    }
    if (specList.empty()) {
        _error = vespalib::make_string("Extern policy needs at least one slobrok: Slobrok list '%s' resolved to no slobroks", slobrokSpecs.c_str());
        return;
    }

    // Set up the mirror towards the external slobroks.
    slobrok::ConfiguratorFactory configurator(specList);
    _mirrorWithAll = std::make_unique<MirrorAndStuff>(configurator);
    LOG(debug, "Connecting to extern slobrok mirror '%s'..", slobrokSpecs.c_str());

    // Everything after ';' is the lookup pattern; its last '/' starts the session part.
    _pattern = param.substr(separator + 1);
    const size_t lastSlash = _pattern.find_last_of('/');
    if (lastSlash == string::npos) {
        _error = vespalib::make_string("Expected pattern on the form '<service>/<session>', got '%s'.", _pattern.c_str());
        return;
    }
    // Note that the stored session keeps its leading '/'.
    _session = _pattern.substr(lastSlash);

    // Parsing succeeded; the policy is usable.
    _error.clear();
}

// Destructor is defaulted, but defined here in the implementation file.
// NOTE(review): presumably kept out of line so the type held by _mirrorWithAll
// (declared in mirror_with_all.h) only has to be complete in this file — confirm against the header.
ExternPolicy::~ExternPolicy() = default;

/**
 * Returns the slobrok mirror used by this policy.
 *
 * @return The mirror, or nullptr if no mirror has been created (which is the
 *         case when the constructor bailed out before setting one up).
 */
const IMirrorAPI *
ExternPolicy::getMirror() const {
    if (!_mirrorWithAll) {
        return nullptr;
    }
    return _mirrorWithAll->mirror();
}

/**
 * Selects a recipient for the given routing context.
 *
 * Reports ERROR_POLICY_FAILURE if the policy failed to initialize,
 * APP_TRANSIENT_ERROR if the mirror is not yet ready, and
 * NO_ADDRESS_FOR_SERVICE if no recipient could be resolved. Otherwise a copy
 * of the context's route, with its first hop replaced by the chosen
 * recipient, is added as a child.
 *
 * @param ctx The routing context to add a child to, or set an error on.
 */
void
ExternPolicy::select(mbus::RoutingContext &ctx)
{
    // A construction-time error makes the policy permanently unusable.
    if (!_error.empty()) {
        ctx.setError(DocumentProtocol::ERROR_POLICY_FAILURE, _error);
        return;
    }

    // _error is empty only if the constructor got as far as creating the mirror.
    if (!_mirrorWithAll->mirror()->ready()) {
        ctx.setError(mbus::ErrorCode::APP_TRANSIENT_ERROR, "Extern slobrok not ready.");
        return;
    }

    mbus::Hop recipient = getRecipient();
    if (!recipient.hasDirectives()) {
        ctx.setError(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE,
                     vespalib::make_string("Could not resolve any recipients from '%s'.", _pattern.c_str()));
        return;
    }

    // Replace the first hop of the current route with the resolved recipient.
    mbus::Route childRoute = ctx.getRoute();
    childRoute.setHop(0, recipient);
    ctx.addChild(childRoute);
}

/**
 * Merges the replies of the given routing context by delegating directly to
 * DocumentProtocol::merge(); this policy adds no merge logic of its own.
 *
 * @param ctx The routing context whose replies are to be merged.
 */
void
ExternPolicy::merge(mbus::RoutingContext &ctx)
{
    DocumentProtocol::merge(ctx);
}

/**
 * Picks the next recipient hop, cycling through the known recipients.
 *
 * Holds _lock while refreshing the recipient list and advancing the offset.
 *
 * @return The selected hop, or a default-constructed hop if there are no
 *         recipients.
 */
mbus::Hop
ExternPolicy::getRecipient()
{
    std::lock_guard guard(_lock);
    update();
    if (!_recipients.empty()) {
        // Advance first, then index, so that consecutive calls rotate through the list.
        ++_offset;
        return _recipients[_offset % _recipients.size()];
    }
    return mbus::Hop();
}

void
ExternPolicy::update()
{
    uint32_t upd = _mirrorWithAll->mirror()->updates();
    if (_gen != upd) {
        _gen = upd;
        _recipients.clear();

        IMirrorAPI::SpecList entries = _mirrorWithAll->mirror()->lookup(_pattern);
        if (!entries.empty()) {
            for (const auto & spec : entries) {
                _recipients.push_back(mbus::Hop::parse(spec.second + _session));
            }
        }
    }
}

}