diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2021-08-11 22:17:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 22:17:14 +0200 |
commit | d7f026fdc2bbe507da717f45c0a8704291cff16f (patch) | |
tree | 236635985afaf78405fc563b4296c145510f773a | |
parent | 35e7cbbfdde4e544ce72b968e9c79e552fa0d3a2 (diff) | |
parent | d34ca2eae64e5dccc2de4857a0543e9a70959e12 (diff) |
Merge pull request #18712 from vespa-engine/arnej/use-internal-fetch-2
Try using slobrok.internal.fetchLocalView also
6 files changed, 229 insertions, 2 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt index e75b94d26bc..1492676cd3a 100644 --- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt +++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt @@ -11,6 +11,7 @@ vespa_add_library(slobrok_slobrokserver map_diff.cpp map_listener.cpp map_source.cpp + service_map_mirror.cpp metrics_producer.cpp monitor.cpp named_service.cpp diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp index c7e3c3b32a6..a608c4b3cba 100644 --- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -109,6 +109,7 @@ void ExchangeManager::healthCheck() { for (const auto & [ name, partner ] : _partners) { + partner->maybeStartFetch(); partner->maybePushMine(); } LOG(debug, "ExchangeManager::healthCheck for %ld partners", _partners.size()); diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp index 539a901fd9d..0b9872c03e2 100644 --- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp @@ -27,6 +27,7 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec, _remListReq(nullptr), _remAddReq(nullptr), _remRemReq(nullptr), + _remFetchReq(nullptr), _pending() { _rpcserver.healthCheck(); @@ -43,6 +44,9 @@ RemoteSlobrok::~RemoteSlobrok() _remote = nullptr; } + if (_remFetchReq != nullptr) { + _remFetchReq->Abort(); + } if (_remAddPeerReq != nullptr) { _remAddPeerReq->Abort(); } @@ -90,9 +94,76 @@ RemoteSlobrok::doPending() } // XXX should save this and pick up on RequestDone() } +} + +void RemoteSlobrok::maybeStartFetch() { + if (_remFetchReq != nullptr) return; + if (_remote == nullptr) return; + _remFetchReq = getSupervisor()->AllocRPCRequest(); + _remFetchReq->SetMethodName("slobrok.internal.fetchLocalView"); + _remFetchReq->GetParams()->AddInt32(_serviceMapMirror.currentGeneration().getAsInt()); + _remFetchReq->GetParams()->AddInt32(5000); + _remote->InvokeAsync(_remFetchReq, 15.0, this); +} +void RemoteSlobrok::handleFetchResult() { + LOG_ASSERT(_remFetchReq != nullptr); + bool success = true; + if (_remFetchReq->CheckReturnTypes("iSSSi")) { + FRT_Values &answer = *(_remFetchReq->GetReturn()); + + uint32_t diff_from = answer[0]._intval32; + uint32_t numRemove = answer[1]._string_array._len; + FRT_StringValue *r = answer[1]._string_array._pt; + uint32_t numNames = answer[2]._string_array._len; + FRT_StringValue *n = answer[2]._string_array._pt; + uint32_t numSpecs = answer[3]._string_array._len; + FRT_StringValue *s = answer[3]._string_array._pt; + uint32_t diff_to = answer[4]._intval32; + + std::vector<vespalib::string> removed; + for (uint32_t idx = 0; idx < numRemove; ++idx) { + removed.emplace_back(r[idx]._str); + } + ServiceMappingList updated; + if (numNames == numSpecs) { + for (uint32_t idx = 0; idx < numNames; ++idx) { + updated.emplace_back(n[idx]._str, s[idx]._str); + } + } else { + diff_from = 0; + diff_to = 0; + success = false; + } + MapDiff diff(diff_from, std::move(removed), std::move(updated), diff_to); + if (diff_from == 0) { + _serviceMapMirror.clear(); + _serviceMapMirror.apply(std::move(diff)); + } else if (diff_from == _serviceMapMirror.currentGeneration().getAsInt()) { + _serviceMapMirror.apply(std::move(diff)); + } else { + _serviceMapMirror.clear(); + success = false; + } + } else { + if (_remFetchReq->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) { + LOG(debug, "partner slobrok too old - not mirroring"); + } else { + LOG(warning, "fetchLocalView() failed with partner %s: %s", + getName().c_str(), _remFetchReq->GetErrorMessage()); + } + _serviceMapMirror.clear(); + success = false; + } + _remFetchReq->SubRef(); + _remFetchReq = nullptr; + if (success) { + maybeStartFetch(); + } } + + void RemoteSlobrok::pushMine() { @@ -109,13 +180,17 @@ RemoteSlobrok::pushMine() void RemoteSlobrok::RequestDone(FRT_RPCRequest *req) { + if (req == _remFetchReq) { + handleFetchResult(); + return; + } FRT_Values &answer = *(req->GetReturn()); if (req == _remAddPeerReq) { // handle response after asking remote slobrok to add me as a peer: if (req->IsError()) { FRT_Values &args = *req->GetParams(); - const char *myname = args[0]._string._str; - const char *myspec = args[1]._string._str; + const char *myname = args[0]._string._str; + const char *myspec = args[1]._string._str; LOG(info, "addPeer(%s, %s) on remote slobrok %s at %s: %s", myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage()); req->SubRef(); @@ -162,6 +237,7 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req) _remListReq = nullptr; // next step is to push the ones I own: + maybeStartFetch(); maybePushMine(); } else if (req == _remAddReq) { // handle response after pushing some name that we managed: @@ -268,10 +344,12 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) _reconnecter.disable(); if (_remote != nullptr) { + maybeStartFetch(); // the rest here should only be done on first notifyOk return; } _remote = getSupervisor()->GetTarget(getSpec().c_str()); + maybeStartFetch(); // at this point, we will do (in sequence): // ask peer to connect to us too; diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h index fa627bbd821..9e3fb1df55f 100644 --- a/slobrok/src/vespa/slobrok/server/remote_slobrok.h +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h @@ -5,6 +5,7 @@ #include "cmd.h" #include "i_rpc_server_manager.h" #include "managed_rpc_server.h" +#include "service_map_mirror.h" #include <deque> namespace slobrok { @@ -44,6 +45,7 @@ private: ExchangeManager &_exchanger; RpcServerManager &_rpcsrvmanager; FRT_Target *_remote; + ServiceMapMirror _serviceMapMirror; ManagedRpcServer _rpcserver; Reconnecter _reconnecter; int _failCnt; @@ -52,10 +54,12 @@ private: FRT_RPCRequest *_remListReq; FRT_RPCRequest *_remAddReq; FRT_RPCRequest *_remRemReq; + FRT_RPCRequest *_remFetchReq; std::deque<std::unique_ptr<NamedService>> _pending; void pushMine(); void doPending(); + void handleFetchResult(); public: RemoteSlobrok(const RemoteSlobrok&) = delete; @@ -67,9 +71,11 @@ public: bool isConnected() const { return (_remote != nullptr); } void tryConnect(); void maybePushMine(); + void maybeStartFetch(); void invokeAsync(FRT_RPCRequest *req, double timeout, FRT_IRequestWait *rwaiter); const std::string & getName() const { return _rpcserver.getName(); } const std::string & getSpec() const { return _rpcserver.getSpec(); } + ServiceMapMirror &remoteMap() { return _serviceMapMirror; } // interfaces implemented: void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; diff --git a/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp b/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp new file mode 100644 index 00000000000..8b9a40efed7 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp @@ -0,0 +1,90 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "service_map_mirror.h" +#include <vespa/log/log.h> +LOG_SETUP(".slobrok.service_map_mirror"); + +namespace slobrok { + +ServiceMapMirror::ServiceMapMirror() + : _map(), + _currGen(0), + _lock() +{} + +ServiceMapMirror::~ServiceMapMirror() { + clear(); +} + +void ServiceMapMirror::apply(const MapDiff &diff) { + std::lock_guard guard(_lock); + LOG(debug, "Applying diff from gen %u", diff.fromGen.getAsInt()); + LOG_ASSERT(diff.fromGen == _currGen); + for (const auto & name : diff.removed) { + auto iter = _map.find(name); + if (iter != _map.end()) { + LOG(debug, "Apply remove %s->%s", name.c_str(), iter->second.c_str()); + ServiceMapping mapping(name, iter->second); + for (auto * listener : _listeners) { + listener->remove(mapping); + } + _map.erase(iter); + } else { + LOG(debug, "Apply remove %s [already removed]", name.c_str()); + } + } + for (const auto & mapping : diff.updated) { + LOG(debug, "Apply update %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + auto iter = _map.find(mapping.name); + if (iter != _map.end()) { + ServiceMapping old{mapping.name, iter->second}; + iter->second = mapping.spec; + for (auto * listener : _listeners) { + listener->update(old, mapping); + } + } else { + _map.emplace(mapping.name, mapping.spec); + for (auto * listener : _listeners) { + listener->add(mapping); + } + } + } + LOG(debug, "Apply diff complete to gen %u", diff.toGen.getAsInt()); + _currGen = diff.toGen; +} + +void ServiceMapMirror::clear() { + std::lock_guard guard(_lock); + for (const auto & [ k, v ] : _map) { + ServiceMapping mapping{k, v}; + for (auto * listener : _listeners) { + listener->remove(mapping); + } + } + _map.clear(); + _currGen.reset(); +} + +ServiceMappingList ServiceMapMirror::allMappings() const { + std::lock_guard guard(_lock); + ServiceMappingList result; + result.reserve(_map.size()); + for (const auto & [ k, v ] : _map) { + result.emplace_back(k, v); + } + return result; +} + +void ServiceMapMirror::registerListener(MapListener &listener) { + std::lock_guard guard(_lock); + _listeners.insert(&listener); +} + +void ServiceMapMirror::unregisterListener(MapListener &listener) { + std::lock_guard guard(_lock); + _listeners.erase(&listener); +} + + +} // namespace slobrok + diff --git a/slobrok/src/vespa/slobrok/server/service_map_mirror.h b/slobrok/src/vespa/slobrok/server/service_map_mirror.h new file mode 100644 index 00000000000..4cd1c91b7f1 --- /dev/null +++ b/slobrok/src/vespa/slobrok/server/service_map_mirror.h @@ -0,0 +1,51 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "service_mapping.h" +#include "map_diff.h" +#include "map_source.h" +#include <vespa/vespalib/util/gencnt.h> +#include <map> +#include <mutex> +#include <set> + +namespace slobrok { + +/** + * @class ServiceMapMirror + * @brief Holds a name->spec map which can be incrementally updated + **/ +class ServiceMapMirror : public MapSource +{ +public: + using Generation = vespalib::GenCnt; + + ServiceMapMirror(); + ~ServiceMapMirror(); + + /** update according to diff */ + void apply(const MapDiff &diff); + + /** remove all mappings */ + void clear(); + + const Generation ¤tGeneration() const { return _currGen; } + + ServiceMappingList allMappings() const; + + void registerListener(MapListener &listener) override; + void unregisterListener(MapListener &listener) override; + +private: + using Map = std::map<vespalib::string, vespalib::string>; + Map _map; + Generation _currGen; + mutable std::mutex _lock; + std::set<MapListener *> _listeners; +}; + +//----------------------------------------------------------------------------- + +} // namespace slobrok + |