diff options
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpc_server_map.cpp | 11 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpc_server_map.h | 11 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpchooks.cpp | 2 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpcmirror.cpp | 90 | ||||
-rw-r--r-- | slobrok/src/vespa/slobrok/server/rpcmirror.h | 16 |
5 files changed, 47 insertions, 83 deletions
diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp index e19a91cd78d..0f0fae61d4c 100644 --- a/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp +++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.cpp @@ -67,9 +67,7 @@ RpcServerMap::add(NamedService *rpcsrv) LOG_ASSERT(_myrpcsrv_map.find(name) == _myrpcsrv_map.end()); removeReservation(name); - - LOG_ASSERT(_visible_map.lookup(name) == nullptr); - _visible_map.addNew(rpcsrv); + _visible_map.update(ServiceMapping{name, rpcsrv->getSpec()}); } void @@ -82,7 +80,7 @@ RpcServerMap::addNew(std::unique_ptr<ManagedRpcServer> rpcsrv) if (oldman) { const ReservedName *oldres = _reservations[name].get(); - const NamedService *oldvis = _visible_map.remove(name); + _visible_map.remove(name); const std::string &spec = rpcsrv->getSpec(); const std::string &oldname = oldman->getName(); @@ -90,11 +88,6 @@ RpcServerMap::addNew(std::unique_ptr<ManagedRpcServer> rpcsrv) if (spec != oldspec) { LOG(warning, "internal state problem: adding [%s at %s] but already had [%s at %s]", name.c_str(), spec.c_str(), oldname.c_str(), oldspec.c_str()); - if (oldvis != oldman.get()) { - const std::string &n = oldvis->getName(); - const std::string &s = oldvis->getSpec(); - LOG(warning, "BAD: different old visible: [%s at %s]", n.c_str(), s.c_str()); - } if (oldres != nullptr) { const std::string &n = oldres->getName(); const std::string &s = oldres->getSpec(); diff --git a/slobrok/src/vespa/slobrok/server/rpc_server_map.h b/slobrok/src/vespa/slobrok/server/rpc_server_map.h index 2c69c1b8cde..2e49718887d 100644 --- a/slobrok/src/vespa/slobrok/server/rpc_server_map.h +++ b/slobrok/src/vespa/slobrok/server/rpc_server_map.h @@ -1,7 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "visible_map.h" +#include "named_service.h" +#include "service_map_history.h" + +#include <memory> +#include <string> +#include <unordered_map> namespace slobrok { @@ -23,7 +28,7 @@ class RpcServerMap private: using ManagedRpcServerMap = std::unordered_map<std::string, std::unique_ptr<ManagedRpcServer>>; using ReservedNameMap = std::unordered_map<std::string, std::unique_ptr<ReservedName>>; - VisibleMap _visible_map; + ServiceMapHistory _visible_map; ManagedRpcServerMap _myrpcsrv_map; ReservedNameMap _reservations; @@ -34,7 +39,7 @@ private: public: typedef std::vector<const NamedService *> RpcSrvlist; - VisibleMap& visibleMap() { return _visible_map; } + ServiceMapHistory& visibleMap() { return _visible_map; } ManagedRpcServer *lookupManaged(const std::string & name) const; diff --git a/slobrok/src/vespa/slobrok/server/rpchooks.cpp b/slobrok/src/vespa/slobrok/server/rpchooks.cpp index 63b09894686..70a9a0f268a 100644 --- a/slobrok/src/vespa/slobrok/server/rpchooks.cpp +++ b/slobrok/src/vespa/slobrok/server/rpchooks.cpp @@ -504,7 +504,7 @@ RPCHooks::rpc_incrementalFetch(FRT_RPCRequest *req) uint32_t msTimeout = args[1]._intval32; req->getStash().create<IncrementalFetch>(_env.getSupervisor(), req, - _rpcsrvmap.visibleMap(), gencnt).invoke(msTimeout); + _rpcsrvmap.visibleMap(), gencnt).invoke(msTimeout); } diff --git a/slobrok/src/vespa/slobrok/server/rpcmirror.cpp b/slobrok/src/vespa/slobrok/server/rpcmirror.cpp index 1529ea93d4e..7139dd47838 100644 --- a/slobrok/src/vespa/slobrok/server/rpcmirror.cpp +++ b/slobrok/src/vespa/slobrok/server/rpcmirror.cpp @@ -10,101 +10,67 @@ LOG_SETUP(".rpcmirror"); namespace slobrok { IncrementalFetch::IncrementalFetch(FRT_Supervisor *orb, - FRT_RPCRequest *req, - VisibleMap &map, - vespalib::GenCnt gen) - : FNET_Task(orb->GetScheduler()), - _req(req), - _map(map), - _gen(gen) + FRT_RPCRequest *req, + ServiceMapHistory &smh, + vespalib::GenCnt gen) + : FNET_Task(orb->GetScheduler()), + _req(req), + _smh(smh), + _gen(gen) { } IncrementalFetch::~IncrementalFetch() { } -void -IncrementalFetch::completeReq() -{ - vespalib::GenCnt newgen = _map.genCnt(); - VisibleMap::MapDiff diff; +void IncrementalFetch::completeReq(MapDiff diff) { FRT_Values &dst = *_req->GetReturn(); - if (newgen == _gen) { // no change - dst.AddInt32(_gen.getAsInt()); - } else if (_map.hasHistory(_gen)) { - diff = _map.history(_gen); - dst.AddInt32(_gen.getAsInt()); - } else { - dst.AddInt32(0); - diff.updated = _map.allVisible(); - } + dst.AddInt32(diff.fromGen.getAsInt()); size_t sz = diff.removed.size(); - FRT_StringValue *rem = dst.AddStringArray(sz); + FRT_StringValue *rem = dst.AddStringArray(sz); for (uint32_t i = 0; i < sz; ++i) { - dst.SetString(&rem[i], diff.removed[i].c_str()); + dst.SetString(&rem[i], diff.removed[i].c_str()); } sz = diff.updated.size(); - FRT_StringValue *names = dst.AddStringArray(sz); - FRT_StringValue *specs = dst.AddStringArray(sz); + FRT_StringValue *names = dst.AddStringArray(sz); + FRT_StringValue *specs = dst.AddStringArray(sz); for (uint32_t i = 0; i < sz; ++i) { - dst.SetString(&names[i], diff.updated[i]->getName().c_str()); - dst.SetString(&specs[i], diff.updated[i]->getSpec().c_str()); + dst.SetString(&names[i], diff.updated[i].name.c_str()); + dst.SetString(&specs[i], diff.updated[i].spec.c_str()); } - dst.AddInt32(newgen.getAsInt()); + dst.AddInt32(diff.toGen.getAsInt()); + LOG(debug, "mirrorFetch %p done (gen %d -> gen %d)", - this, _gen.getAsInt(), newgen.getAsInt()); + this, diff.fromGen.getAsInt(), diff.toGen.getAsInt()); _req->Return(); } - void IncrementalFetch::PerformTask() { - // cancel update notification - _map.removeUpdateListener(this); - completeReq(); -} - - -void -IncrementalFetch::updated(VisibleMap &map) -{ - LOG_ASSERT(&map == &_map); - (void) ↦ - // unschedule timeout task - Unschedule(); - completeReq(); + if (_smh.cancel(this)) { + completeReq(MapDiff(_gen, {}, {}, _gen)); + } } -void -IncrementalFetch::aborted(VisibleMap &map) -{ - LOG_ASSERT(&map == &_map); - (void) ↦ - // unschedule timeout task - Unschedule(); - _req->SetError(FRTE_RPC_METHOD_FAILED, "slobrok shutting down"); - _req->Return(); +void IncrementalFetch::handle(MapDiff diff) { + Kill(); // unschedule timeout task + completeReq(std::move(diff)); } - void IncrementalFetch::invoke(uint32_t msTimeout) { _req->Detach(); LOG(debug, "IncrementalFetch %p invoked from %s (gen %d, timeout %d ms)", this, _req->GetConnection()->GetSpec(), _gen.getAsInt(), msTimeout); - if (_map.genCnt() != _gen || msTimeout == 0) { - completeReq(); - } else { - _map.addUpdateListener(this); // register as update listener - if (msTimeout > 10000) - msTimeout = 10000; - Schedule((double) msTimeout / 1000.0); - } + if (msTimeout > 10000) + msTimeout = 10000; + Schedule(msTimeout * 0.001); + _smh.asyncGenerationDiff(this, _gen); } } // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/rpcmirror.h b/slobrok/src/vespa/slobrok/server/rpcmirror.h index ce7ca5fca32..4761e250fd5 100644 --- a/slobrok/src/vespa/slobrok/server/rpcmirror.h +++ b/slobrok/src/vespa/slobrok/server/rpcmirror.h @@ -1,8 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "visible_map.h" +#include "service_map_history.h" #include <vespa/fnet/task.h> +#include <vespa/vespalib/util/gencnt.h> class FRT_RPCRequest; class FRT_Supervisor; @@ -10,24 +11,23 @@ class FRT_Supervisor; namespace slobrok { class IncrementalFetch : public FNET_Task, - public VisibleMap::IUpdateListener + public ServiceMapHistory::DiffCompletionHandler { private: - FRT_RPCRequest *_req; - VisibleMap &_map; + FRT_RPCRequest *_req; + ServiceMapHistory &_smh; vespalib::GenCnt _gen; public: IncrementalFetch(const IncrementalFetch &) = delete; IncrementalFetch& operator=(const IncrementalFetch &) = delete; - IncrementalFetch(FRT_Supervisor *orb, FRT_RPCRequest *req, VisibleMap &map, vespalib::GenCnt gen); + IncrementalFetch(FRT_Supervisor *orb, FRT_RPCRequest *req, ServiceMapHistory &smh, vespalib::GenCnt gen); ~IncrementalFetch(); - void completeReq(); + void completeReq(MapDiff diff); void PerformTask() override; - void updated(VisibleMap &map) override; - void aborted(VisibleMap &map) override; + void handle(MapDiff diff) override; void invoke(uint32_t msTimeout); }; |