aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
blob: 71ab22b6abf0718e6aa7ba96c8925aaa9ce05d37 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "caching_rpc_target_resolver.h"
#include "shared_rpc_resources.h"
#include <vespa/fnet/frt/target.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>

#include <vespa/log/log.h>
LOG_SETUP(".storage.caching_rpc_target_resolver");

namespace storage::rpc {

// Constructs a resolver that maps storage message addresses to cached RPC targets.
//
// slobrok_mirror       - mirror queried for the current update generation and for
//                        service name -> connection spec lookups.
// target_factory       - factory used to create RPC targets for a connection spec.
// num_targets_per_node - number of targets created for each newly inserted
//                        address mapping (see insert_new_target_mapping()).
//
// NOTE(review): the mirror and factory are passed by const reference and are
// presumably held as references, in which case they must outlive this object;
// confirm against the member declarations in the header.
CachingRpcTargetResolver::CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
                                                   const RpcTargetFactory& target_factory,
                                                   size_t num_targets_per_node)
    : _slobrok_mirror(slobrok_mirror),
      _target_factory(target_factory),
      _targets_rwmutex(),
      _num_targets_per_node(num_targets_per_node)
{
}

// Out-of-line defaulted destructor; members are released by their own destructors.
CachingRpcTargetResolver::~CachingRpcTargetResolver() = default;

// Builds the slobrok service name for a storage message address.
//
// The resulting id has the form
//   storage/cluster.<cluster>/<storage|distributor>/<index>
// where the middle component is "storage" for storage nodes and
// "distributor" for every other node type.
vespalib::string
CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress& address) {
    const char* node_type_name = "distributor";
    if (address.getNodeType() == lib::NodeType::Type::STORAGE) {
        node_type_name = "storage";
    }
    vespalib::asciistream id_stream;
    id_stream << "storage/cluster." << address.getCluster();
    id_stream << '/' << node_type_name;
    id_stream << '/' << address.getIndex();
    return id_stream.str();
}

// Fast-path cache lookup, performed under a shared (reader) lock.
//
// Returns the cached target that the address' pool associates with bucket_id,
// but only when that target reports itself valid and the pool was recorded at
// the given slobrok generation. Returns an empty pointer when there is no
// mapping for the address, or when the cached entry fails either check.
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint64_t bucket_id, uint32_t curr_slobrok_gen) {
    std::shared_lock read_guard(_targets_rwmutex);
    auto pos = _targets.find(address);
    if (pos == _targets.end()) {
        return {}; // Nothing cached for this address.
    }
    const auto& cached_pool = pos->second;
    auto candidate = cached_pool->get_target(bucket_id);
    // Validity is checked first; the generation is only compared for valid targets.
    const bool usable = candidate->is_valid() && (cached_pool->slobrok_gen() == curr_slobrok_gen);
    if (!usable) {
        return {};
    }
    return candidate;
}

// Attempts to reuse an already cached target pool for the address.
//
// If a pool exists whose target for bucket_id is valid and whose connection
// spec equals connection_spec, the pool's slobrok generation is bumped to
// curr_slobrok_gen and that target is returned. In every other case an empty
// pointer is returned and the cache is left untouched.
//
// targets_lock is not used directly; taking it as a parameter documents that
// the caller must already hold the exclusive lock on the target map.
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::consider_update_target_pool(const api::StorageMessageAddress& address,
                                                      uint64_t bucket_id,
                                                      const vespalib::string& connection_spec,
                                                      uint32_t curr_slobrok_gen,
                                                      [[maybe_unused]] const UniqueLock& targets_lock) {
    auto pos = _targets.find(address);
    if (pos == _targets.end()) {
        return {}; // No existing pool to reuse.
    }
    auto& existing_pool = pos->second;
    auto candidate = existing_pool->get_target(bucket_id);
    // Same spec as the existing pool (and still valid) means we can keep using it.
    const bool reusable = candidate->is_valid() && (existing_pool->spec() == connection_spec);
    if (!reusable) {
        return {};
    }
    LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
        address.toString().c_str(), connection_spec.c_str(),
        existing_pool->slobrok_gen(), curr_slobrok_gen);
    existing_pool->update_slobrok_gen(curr_slobrok_gen);
    return candidate;
}

// Creates a fresh target pool for the address and stores it in the cache,
// replacing any pool previously mapped to that address.
//
// _num_targets_per_node targets are created through the target factory, all
// for the same connection_spec, and wrapped in a pool tagged with
// curr_slobrok_gen. Returns the target the new pool associates with bucket_id.
//
// targets_lock is not used directly; taking it as a parameter documents that
// the caller must already hold the exclusive lock on the target map.
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address,
                                                    uint64_t bucket_id,
                                                    const vespalib::string& connection_spec,
                                                    uint32_t curr_slobrok_gen,
                                                    [[maybe_unused]] const UniqueLock& targets_lock) {
    RpcTargetPool::RpcTargetVector new_targets;
    new_targets.reserve(_num_targets_per_node);
    for (size_t remaining = _num_targets_per_node; remaining > 0; --remaining) {
        // TODO expensive inside lock?
        auto created = _target_factory.make_target(connection_spec);
        assert(created);
        std::shared_ptr<RpcTarget> shared_target(std::move(created));
        new_targets.push_back(std::move(shared_target));
    }
    // TODO emplacement (with replace) semantics to avoid need for default constructed K/V
    auto new_pool = std::make_shared<RpcTargetPool>(std::move(new_targets), connection_spec, curr_slobrok_gen);
    _targets[address] = new_pool;
    LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(),
        connection_spec.c_str(), curr_slobrok_gen);
    return new_pool->get_target(bucket_id);
}

// Resolves the RPC target to use for the given address and bucket id.
//
// Steps:
//  1. Read the current slobrok mirror generation and try the shared-lock cache
//     lookup; a hit is returned immediately.
//  2. Otherwise look the address' service name up in the slobrok mirror. If the
//     mirror has no entry, an empty pointer is returned.
//  3. Under the exclusive lock, either refresh the generation of an existing
//     pool with the same connection spec, or insert a brand new pool.
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address, uint64_t bucket_id) {
    const uint32_t mirror_gen = _slobrok_mirror.updates();
    if (auto cached = lookup_target(address, bucket_id, mirror_gen)) {
        return cached;
    }

    auto service_name = address_to_slobrok_id(address);
    auto matches = _slobrok_mirror.lookup(service_name);
    if (matches.empty()) {
        LOG(debug, "Found no mapping for '%s'", service_name.c_str());
        // TODO return potentially stale existing target if no longer existing in SB?
        // TODO or clear any existing mapping?
        return {};
    }
    // Wildcards are never used, so a service name / slobrok id maps to exactly one connection spec.
    assert(matches.size() == 1);
    const auto& spec = matches.front().second;

    std::unique_lock write_guard(_targets_rwmutex);
    if (auto reused = consider_update_target_pool(address, bucket_id, spec, mirror_gen, write_guard)) {
        return reused;
    }
    return insert_new_target_mapping(address, bucket_id, spec, mirror_gen, write_guard);
}

// Returns the target pool currently cached for the address, or an empty
// pointer when no pool is cached. Only reads the cache (under a shared lock);
// it never consults the slobrok mirror nor creates new targets.
std::shared_ptr<RpcTargetPool>
CachingRpcTargetResolver::resolve_rpc_target_pool(const api::StorageMessageAddress& address) {
    std::shared_lock read_guard(_targets_rwmutex);
    auto pos = _targets.find(address);
    if (pos == _targets.end()) {
        return {};
    }
    return pos->second;
}

}