aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient/src/vespa/vespaclient/vespadoclocator/locator.cpp
blob: af805d461fcf5b1139d5a00ee6cddfbf46b7f02e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <boost/tokenizer.hpp>
#include <vespa/documentapi/messagebus/documentprotocol.h>
#include <vespa/messagebus/configagent.h>
#include <vespa/messagebus/iconfighandler.h>
#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/vdslib/bucketdistribution.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/config/helper/configgetter.hpp>


#include "locator.h"

typedef std::map<std::string, uint32_t> ClusterMap;
using namespace config;

namespace {

    void
    processHop(const mbus::HopSpec &hop, ClusterMap &clusters)
    {
        typedef boost::char_separator<char> CharSeparator;
        typedef boost::tokenizer<CharSeparator> Tokenizer;

        int colIdx = -1;
        for (uint32_t r = 0, len = hop.getNumRecipients(); r < len; ++r) {
            Tokenizer tokens(hop.getRecipient(r), CharSeparator("/"));
            Tokenizer::iterator token = tokens.begin();
            for (uint32_t t = 0; t < 2 && token != tokens.end(); ++t, ++token) {
                // empty
            }
            if (token != tokens.end()) {
                colIdx = std::max(colIdx, atoi(&token->c_str()[1]));
            }
        }
        if (colIdx < 0) {
            throw config::InvalidConfigException(vespalib::make_string("Failed to process cluster '%s'.",
                                                                       hop.getName().c_str()));
        }
        clusters.insert(ClusterMap::value_type(hop.getName().substr(15), colIdx + 1));
    }

    void
    processTable(const mbus::RoutingTableSpec &table, ClusterMap &clusters)
    {
        clusters.clear();
        for (uint32_t i = 0, len = table.getNumHops(); i < len; ++i) {
            const mbus::HopSpec &hop = table.getHop(i);
            if (hop.getName().find("search/cluster.") == 0) {
                processHop(hop, clusters);
            }
        }
        if (clusters.empty()) {
            throw config::InvalidConfigException("No search clusters found to resolve document location for.");
        }
    }

    void
    processRouting(const mbus::RoutingSpec &routing, ClusterMap &clusters)
    {
        const mbus::RoutingTableSpec *table = NULL;
        for (uint32_t i = 0, len = routing.getNumTables(); i < len; ++i) {
            const mbus::RoutingTableSpec &ref = routing.getTable(i);
            if (ref.getProtocol() == documentapi::DocumentProtocol::NAME) {
                table = &ref;
                break;
            }
        }
        if (table == NULL) {
            throw config::InvalidConfigException("No routing table available to derive config from.");
        }
        processTable(*table, clusters);
    }

    uint32_t
    getNumColumns(const mbus::RoutingSpec &routing, const std::string &clusterName)
    {
        ClusterMap clusters;
        processRouting(routing, clusters);

        if (clusterName.empty() && clusters.size() == 1) {
            return clusters.begin()->second;
        }

        ClusterMap::iterator it = clusters.find(clusterName);
        if (it == clusters.end()) {
            std::string str = "Cluster name must be one of ";
            int i = 0, len = clusters.size();
            for (it = clusters.begin(); it != clusters.end(); ++it, ++i)
            {
                str.append("'").append(it->first).append("'");
                if (i < len - 2) {
                    str.append(", ");
                } else if (i == len - 2) {
                    str.append(" or ");
                }
            }
            str.append(".");
            throw config::InvalidConfigException(str);
        }

        return it->second;
    }
}

Locator::Locator(uint32_t numColumns) :
    _factory(),
    _numColumns(numColumns)
{
    // empty
}

void
Locator::configure(const std::string &configId, const std::string &clusterName)
{
    config::ConfigUri configUri(configId);
    // Configure by inspecting routing config.
    struct MyCB : public mbus::IConfigHandler {
        mbus::RoutingSpec mySpec;
        MyCB() : mySpec() {}
        bool setupRouting(const mbus::RoutingSpec &spec) override {
            mySpec = spec;
            return true;
        }
    } myCB;
    mbus::ConfigAgent agent(myCB);
    agent.configure(ConfigGetter<messagebus::MessagebusConfig>::getConfig(configUri.getConfigId(), configUri.getContext()));
    _numColumns = getNumColumns(myCB.mySpec, clusterName);
}

document::BucketId
Locator::getBucketId(document::DocumentId &docId)
{
    return _factory.getBucketId(docId);
}

uint32_t
Locator::getSearchColumn(document::DocumentId &docId)
{
    vdslib::BucketDistribution dist(_numColumns, 16u);
    return dist.getColumn(getBucketId(docId));
}