summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArne H Juul <arnej27959@users.noreply.github.com>2021-08-11 22:17:14 +0200
committerGitHub <noreply@github.com>2021-08-11 22:17:14 +0200
commitd7f026fdc2bbe507da717f45c0a8704291cff16f (patch)
tree236635985afaf78405fc563b4296c145510f773a
parent35e7cbbfdde4e544ce72b968e9c79e552fa0d3a2 (diff)
parentd34ca2eae64e5dccc2de4857a0543e9a70959e12 (diff)
Merge pull request #18712 from vespa-engine/arnej/use-internal-fetch-2
Try using slobrok.internal.fetchLocalView also
-rw-r--r--slobrok/src/vespa/slobrok/server/CMakeLists.txt1
-rw-r--r--slobrok/src/vespa/slobrok/server/exchange_manager.cpp1
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.cpp82
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.h6
-rw-r--r--slobrok/src/vespa/slobrok/server/service_map_mirror.cpp90
-rw-r--r--slobrok/src/vespa/slobrok/server/service_map_mirror.h51
6 files changed, 229 insertions, 2 deletions
diff --git a/slobrok/src/vespa/slobrok/server/CMakeLists.txt b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
index e75b94d26bc..1492676cd3a 100644
--- a/slobrok/src/vespa/slobrok/server/CMakeLists.txt
+++ b/slobrok/src/vespa/slobrok/server/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(slobrok_slobrokserver
map_diff.cpp
map_listener.cpp
map_source.cpp
+ service_map_mirror.cpp
metrics_producer.cpp
monitor.cpp
named_service.cpp
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
index c7e3c3b32a6..a608c4b3cba 100644
--- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
+++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
@@ -109,6 +109,7 @@ void
ExchangeManager::healthCheck()
{
for (const auto & [ name, partner ] : _partners) {
+ partner->maybeStartFetch();
partner->maybePushMine();
}
LOG(debug, "ExchangeManager::healthCheck for %ld partners", _partners.size());
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
index 539a901fd9d..0b9872c03e2 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
@@ -27,6 +27,7 @@ RemoteSlobrok::RemoteSlobrok(const std::string &name, const std::string &spec,
_remListReq(nullptr),
_remAddReq(nullptr),
_remRemReq(nullptr),
+ _remFetchReq(nullptr),
_pending()
{
_rpcserver.healthCheck();
@@ -43,6 +44,9 @@ RemoteSlobrok::~RemoteSlobrok()
_remote = nullptr;
}
+ if (_remFetchReq != nullptr) {
+ _remFetchReq->Abort();
+ }
if (_remAddPeerReq != nullptr) {
_remAddPeerReq->Abort();
}
@@ -90,9 +94,76 @@ RemoteSlobrok::doPending()
}
// XXX should save this and pick up on RequestDone()
}
+}
+
+void RemoteSlobrok::maybeStartFetch() {
+ if (_remFetchReq != nullptr) return;
+ if (_remote == nullptr) return;
+ _remFetchReq = getSupervisor()->AllocRPCRequest();
+ _remFetchReq->SetMethodName("slobrok.internal.fetchLocalView");
+ _remFetchReq->GetParams()->AddInt32(_serviceMapMirror.currentGeneration().getAsInt());
+ _remFetchReq->GetParams()->AddInt32(5000);
+ _remote->InvokeAsync(_remFetchReq, 15.0, this);
+}
+void RemoteSlobrok::handleFetchResult() {
+ LOG_ASSERT(_remFetchReq != nullptr);
+ bool success = true;
+ if (_remFetchReq->CheckReturnTypes("iSSSi")) {
+ FRT_Values &answer = *(_remFetchReq->GetReturn());
+
+ uint32_t diff_from = answer[0]._intval32;
+ uint32_t numRemove = answer[1]._string_array._len;
+ FRT_StringValue *r = answer[1]._string_array._pt;
+ uint32_t numNames = answer[2]._string_array._len;
+ FRT_StringValue *n = answer[2]._string_array._pt;
+ uint32_t numSpecs = answer[3]._string_array._len;
+ FRT_StringValue *s = answer[3]._string_array._pt;
+ uint32_t diff_to = answer[4]._intval32;
+
+ std::vector<vespalib::string> removed;
+ for (uint32_t idx = 0; idx < numRemove; ++idx) {
+ removed.emplace_back(r[idx]._str);
+ }
+ ServiceMappingList updated;
+ if (numNames == numSpecs) {
+ for (uint32_t idx = 0; idx < numNames; ++idx) {
+ updated.emplace_back(n[idx]._str, s[idx]._str);
+ }
+ } else {
+ diff_from = 0;
+ diff_to = 0;
+ success = false;
+ }
+ MapDiff diff(diff_from, std::move(removed), std::move(updated), diff_to);
+ if (diff_from == 0) {
+ _serviceMapMirror.clear();
+ _serviceMapMirror.apply(std::move(diff));
+ } else if (diff_from == _serviceMapMirror.currentGeneration().getAsInt()) {
+ _serviceMapMirror.apply(std::move(diff));
+ } else {
+ _serviceMapMirror.clear();
+ success = false;
+ }
+ } else {
+ if (_remFetchReq->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
+ LOG(debug, "partner slobrok too old - not mirroring");
+ } else {
+ LOG(warning, "fetchLocalView() failed with partner %s: %s",
+ getName().c_str(), _remFetchReq->GetErrorMessage());
+ }
+ _serviceMapMirror.clear();
+ success = false;
+ }
+ _remFetchReq->SubRef();
+ _remFetchReq = nullptr;
+ if (success) {
+ maybeStartFetch();
+ }
}
+
+
void
RemoteSlobrok::pushMine()
{
@@ -109,13 +180,17 @@ RemoteSlobrok::pushMine()
void
RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
{
+ if (req == _remFetchReq) {
+ handleFetchResult();
+ return;
+ }
FRT_Values &answer = *(req->GetReturn());
if (req == _remAddPeerReq) {
// handle response after asking remote slobrok to add me as a peer:
if (req->IsError()) {
FRT_Values &args = *req->GetParams();
- const char *myname = args[0]._string._str;
- const char *myspec = args[1]._string._str;
+ const char *myname = args[0]._string._str;
+ const char *myspec = args[1]._string._str;
LOG(info, "addPeer(%s, %s) on remote slobrok %s at %s: %s",
myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage());
req->SubRef();
@@ -162,6 +237,7 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
_remListReq = nullptr;
// next step is to push the ones I own:
+ maybeStartFetch();
maybePushMine();
} else if (req == _remAddReq) {
// handle response after pushing some name that we managed:
@@ -268,10 +344,12 @@ RemoteSlobrok::notifyOkRpcSrv(ManagedRpcServer *rpcsrv)
_reconnecter.disable();
if (_remote != nullptr) {
+ maybeStartFetch();
// the rest here should only be done on first notifyOk
return;
}
_remote = getSupervisor()->GetTarget(getSpec().c_str());
+ maybeStartFetch();
// at this point, we will do (in sequence):
// ask peer to connect to us too;
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.h b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
index fa627bbd821..9e3fb1df55f 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.h
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.h
@@ -5,6 +5,7 @@
#include "cmd.h"
#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
+#include "service_map_mirror.h"
#include <deque>
namespace slobrok {
@@ -44,6 +45,7 @@ private:
ExchangeManager &_exchanger;
RpcServerManager &_rpcsrvmanager;
FRT_Target *_remote;
+ ServiceMapMirror _serviceMapMirror;
ManagedRpcServer _rpcserver;
Reconnecter _reconnecter;
int _failCnt;
@@ -52,10 +54,12 @@ private:
FRT_RPCRequest *_remListReq;
FRT_RPCRequest *_remAddReq;
FRT_RPCRequest *_remRemReq;
+ FRT_RPCRequest *_remFetchReq;
std::deque<std::unique_ptr<NamedService>> _pending;
void pushMine();
void doPending();
+ void handleFetchResult();
public:
RemoteSlobrok(const RemoteSlobrok&) = delete;
@@ -67,9 +71,11 @@ public:
bool isConnected() const { return (_remote != nullptr); }
void tryConnect();
void maybePushMine();
+ void maybeStartFetch();
void invokeAsync(FRT_RPCRequest *req, double timeout, FRT_IRequestWait *rwaiter);
const std::string & getName() const { return _rpcserver.getName(); }
const std::string & getSpec() const { return _rpcserver.getSpec(); }
+ ServiceMapMirror &remoteMap() { return _serviceMapMirror; }
// interfaces implemented:
void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
diff --git a/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp b/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp
new file mode 100644
index 00000000000..8b9a40efed7
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/service_map_mirror.cpp
@@ -0,0 +1,90 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "service_map_mirror.h"
+#include <vespa/log/log.h>
+LOG_SETUP(".slobrok.service_map_mirror");
+
+namespace slobrok {
+
+ServiceMapMirror::ServiceMapMirror()
+ : _map(),
+ _currGen(0),
+ _lock()
+{}
+
+ServiceMapMirror::~ServiceMapMirror() {
+ clear();
+}
+
+void ServiceMapMirror::apply(const MapDiff &diff) {
+ std::lock_guard guard(_lock);
+ LOG(debug, "Applying diff from gen %u", diff.fromGen.getAsInt());
+ LOG_ASSERT(diff.fromGen == _currGen);
+ for (const auto & name : diff.removed) {
+ auto iter = _map.find(name);
+ if (iter != _map.end()) {
+ LOG(debug, "Apply remove %s->%s", name.c_str(), iter->second.c_str());
+ ServiceMapping mapping(name, iter->second);
+ for (auto * listener : _listeners) {
+ listener->remove(mapping);
+ }
+ _map.erase(iter);
+ } else {
+ LOG(debug, "Apply remove %s [already removed]", name.c_str());
+ }
+ }
+ for (const auto & mapping : diff.updated) {
+ LOG(debug, "Apply update %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ auto iter = _map.find(mapping.name);
+ if (iter != _map.end()) {
+ ServiceMapping old{mapping.name, iter->second};
+ iter->second = mapping.spec;
+ for (auto * listener : _listeners) {
+ listener->update(old, mapping);
+ }
+ } else {
+ _map.emplace(mapping.name, mapping.spec);
+ for (auto * listener : _listeners) {
+ listener->add(mapping);
+ }
+ }
+ }
+ LOG(debug, "Apply diff complete to gen %u", diff.toGen.getAsInt());
+ _currGen = diff.toGen;
+}
+
+void ServiceMapMirror::clear() {
+ std::lock_guard guard(_lock);
+ for (const auto & [ k, v ] : _map) {
+ ServiceMapping mapping{k, v};
+ for (auto * listener : _listeners) {
+ listener->remove(mapping);
+ }
+ }
+ _map.clear();
+ _currGen.reset();
+}
+
+ServiceMappingList ServiceMapMirror::allMappings() const {
+ std::lock_guard guard(_lock);
+ ServiceMappingList result;
+ result.reserve(_map.size());
+ for (const auto & [ k, v ] : _map) {
+ result.emplace_back(k, v);
+ }
+ return result;
+}
+
+void ServiceMapMirror::registerListener(MapListener &listener) {
+ std::lock_guard guard(_lock);
+ _listeners.insert(&listener);
+}
+
+void ServiceMapMirror::unregisterListener(MapListener &listener) {
+ std::lock_guard guard(_lock);
+ _listeners.erase(&listener);
+}
+
+
+} // namespace slobrok
+
diff --git a/slobrok/src/vespa/slobrok/server/service_map_mirror.h b/slobrok/src/vespa/slobrok/server/service_map_mirror.h
new file mode 100644
index 00000000000..4cd1c91b7f1
--- /dev/null
+++ b/slobrok/src/vespa/slobrok/server/service_map_mirror.h
@@ -0,0 +1,51 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "service_mapping.h"
+#include "map_diff.h"
+#include "map_source.h"
+#include <vespa/vespalib/util/gencnt.h>
+#include <map>
+#include <mutex>
+#include <set>
+
+namespace slobrok {
+
+/**
+ * @class ServiceMapMirror
+ * @brief Holds a name->spec map which can be incrementally updated
+ **/
+class ServiceMapMirror : public MapSource
+{
+public:
+ using Generation = vespalib::GenCnt;
+
+ ServiceMapMirror();
+ ~ServiceMapMirror();
+
+ /** update according to diff */
+ void apply(const MapDiff &diff);
+
+ /** remove all mappings */
+ void clear();
+
+ const Generation &currentGeneration() const { return _currGen; }
+
+ ServiceMappingList allMappings() const;
+
+ void registerListener(MapListener &listener) override;
+ void unregisterListener(MapListener &listener) override;
+
+private:
+ using Map = std::map<vespalib::string, vespalib::string>;
+ Map _map;
+ Generation _currGen;
+ mutable std::mutex _lock;
+ std::set<MapListener *> _listeners;
+};
+
+//-----------------------------------------------------------------------------
+
+} // namespace slobrok
+