summaryrefslogtreecommitdiffstats
path: root/slobrok
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-06-28 13:14:22 +0000
committerArne Juul <arnej@verizonmedia.com>2021-06-29 06:52:10 +0000
commit2f1ff6b80c7f769c2c50d9889c38efed613a7b9c (patch)
tree379956a169292994f436d699e29eb45dfedd07dc /slobrok
parentdfa07c1710906646b6474b10e59a71600a162a8b (diff)
use ServiceMapHistory
Diffstat (limited to 'slobrok')
-rw-r--r--slobrok/src/vespa/slobrok/server/rpc_server_map.cpp11
-rw-r--r--slobrok/src/vespa/slobrok/server/rpc_server_map.h11
-rw-r--r--slobrok/src/vespa/slobrok/server/rpchooks.cpp2
-rw-r--r--slobrok/src/vespa/slobrok/server/rpcmirror.cpp90
-rw-r--r--slobrok/src/vespa/slobrok/server/rpcmirror.h16
5 files changed, 47 insertions, 83 deletions
diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp
index e19a91cd78d..0f0fae61d4c 100644
--- a/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp
@@ -67,9 +67,7 @@ RpcServerMap::add(NamedService *rpcsrv)
LOG_ASSERT(_myrpcsrv_map.find(name) == _myrpcsrv_map.end());
removeReservation(name);
-
- LOG_ASSERT(_visible_map.lookup(name) == nullptr);
- _visible_map.addNew(rpcsrv);
+ _visible_map.update(ServiceMapping{name, rpcsrv->getSpec()});
}
void
@@ -82,7 +80,7 @@ RpcServerMap::addNew(std::unique_ptr<ManagedRpcServer> rpcsrv)
if (oldman) {
const ReservedName *oldres = _reservations[name].get();
- const NamedService *oldvis = _visible_map.remove(name);
+ _visible_map.remove(name);
const std::string &spec = rpcsrv->getSpec();
const std::string &oldname = oldman->getName();
@@ -90,11 +88,6 @@ RpcServerMap::addNew(std::unique_ptr<ManagedRpcServer> rpcsrv)
if (spec != oldspec) {
LOG(warning, "internal state problem: adding [%s at %s] but already had [%s at %s]",
name.c_str(), spec.c_str(), oldname.c_str(), oldspec.c_str());
- if (oldvis != oldman.get()) {
- const std::string &n = oldvis->getName();
- const std::string &s = oldvis->getSpec();
- LOG(warning, "BAD: different old visible: [%s at %s]", n.c_str(), s.c_str());
- }
if (oldres != nullptr) {
const std::string &n = oldres->getName();
const std::string &s = oldres->getSpec();
diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.h b/slobrok/src/vespa/slobrok/server/rpc_server_map.h
index 2c69c1b8cde..2e49718887d 100644
--- a/slobrok/src/vespa/slobrok/server/rpc_server_map.h
+++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.h
@@ -1,7 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "visible_map.h"
+#include "named_service.h"
+#include "service_map_history.h"
+
+#include <memory>
+#include <string>
+#include <unordered_map>
namespace slobrok {
@@ -23,7 +28,7 @@ class RpcServerMap
private:
using ManagedRpcServerMap = std::unordered_map<std::string, std::unique_ptr<ManagedRpcServer>>;
using ReservedNameMap = std::unordered_map<std::string, std::unique_ptr<ReservedName>>;
- VisibleMap _visible_map;
+ ServiceMapHistory _visible_map;
ManagedRpcServerMap _myrpcsrv_map;
ReservedNameMap _reservations;
@@ -34,7 +39,7 @@ private:
public:
typedef std::vector<const NamedService *> RpcSrvlist;
- VisibleMap& visibleMap() { return _visible_map; }
+ ServiceMapHistory& visibleMap() { return _visible_map; }
ManagedRpcServer *lookupManaged(const std::string & name) const;
diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
index 63b09894686..70a9a0f268a 100644
--- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp
+++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp
@@ -504,7 +504,7 @@ RPCHooks::rpc_incrementalFetch(FRT_RPCRequest *req)
uint32_t msTimeout = args[1]._intval32;
req->getStash().create<IncrementalFetch>(_env.getSupervisor(), req,
- _rpcsrvmap.visibleMap(), gencnt).invoke(msTimeout);
+ _rpcsrvmap.visibleMap(), gencnt).invoke(msTimeout);
}
diff --git a/slobrok/src/vespa/slobrok/server/rpcmirror.cpp b/slobrok/src/vespa/slobrok/server/rpcmirror.cpp
index 1529ea93d4e..7139dd47838 100644
--- a/slobrok/src/vespa/slobrok/server/rpcmirror.cpp
+++ b/slobrok/src/vespa/slobrok/server/rpcmirror.cpp
@@ -10,101 +10,67 @@ LOG_SETUP(".rpcmirror");
namespace slobrok {
IncrementalFetch::IncrementalFetch(FRT_Supervisor *orb,
- FRT_RPCRequest *req,
- VisibleMap &map,
- vespalib::GenCnt gen)
- : FNET_Task(orb->GetScheduler()),
- _req(req),
- _map(map),
- _gen(gen)
+ FRT_RPCRequest *req,
+ ServiceMapHistory &smh,
+ vespalib::GenCnt gen)
+ : FNET_Task(orb->GetScheduler()),
+ _req(req),
+ _smh(smh),
+ _gen(gen)
{ }
IncrementalFetch::~IncrementalFetch() { }
-void
-IncrementalFetch::completeReq()
-{
- vespalib::GenCnt newgen = _map.genCnt();
- VisibleMap::MapDiff diff;
+void IncrementalFetch::completeReq(MapDiff diff) {
FRT_Values &dst = *_req->GetReturn();
- if (newgen == _gen) { // no change
- dst.AddInt32(_gen.getAsInt());
- } else if (_map.hasHistory(_gen)) {
- diff = _map.history(_gen);
- dst.AddInt32(_gen.getAsInt());
- } else {
- dst.AddInt32(0);
- diff.updated = _map.allVisible();
- }
+ dst.AddInt32(diff.fromGen.getAsInt());
size_t sz = diff.removed.size();
- FRT_StringValue *rem = dst.AddStringArray(sz);
+ FRT_StringValue *rem = dst.AddStringArray(sz);
for (uint32_t i = 0; i < sz; ++i) {
- dst.SetString(&rem[i], diff.removed[i].c_str());
+ dst.SetString(&rem[i], diff.removed[i].c_str());
}
sz = diff.updated.size();
- FRT_StringValue *names = dst.AddStringArray(sz);
- FRT_StringValue *specs = dst.AddStringArray(sz);
+ FRT_StringValue *names = dst.AddStringArray(sz);
+ FRT_StringValue *specs = dst.AddStringArray(sz);
for (uint32_t i = 0; i < sz; ++i) {
- dst.SetString(&names[i], diff.updated[i]->getName().c_str());
- dst.SetString(&specs[i], diff.updated[i]->getSpec().c_str());
+ dst.SetString(&names[i], diff.updated[i].name.c_str());
+ dst.SetString(&specs[i], diff.updated[i].spec.c_str());
}
- dst.AddInt32(newgen.getAsInt());
+ dst.AddInt32(diff.toGen.getAsInt());
+
LOG(debug, "mirrorFetch %p done (gen %d -> gen %d)",
- this, _gen.getAsInt(), newgen.getAsInt());
+ this, diff.fromGen.getAsInt(), diff.toGen.getAsInt());
_req->Return();
}
-
void
IncrementalFetch::PerformTask()
{
- // cancel update notification
- _map.removeUpdateListener(this);
- completeReq();
-}
-
-
-void
-IncrementalFetch::updated(VisibleMap &map)
-{
- LOG_ASSERT(&map == &_map);
- (void) &map;
- // unschedule timeout task
- Unschedule();
- completeReq();
+ if (_smh.cancel(this)) {
+ completeReq(MapDiff(_gen, {}, {}, _gen));
+ }
}
-void
-IncrementalFetch::aborted(VisibleMap &map)
-{
- LOG_ASSERT(&map == &_map);
- (void) &map;
- // unschedule timeout task
- Unschedule();
- _req->SetError(FRTE_RPC_METHOD_FAILED, "slobrok shutting down");
- _req->Return();
+void IncrementalFetch::handle(MapDiff diff) {
+ Kill(); // unschedule timeout task
+ completeReq(std::move(diff));
}
-
void
IncrementalFetch::invoke(uint32_t msTimeout)
{
_req->Detach();
LOG(debug, "IncrementalFetch %p invoked from %s (gen %d, timeout %d ms)",
this, _req->GetConnection()->GetSpec(), _gen.getAsInt(), msTimeout);
- if (_map.genCnt() != _gen || msTimeout == 0) {
- completeReq();
- } else {
- _map.addUpdateListener(this); // register as update listener
- if (msTimeout > 10000)
- msTimeout = 10000;
- Schedule((double) msTimeout / 1000.0);
- }
+ if (msTimeout > 10000)
+ msTimeout = 10000;
+ Schedule(msTimeout * 0.001);
+ _smh.asyncGenerationDiff(this, _gen);
}
} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/rpcmirror.h b/slobrok/src/vespa/slobrok/server/rpcmirror.h
index ce7ca5fca32..4761e250fd5 100644
--- a/slobrok/src/vespa/slobrok/server/rpcmirror.h
+++ b/slobrok/src/vespa/slobrok/server/rpcmirror.h
@@ -1,8 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "visible_map.h"
+#include "service_map_history.h"
#include <vespa/fnet/task.h>
+#include <vespa/vespalib/util/gencnt.h>
class FRT_RPCRequest;
class FRT_Supervisor;
@@ -10,24 +11,23 @@ class FRT_Supervisor;
namespace slobrok {
class IncrementalFetch : public FNET_Task,
- public VisibleMap::IUpdateListener
+ public ServiceMapHistory::DiffCompletionHandler
{
private:
- FRT_RPCRequest *_req;
- VisibleMap &_map;
+ FRT_RPCRequest *_req;
+ ServiceMapHistory &_smh;
vespalib::GenCnt _gen;
public:
IncrementalFetch(const IncrementalFetch &) = delete;
IncrementalFetch& operator=(const IncrementalFetch &) = delete;
- IncrementalFetch(FRT_Supervisor *orb, FRT_RPCRequest *req, VisibleMap &map, vespalib::GenCnt gen);
+ IncrementalFetch(FRT_Supervisor *orb, FRT_RPCRequest *req, ServiceMapHistory &smh, vespalib::GenCnt gen);
~IncrementalFetch();
- void completeReq();
+ void completeReq(MapDiff diff);
void PerformTask() override;
- void updated(VisibleMap &map) override;
- void aborted(VisibleMap &map) override;
+ void handle(MapDiff diff) override;
void invoke(uint32_t msTimeout);
};