diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2017-10-17 12:19:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-17 12:19:11 +0200 |
commit | 81aa88dc8eee9ed7573c819d96a4900b402c3456 (patch) | |
tree | efbe93f23b97284d5fb6026b9485e539c0600a63 | |
parent | 6c9b8f942e91a1d9ddf3205ef2817944e94864cb (diff) | |
parent | 08d060f66b380bb85665aff79aadc589ecad6b80 (diff) |
Merge pull request #3779 from vespa-engine/balder/remove-oosmanager-c++
Balder/remove oosmanager c++
47 files changed, 97 insertions, 1815 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index bc7e254a351..42f1014a6c1 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -110,7 +110,6 @@ public final class SessionCache extends AbstractComponent { final RPCNetworkParams netParams = new RPCNetworkParams() .setSlobrokConfigId(slobrokConfigId) .setIdentity(new Identity(identity)) - .setOOSServerPattern("search/cluster.*/rtx/*/clustercontroller") .setListenPort(mbusConfig.port()); return SharedMessageBus.newInstance(mbusParams, netParams); } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java index b0f64002de5..ff237d46b90 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java @@ -120,7 +120,7 @@ public class PolicyTestCase { Slobrok slobrok = new Slobrok(); List<TestServer> servers = new ArrayList<>(); for (int i = 0; i < 10; ++i) { - TestServer server = new TestServer("docproc/cluster.default/" + i, null, slobrok, null, + TestServer server = new TestServer("docproc/cluster.default/" + i, null, slobrok, new DocumentProtocol(manager)); server.net.registerSession("chain.default"); servers.add(server); @@ -146,7 +146,7 @@ public class PolicyTestCase { public void requireThatExternPolicyMergesOneReplyAsProtocol() throws Exception { PolicyTestFrame frame = newPutDocumentFrame("doc:scheme:"); Slobrok slobrok = new Slobrok(); - TestServer server = new TestServer("docproc/cluster.default/0", null, slobrok, null, + TestServer server = new TestServer("docproc/cluster.default/0", null, slobrok, new DocumentProtocol(manager)); server.net.registerSession("chain.default"); setupExternPolicy(frame, slobrok, "docproc/cluster.default/*/chain.default", 1); @@ -159,7 +159,7 @@ public class PolicyTestCase { public void testExternSend() throws Exception { // Setup local source node. Slobrok local = new Slobrok(); - TestServer src = new TestServer("src", null, local, null, new DocumentProtocol(manager)); + TestServer src = new TestServer("src", null, local, new DocumentProtocol(manager)); SourceSession ss = src.mb.createSourceSession(new Receptor(), new SourceSessionParams().setTimeout(TIMEOUT)); // Setup remote cluster with routing config. @@ -168,9 +168,9 @@ public class PolicyTestCase { new RoutingTableSpec(DocumentProtocol.NAME) .addRoute(new RouteSpec("default").addHop("dst")) .addHop(new HopSpec("dst", "dst/session")), - slobrok, null, new DocumentProtocol(manager)); + slobrok, new DocumentProtocol(manager)); IntermediateSession is = itr.mb.createIntermediateSession("session", true, new Receptor(), new Receptor()); - TestServer dst = new TestServer("dst", null, slobrok, null, new DocumentProtocol(manager)); + TestServer dst = new TestServer("dst", null, slobrok, new DocumentProtocol(manager)); DestinationSession ds = dst.mb.createDestinationSession("session", true, new Receptor()); // Send message from local node to remote cluster and resolve route there. @@ -201,14 +201,14 @@ public class PolicyTestCase { @Test public void testExternMultipleSlobroks() throws ListenFailedException { Slobrok local = new Slobrok(); - TestServer srcServer = new TestServer("src", null, local, null, new DocumentProtocol(manager)); + TestServer srcServer = new TestServer("src", null, local, new DocumentProtocol(manager)); SourceSession srcSession = srcServer.mb.createSourceSession(new Receptor(), new SourceSessionParams().setTimeout(TIMEOUT)); Slobrok extern = new Slobrok(); String spec = "tcp/localhost:" + extern.port(); - TestServer dstServer = new TestServer("dst", null, extern, null, new DocumentProtocol(manager)); + TestServer dstServer = new TestServer("dst", null, extern, new DocumentProtocol(manager)); Receptor dstHandler = new Receptor(); DestinationSession dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); @@ -229,7 +229,7 @@ public class PolicyTestCase { extern = new Slobrok(); spec += ",tcp/localhost:" + extern.port(); - dstServer = new TestServer("dst", null, extern, null, new DocumentProtocol(manager)); + dstServer = new TestServer("dst", null, extern, new DocumentProtocol(manager)); dstHandler = new Receptor(); dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 3629604aeea..9d38247ebfd 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -38,6 +38,7 @@ using document::DocumentUpdate; using document::readDocumenttypesConfig; using slobrok::api::IMirrorAPI; using namespace documentapi; +using vespalib::make_string; class Test : public vespalib::TestApp { private: @@ -48,8 +49,7 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); bool tryDistribution(TestFrame &frame, const string &id, const string &expected); - void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, - uint32_t foundMask, bool expectedFound); + void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound); void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, @@ -229,8 +229,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok() std::vector<mbus::TestServer*> servers; for (uint32_t i = 0; i < 10; ++i) { mbus::TestServer *server = new mbus::TestServer( - mbus::Identity(vespalib::make_string("docproc/cluster.default/%d", i)), - mbus::RoutingSpec(), slobrok, "", + mbus::Identity(make_string("docproc/cluster.default/%d", i)), mbus::RoutingSpec(), slobrok, mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); servers.push_back(server); server->net.registerSession("chain.default"); @@ -257,8 +256,7 @@ Test::requireThatExternPolicyMergesOneReplyAsProtocol() TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("doc:scheme:")); mbus::Slobrok slobrok; - mbus::TestServer server(mbus::Identity("docproc/cluster.default/0"), - mbus::RoutingSpec(), slobrok, "", + mbus::TestServer server(mbus::Identity("docproc/cluster.default/0"), mbus::RoutingSpec(), slobrok, mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); server.net.registerSession("chain.default"); setupExternPolicy(frame, slobrok, "docproc/cluster.default/0/chain.default", 1); @@ -322,9 +320,8 @@ Test::testExternSend() { // Setup local source node. mbus::Slobrok local; - mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, "", - mbus::IProtocol::SP( - new DocumentProtocol(_loadTypes, _repo))); + mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, + std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor sr; mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60)); @@ -333,13 +330,12 @@ Test::testExternSend() .addTable(mbus::RoutingTableSpec(DocumentProtocol::NAME) .addRoute(mbus::RouteSpec("default").addHop("dst")) .addHop(mbus::HopSpec("dst", "dst/session"))), - slobrok, "", mbus::IProtocol::SP( - new DocumentProtocol(_loadTypes, _repo))); + slobrok, std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor ir; mbus::IntermediateSession::UP is = itr.mb.createIntermediateSession("session", true, ir, ir); - mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), slobrok, "", - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), slobrok, + std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor dr; mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); @@ -349,14 +345,14 @@ Test::testExternSend() msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = ir.getMessage(600)).get() != NULL); + ASSERT_TRUE((msg = ir.getMessage(600))); is->forward(std::move(msg)); - ASSERT_TRUE((msg = dr.getMessage(600)).get() != NULL); + ASSERT_TRUE((msg = dr.getMessage(600))); ds->acknowledge(std::move(msg)); mbus::Reply::UP reply = ir.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); is->forward(std::move(reply)); - ASSERT_TRUE((reply = sr.getReply(600)).get() != NULL); + ASSERT_TRUE((reply = sr.getReply(600))); fprintf(stderr, "%s", reply->getTrace().toString().c_str()); } @@ -365,8 +361,8 @@ void Test::testExternMultipleSlobroks() { mbus::Slobrok local; - mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, "", - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, + std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor sr; mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60)); @@ -376,33 +372,33 @@ Test::testExternMultipleSlobroks() mbus::Slobrok ext; spec.append(vespalib::make_string("tcp/localhost:%d", ext.port())); - mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), ext, "", - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), ext, + std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = dr.getMessage(600)).get() != NULL); + ASSERT_TRUE((msg = dr.getMessage(600))); ds->acknowledge(std::move(msg)); mbus::Reply::UP reply = sr.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); } { mbus::Slobrok ext; spec.append(vespalib::make_string(",tcp/localhost:%d", ext.port())); - mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), ext, "", - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + mbus::TestServer dst(mbus::Identity("dst"), mbus::RoutingSpec(), ext, + std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = dr.getMessage(600)).get() != NULL); + ASSERT_TRUE((msg = dr.getMessage(600))); ds->acknowledge(std::move(msg)); mbus::Reply::UP reply = sr.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); } } @@ -412,8 +408,7 @@ Test::testLocalService() // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); + new Document(*_docType, DocumentId("doc:scheme:")))))); // Test select with proper address. for (uint32_t i = 0; i < 10; ++i) { @@ -993,7 +988,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() for (uint32_t i = 0; i < 5; ++i) { mbus::TestServer *srv = new mbus::TestServer( mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)), - mbus::RoutingSpec(), slobrok, "", + mbus::RoutingSpec(), slobrok, mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); servers.push_back(srv); srv->net.registerSession("default"); @@ -1050,7 +1045,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() for (uint32_t i = 0; i < 5; ++i) { mbus::TestServer *srv = new mbus::TestServer( mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)), - mbus::RoutingSpec(), slobrok, "", + mbus::RoutingSpec(), slobrok, mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); servers.push_back(srv); srv->net.registerSession("default"); @@ -1090,7 +1085,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() mbus::Slobrok slobrok; mbus::TestServer server(mbus::Identity("storage/cluster.mycluster/distributor/0"), - mbus::RoutingSpec(), slobrok, "", + mbus::RoutingSpec(), slobrok, mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); server.net.registerSession("default"); @@ -1211,8 +1206,8 @@ Test::testSubsetServiceCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(barFrame.getReceptor().getReply(600)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); } bool @@ -1227,7 +1222,7 @@ Test::trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> } else { frame.select(leaf, 0); } - if(frame.getReceptor().getReply(600).get() == NULL) { + if( ! frame.getReceptor().getReply(600)) { LOG(error, "Reply failed to propagate to reply handler."); return false; } @@ -1252,6 +1247,6 @@ Test::isErrorPolicy(const string &name, const string ¶m) DocumentProtocol protocol(_loadTypes, _repo); mbus::IRoutingPolicy::UP policy = protocol.createPolicy(name, param); - return policy.get() != NULL && dynamic_cast<ErrorPolicy*>(policy.get()) != NULL; + return policy && dynamic_cast<ErrorPolicy*>(policy.get()) != nullptr; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java deleted file mode 100755 index aff7a40dbe6..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -/** - * This class keeps track of OOS information obtained from a single server. This class is used by the OOSManager class. - * Note that since this class is only used inside the transport thread it has no synchronization. Using it directly will - * lead to race conditions and possible crashes. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSClient implements Runnable, RequestWaiter { - - private Supervisor orb; - private Target target = null; - private Request request = null; - private boolean requestDone = false; - private Spec spec; - private Task task; - private List<String> oosList = new ArrayList<String>(); - private int requestGen = 0; - private int listGen = 0; - private int dumpGen = 0; - private boolean shutdown = false; - - /** - * Create a new OOSClient polling oos information from the given server. - * - * @param orb The object used for RPC operations. - * @param spec The fnet connect spec for oos server. - */ - public OOSClient(Supervisor orb, Spec spec) { - this.orb = orb; - this.spec = spec; - - task = this.orb.transport().createTask(this); - task.scheduleNow(); - } - - /** - * Handle a server reply. - */ - private void handleReply() { - if (!request.checkReturnTypes("Si")) { - if (target != null) { - target.close(); - target = null; - } - task.schedule(1.0); - return; - } - - Values ret = request.returnValues(); - int retGen = ret.get(1).asInt32(); - if (requestGen != retGen) { - List<String> oos = new ArrayList<String>(); - oos.addAll(Arrays.asList(ret.get(0).asStringArray())); - oosList = oos; - requestGen = retGen; - listGen = retGen; - } - task.schedule(0.1); - } - - /** - * Handle server (re)connect. - */ - private void handleConnect() { - if (target == null) { - target = orb.connect(spec); - requestGen = 0; - } - } - - /** - * Handle server invocation. - */ - private void handleInvoke() { - if (target == null) { - throw new IllegalStateException("Attempting to invoke a request on a null target."); - } - request = new Request("fleet.getOOSList"); - request.parameters().add(new Int32Value(requestGen)); - request.parameters().add(new Int32Value(60000)); - target.invokeAsync(request, 70.0, this); - } - - /** - * Implements runnable. Performs overall server poll logic. - */ - public void run() { - if (shutdown) { - task.kill(); - if (target != null) { - target.close(); - } - } else if (requestDone) { - requestDone = false; - handleReply(); - } else { - handleConnect(); - handleInvoke(); - } - } - - /** - * Shut down this OOS client. Invoking this method will take down any active connections and block further activity - * from this object. - */ - public void shutdown() { - shutdown = true; - task.scheduleNow(); - } - - /** - * From FRT_IRequestWait, picks up server replies. - * - * @param request The request that has completed. - */ - public void handleRequestDone(Request request) { - if (request != this.request || requestDone) { - throw new IllegalStateException("Multiple invocations of RequestDone()."); - } - requestDone = true; - task.scheduleNow(); - } - - /** - * Obtain the connect spec of the OOS server this client is talking to. - * - * @return OOS server connect spec - */ - public Spec getSpec() { - return spec; - } - - /** - * Check if this client has changed. A client has changed if it has obtain now information after the dumpState - * method was last invoked. - * - * @return True is this client has changed. - */ - public boolean isChanged() { - return listGen != dumpGen; - } - - /** - * Returns whether or not this client has receieved any reply at all from the server it is connected to. - * - * @return True if initial request has returned. - */ - public boolean isReady() { - return listGen != 0; - } - - /** - * Dump the current oos information known by this client into the given string set. - * - * @param dst The object used to aggregate oos information. - */ - public void dumpState(Set<String> dst) { - dst.addAll(oosList); - dumpGen = listGen; - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java deleted file mode 100755 index b19543b20b8..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.Spec; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Task; -import com.yahoo.jrt.slobrok.api.Mirror; - -import java.util.*; - -/** - * This class keeps track of OOS information. A set of servers having OOS information are identified by looking up a - * service pattern in the slobrok. These servers are then polled for information. The information is compiled into a - * local repository for fast lookup. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSManager implements Runnable { - - // An internal flag that indicates whether or not this manager is disabled. This is used to short-circuit any - // requests made when the service pattern is null. - private boolean disabled; - - // Whether or not this manager has received status information from all connected clients. - private boolean ready; - - // The JRT supervisor object. - private final Supervisor orb; - - // The JRT slobrok mirror object. - private final Mirror mirror; - - // A transport task object used for scheduling this. - private Task task; - - // The service pattern used to resolve what services registered in slobrok resolve to OOS servers. - private final String servicePattern; - - // A map of OOS clients that each poll a single OOS server. This map will contain an entry for each service that - // the service pattern resolves to. - private Map<String, OOSClient> clients = Collections.emptyMap(); - - // A set of out-of-service service names. - private volatile Set<String> oosSet; - - // The generation of the current slobrok resolve. - private int slobrokGen = 0; - - // A local copy of the services that the service pattern resolved to after the previous slobrok lookup. This is used - // to avoid updating the internal list every time slobrok's generation differs, but instead only when the service - // pattern resolves to something different. - private List<Mirror.Entry> services; - - /** - * Create a new OOSManager. The given service pattern will be looked up in the given slobrok mirror. The resulting - * set of services will be polled for oos information. - * - * @param orb The object used for RPC operations. - * @param mirror The slobrok mirror. - * @param servicePattern The service pattern for oos servers. - */ - public OOSManager(Supervisor orb, Mirror mirror, String servicePattern) { - this.orb = orb; - this.mirror = mirror; - this.servicePattern = servicePattern; - - disabled = (servicePattern == null || servicePattern.isEmpty()); - ready = disabled; - - if (!disabled) { - task = orb.transport().createTask(this); - task.scheduleNow(); - } - } - - /** - * Method invoked when this object is run as a task. This method will update the oos information held by this - * object. - */ - public void run() { - boolean changed = updateFromSlobrok(); - boolean allOk = mirror.ready(); - for (OOSClient client : clients.values()) { - if (client.isChanged()) { - changed = true; - } - if (!client.isReady()) { - allOk = false; - } - } - if (changed) { - Set<String> oos = new LinkedHashSet<String>(); - for (OOSClient client : clients.values()) { - client.dumpState(oos); - } - oosSet = oos; - } - if (allOk && !ready) { - ready = true; - } - task.schedule(ready ? 1.0 : 0.1); - } - - /** - * This method will check the local slobrok mirror to make sure that its clients are connected to the appropriate - * services. If anything changes this method returns true. - * - * @return True if anything changed. - */ - private boolean updateFromSlobrok() { - if (slobrokGen == mirror.updates()) { - return false; - } - slobrokGen = mirror.updates(); - List<Mirror.Entry> newServices = Arrays.asList(mirror.lookup(servicePattern)); - Collections.sort(newServices, new Comparator<Mirror.Entry>() { - public int compare(Mirror.Entry lhs, Mirror.Entry rhs) { - return lhs.compareTo(rhs); - } - }); - if (newServices.equals(services)) { - return false; - } - Map<String, OOSClient> newClients = new HashMap<String, OOSClient>(); - for (Mirror.Entry service : newServices) { - OOSClient client = clients.remove(service.getSpec()); - if (client == null) { - client = new OOSClient(orb, new Spec(service.getSpec())); - } - newClients.put(service.getSpec(), client); - } - for (OOSClient client : clients.values()) { - client.shutdown(); - } - services = newServices; - clients = newClients; - return true; - } - - /** - * Returns whether or not some initial state has been returned. - * - * @return True, if initial state has been found. - */ - public boolean isReady() { - return ready; - } - - /** - * Returns whether or not the given service has been marked as out of service. - * - * @param service The service to check. - * @return True if the service is out of service. - */ - @SuppressWarnings({ "RedundantIfStatement" }) - public boolean isOOS(String service) { - if (disabled) { - return false; - } - Set<String> s = oosSet; - if (s == null) { - return false; - } - if (!s.contains(service)) { - return false; - } - return true; - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index bb0b7bdd878..99863b449be 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -56,7 +56,6 @@ public class RPCNetwork implements Network, MethodHandler { private static final Logger log = Logger.getLogger(RPCNetwork.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final Identity identity; - private final OOSManager oosManager; private final Supervisor orb; private final RPCTargetPool targetPool; private final RPCServicePool servicePool; @@ -105,7 +104,6 @@ public class RPCNetwork implements Network, MethodHandler { task.jrtTask.scheduleNow(); register = new Register(orb, slobrokConfig.getSlobroks(), identity.getHostname(), listener.port()); mirror = new Mirror(orb, slobrokConfig.getSlobroks()); - oosManager = new OOSManager(orb, mirror, params.getOOSServerPattern()); } /** @@ -141,7 +139,7 @@ public class RPCNetwork implements Network, MethodHandler { @Override public boolean waitUntilReady(double seconds) { for (int i = 0; i < seconds * 100; ++i) { - if (mirror.ready() && oosManager.isReady()) { + if (mirror.ready()) { return true; } try { @@ -305,10 +303,6 @@ public class RPCNetwork implements Network, MethodHandler { * @return Any error encountered, or null. */ public Error resolveServiceAddress(RoutingNode recipient, String serviceName) { - if (oosManager.isOOS(serviceName)) { - return new Error(ErrorCode.SERVICE_OOS, - "The service '" + serviceName + "' has been marked as out of service."); - } RPCServiceAddress ret = servicePool.resolve(serviceName); if (ret == null) { return new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, @@ -402,15 +396,6 @@ public class RPCNetwork implements Network, MethodHandler { return orb; } - /** - * Returns the oos manager object so that it can be manually queried about out-of-service services. - * - * @return The oos manager. - */ - public OOSManager getOOSManager() { - return oosManager; - } - ExecutorService getExecutor() { return executor; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index d35fbc01d90..0d4cee5a939 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -15,7 +15,6 @@ public class RPCNetworkParams { private Identity identity = new Identity(""); private String slobrokConfigId = "admin/slobrok.0"; private SlobroksConfig slobroksConfig = null; - private String oosServerPattern = ""; private int listenPort = 0; private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; @@ -37,7 +36,6 @@ public class RPCNetworkParams { identity = new Identity(params.identity); slobrokConfigId = params.slobrokConfigId; slobroksConfig = params.slobroksConfig; - oosServerPattern = params.oosServerPattern; listenPort = params.listenPort; connectionExpireSecs = params.connectionExpireSecs; maxInputBufferSize = params.maxInputBufferSize; @@ -105,26 +103,6 @@ public class RPCNetworkParams { } /** - * Returns the config id pattern used to lookup OOS servers. - * - * @return The config id. - */ - public String getOOSServerPattern() { - return oosServerPattern; - } - - /** - * Sets the config id pattern used to lookup OOS servers. - * - * @param oosServerPattern The server pattern. - * @return This, to allow chaining. - */ - public RPCNetworkParams setOOSServerPattern(String oosServerPattern) { - this.oosServerPattern = oosServerPattern; - return this; - } - - /** * Returns the port to listen to. * * @return The port. diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java deleted file mode 100755 index ce3f5460610..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc.test; - -import com.yahoo.jrt.*; -import com.yahoo.jrt.slobrok.api.SlobrokList; -import com.yahoo.jrt.slobrok.api.Register; -import com.yahoo.jrt.slobrok.server.Slobrok; - -import java.util.ArrayList; -import java.util.List; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSServer { - private int getCnt = 1; - private List<String> state = new ArrayList<String>(); - private Supervisor orb; - private Register register; - private Acceptor listener; - - public OOSServer(Slobrok slobrok, String service, OOSState state) { - orb = new Supervisor(new Transport()); - orb.addMethod(new Method("fleet.getOOSList", "ii", "Si", - new MethodHandler() { - public void invoke(Request request) { - rpc_poll(request); - } - }) - .methodDesc("Fetch OOS information.") - .paramDesc(0, "gencnt", "Generation already known by client.") - .paramDesc(1, "timeout", "How many milliseconds to wait for changes before returning if nothing has changed (max=10000).") - .returnDesc(0, "names", "List of services that are OOS (empty if generation has not changed).") - .returnDesc(1, "newgen", "Generation of the returned list.")); - try { - listener = orb.listen(new Spec(0)); - } - catch (ListenFailedException e) { - orb.transport().shutdown().join(); - throw new RuntimeException(e); - } - SlobrokList slist = new SlobrokList(); - slist.setup(new String[] { new Spec("localhost", slobrok.port()).toString() }); - register = new Register(orb, slist, "localhost", listener.port()); - register.registerName(service); - setState(state); - } - - public void shutdown() { - register.shutdown(); - listener.shutdown().join(); - orb.transport().shutdown().join(); - } - - public void setState(OOSState state) { - List<String> newState = new ArrayList<String>(); - for (String service : state.getServices()) { - if (state.isOOS(service)) { - newState.add(service); - } - } - synchronized(this) { - this.state = newState; - if (++getCnt == 0) { - getCnt = 1; - } - } - } - - private void rpc_poll(Request request) { - synchronized(this) { - request.returnValues() - .add(new StringArray(state.toArray(new String[state.size()]))) - .add(new Int32Value(getCnt)); - } - } - - public int getPort() { - return listener.port(); - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java deleted file mode 100755 index ee9de59f085..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc.test; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSState { - private Map<String, Boolean> data = new LinkedHashMap<String, Boolean>(); - - public OOSState add(String service, boolean oos) { - data.put(service, oos); - return this; - } - - public Set<String> getServices() { - return data.keySet(); - } - - public boolean isOOS(String service) { - return data.containsKey(service) && data.get(service); - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/TestServer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/TestServer.java index 1745babe5c3..f36f7612f0e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/TestServer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/TestServer.java @@ -38,15 +38,13 @@ public class TestServer { * @param name The service name prefix for this server. * @param table The routing table spec to be used, may be null for no routing. * @param slobrok The slobrok to register with (local). - * @param oosServerPattern the string pattern for oos servers, may be null for deactivate. * @param protocol The protocol that this server should support in addition to SimpleProtocol. */ - public TestServer(String name, RoutingTableSpec table, Slobrok slobrok, String oosServerPattern, Protocol protocol) { + public TestServer(String name, RoutingTableSpec table, Slobrok slobrok, Protocol protocol) { this(new MessageBusParams().addProtocol(new SimpleProtocol()), new RPCNetworkParams() .setIdentity(new Identity(name)) - .setSlobrokConfigId(getSlobrokConfig(slobrok)) - .setOOSServerPattern(oosServerPattern)); + .setSlobrokConfigId(getSlobrokConfig(slobrok))); if (protocol != null) { mb.putProtocol(protocol); } @@ -153,43 +151,6 @@ public class TestServer { return false; } - /** - * Wait for some service to go out-of-service. - * - * @param service The service to wait for. - * @return Whether or not the service went out-of-service. - */ - public boolean waitOOS(String service) { - return waitState(new OOSState().add(service, true)); - } - - /** - * Wait for a required OOS state. - * - * @param oosState The state to wait for. - * @return Whether or not the required state was reached. - */ - public boolean waitState(OOSState oosState) { - for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) { - boolean done = true; - for (String service : oosState.getServices()) { - if (net.getOOSManager().isOOS(service) != oosState.isOOS(service)) { - done = false; - } - } - if (done) { - return true; - } - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - // ignore - } - } - return false; - } - public static class VersionedRPCNetwork extends RPCNetwork { private Version version = Vtag.currentVersion; diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java index cc968e0c843..89c641808f5 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java @@ -38,9 +38,9 @@ public class ErrorTestCase { table.addRoute("test", Arrays.asList("itr", "dst")); Slobrok slobrok = new Slobrok(); - TestServer src = new TestServer("test/src", table, slobrok, null, null); - TestServer itr = new TestServer("test/itr", table, slobrok, null, null); - TestServer dst = new TestServer("test/dst", table, slobrok, null, null); + TestServer src = new TestServer("test/src", table, slobrok, null); + TestServer itr = new TestServer("test/itr", table, slobrok, null); + TestServer dst = new TestServer("test/dst", table, slobrok, null); Receptor ss_rr = new Receptor(); SourceSession ss = src.mb.createSourceSession(ss_rr); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java index 96ca309a429..ed765a146c6 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java @@ -56,18 +56,18 @@ public class MessageBusTestCase { Slobrok slobrok = new Slobrok(); List<TestServer> servers = new ArrayList<>(); - TestServer srcServer = new TestServer("feeder", null, slobrok, null, null); + TestServer srcServer = new TestServer("feeder", null, slobrok, null); servers.add(srcServer); SourceSession src = servers.get(0).mb.createSourceSession(new Receptor()); List<IntermediateSession> sessions = new ArrayList<>(); for (int i = 0; i < 10; ++i) { - TestServer server = new TestServer("intermediate/" + i, null, slobrok, null, null); + TestServer server = new TestServer("intermediate/" + i, null, slobrok, null); servers.add(server); sessions.add(server.mb.createIntermediateSession("session", true, new Receptor(), new Receptor())); } - TestServer dstServer = new TestServer("destination", null, slobrok, null, null); + TestServer dstServer = new TestServer("destination", null, slobrok, null); DestinationSession dst = dstServer.mb.createDestinationSession("session", true, new Receptor()); assertTrue(srcServer.waitSlobrok("intermediate/*/session", sessions.size())); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java index 23410a70842..b760ee6d13f 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java @@ -15,8 +15,8 @@ public class RoutableTestCase extends junit.framework.TestCase { public void testMessageContext() throws ListenFailedException, UnknownHostException { Slobrok slobrok = new Slobrok(); - TestServer srcServer = new TestServer("src", null, slobrok, null, null); - TestServer dstServer = new TestServer("dst", null, slobrok, null, null); + TestServer srcServer = new TestServer("src", null, slobrok, null); + TestServer dstServer = new TestServer("dst", null, slobrok, null); SourceSession srcSession = srcServer.mb.createSourceSession( new Receptor(), new SourceSessionParams().setTimeout(600.0)); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java index 5b0f142c224..6c298f5f3b5 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java @@ -29,8 +29,8 @@ public class ThrottlerTestCase extends junit.framework.TestCase { table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")); table.addRoute("test", Arrays.asList("dst")); slobrok = new Slobrok(); - src = new TestServer("test/src", table, slobrok, null, null); - dst = new TestServer("test/dst", table, slobrok, null, null); + src = new TestServer("test/src", table, slobrok, null); + dst = new TestServer("test/dst", table, slobrok, null); } public void tearDown() { diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java index a88da4c2e42..007f7a70d14 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java @@ -33,9 +33,9 @@ public class TraceTripTestCase extends junit.framework.TestCase { .addRoute("test", Arrays.asList("pxy", "dst")); slobrok = new Slobrok(); - src = new TestServer("test/src", table, slobrok, null, null); - pxy = new TestServer("test/pxy", table, slobrok, null, null); - dst = new TestServer("test/dst", table, slobrok, null, null); + src = new TestServer("test/src", table, slobrok, null); + pxy = new TestServer("test/pxy", table, slobrok, null); + dst = new TestServer("test/dst", table, slobrok, null); } public void tearDown() { diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java index f9db12a77ee..603d8433524 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java @@ -34,9 +34,9 @@ public class BasicNetworkTestCase extends junit.framework.TestCase { table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")); table.addRoute("test", Arrays.asList("pxy", "dst")); slobrok = new Slobrok(); - src = new TestServer("test/src", table, slobrok, null, null); - pxy = new TestServer("test/pxy", table, slobrok, null, null); - dst = new TestServer("test/dst", table, slobrok, null, null); + src = new TestServer("test/src", table, slobrok, null); + pxy = new TestServer("test/pxy", table, slobrok, null); + dst = new TestServer("test/dst", table, slobrok, null); } public void tearDown() { diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java index a985de2ee08..c9a4c38a682 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java @@ -19,10 +19,10 @@ public class LoadBalanceTestCase extends junit.framework.TestCase { public void testLoadBalance() throws ListenFailedException, UnknownHostException { Slobrok slobrok = new Slobrok(); - TestServer src = new TestServer("src", null, slobrok, null, null); - TestServer dst1 = new TestServer("dst/1", null, slobrok, null, null); - TestServer dst2 = new TestServer("dst/2", null, slobrok, null, null); - TestServer dst3 = new TestServer("dst/3", null, slobrok, null, null); + TestServer src = new TestServer("src", null, slobrok, null); + TestServer dst1 = new TestServer("dst/1", null, slobrok, null); + TestServer dst2 = new TestServer("dst/2", null, slobrok, null); + TestServer dst3 = new TestServer("dst/3", null, slobrok, null); // set up handlers final QueueAdapter sq = new QueueAdapter(); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java deleted file mode 100755 index a6270f6558d..00000000000 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.rpc.test.OOSServer; -import com.yahoo.messagebus.network.rpc.test.OOSState; -import com.yahoo.messagebus.network.rpc.test.TestServer; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.test.Receptor; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.net.UnknownHostException; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSTestCase extends junit.framework.TestCase { - - private static class MyServer extends TestServer implements MessageHandler { - DestinationSession session; - - public MyServer(String name, Slobrok slobrok, String oosServerPattern) - throws ListenFailedException, UnknownHostException - { - super(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()), - new RPCNetworkParams() - .setIdentity(new Identity(name)) - .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)) - .setOOSServerPattern(oosServerPattern)); - session = mb.createDestinationSession("session", true, this); - } - - public boolean destroy() { - session.destroy(); - return super.destroy(); - } - - public void handleMessage(Message msg) { - session.acknowledge(msg); - } - } - - private static void assertError(SourceSession src, String dst, int error) { - Message msg = new SimpleMessage("msg"); - msg.getTrace().setLevel(9); - assertTrue(src.send(msg, Route.parse(dst)).isAccepted()); - Reply reply = ((Receptor) src.getReplyHandler()).getReply(60); - assertNotNull(reply); - System.out.println(reply.getTrace()); - if (error == ErrorCode.NONE) { - assertFalse(reply.hasErrors()); - } else { - assertTrue(reply.hasErrors()); - assertEquals(error, reply.getError(0).getCode()); - } - } - - public void testOOS() throws ListenFailedException, UnknownHostException { - Slobrok slobrok = new Slobrok(); - TestServer srcServer = new TestServer("src", null, slobrok, "oos/*", null); - SourceSession srcSession = srcServer.mb.createSourceSession(new Receptor()); - - MyServer dst1 = new MyServer("dst1", slobrok, null); - MyServer dst2 = new MyServer("dst2", slobrok, null); - MyServer dst3 = new MyServer("dst3", slobrok, null); - MyServer dst4 = new MyServer("dst4", slobrok, null); - MyServer dst5 = new MyServer("dst5", slobrok, null); - assertTrue(srcServer.waitSlobrok("*/session", 5)); - - // Ensure that normal sending is ok. - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.NONE); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Ensure that 2 OOS services report properly. - OOSServer oosServer = new OOSServer(slobrok, "oos/1", new OOSState() - .add("dst2/session", true) - .add("dst3/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Ensure that 1 OOS service may come up while other stays down. - oosServer.setState(new OOSState().add("dst2/session", true)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Add another OOS server and make sure that it works properly. - OOSServer oosServer2 = new OOSServer(slobrok, "oos/2", new OOSState() - .add("dst4/session", true) - .add("dst5/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 2)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst4/session", true) - .add("dst5/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); - oosServer2.shutdown(); - - // Ensure that shutting down one OOS server will properly propagate. - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Now add two new OOS servers and make sure that works too. - OOSServer oosServer3 = new OOSServer(slobrok, "oos/3", new OOSState() - .add("dst2/session", true) - .add("dst4/session", true)); - OOSServer oosServer4 = new OOSServer(slobrok, "oos/4", new OOSState() - .add("dst2/session", true) - .add("dst3/session", true) - .add("dst5/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 3)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", true) - .add("dst4/session", true) - .add("dst5/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); - - // Modify the state of the two new servers and make sure it propagates. - oosServer3.setState(new OOSState() - .add("dst2/session", true)); - oosServer4.setState(new OOSState() - .add("dst1/session", true)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", true) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - oosServer3.shutdown(); - oosServer4.shutdown(); - - // Ensure that shutting down the two latest OOS servers works properly. - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - dst2.destroy(); - assertTrue(srcServer.waitSlobrok("*/session", 4)); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - - srcSession.destroy(); - dst1.destroy(); - dst2.destroy(); - dst3.destroy(); - dst4.destroy(); - dst5.destroy(); - } -} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java index 020611b695d..26b32eeb418 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java @@ -97,7 +97,7 @@ public class TargetPoolTestCase extends junit.framework.TestCase { } private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException { - servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null)); + servers.add(new TestServer("srv" + servers.size(), null, slobrok, null)); return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec()); } diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt index 2f315fda133..cb2a403f55d 100644 --- a/messagebus/src/tests/CMakeLists.txt +++ b/messagebus/src/tests/CMakeLists.txt @@ -13,7 +13,6 @@ add_subdirectory(loadbalance) add_subdirectory(messagebus) add_subdirectory(messageordering) add_subdirectory(messenger) -add_subdirectory(oos) add_subdirectory(protocolrepository) add_subdirectory(queue) add_subdirectory(replygate) diff --git a/messagebus/src/tests/oos/.gitignore b/messagebus/src/tests/oos/.gitignore deleted file mode 100644 index a4771a9176b..00000000000 --- a/messagebus/src/tests/oos/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -.depend -Makefile -oos_test -messagebus_oos_test_app diff --git a/messagebus/src/tests/oos/CMakeLists.txt b/messagebus/src/tests/oos/CMakeLists.txt deleted file mode 100644 index 9fd8e198c58..00000000000 --- a/messagebus/src/tests/oos/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(messagebus_oos_test_app TEST - SOURCES - oos.cpp - DEPENDS - messagebus_messagebus-test - messagebus -) -vespa_add_test(NAME messagebus_oos_test_app NO_VALGRIND COMMAND messagebus_oos_test_app) diff --git a/messagebus/src/tests/oos/DESC b/messagebus/src/tests/oos/DESC deleted file mode 100644 index 16cd7a2f30d..00000000000 --- a/messagebus/src/tests/oos/DESC +++ /dev/null @@ -1 +0,0 @@ -oos test. Take a look at oos.cpp for details. diff --git a/messagebus/src/tests/oos/FILES b/messagebus/src/tests/oos/FILES deleted file mode 100644 index 08cf509e1fd..00000000000 --- a/messagebus/src/tests/oos/FILES +++ /dev/null @@ -1 +0,0 @@ -oos.cpp diff --git a/messagebus/src/tests/oos/oos.cpp b/messagebus/src/tests/oos/oos.cpp deleted file mode 100644 index 7606c291877..00000000000 --- a/messagebus/src/tests/oos/oos.cpp +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/errorcode.h> -#include <vespa/messagebus/intermediatesession.h> -#include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/routablequeue.h> -#include <vespa/messagebus/routing/routingspec.h> -#include <vespa/messagebus/sourcesession.h> -#include <vespa/messagebus/sourcesessionparams.h> -#include <vespa/messagebus/testlib/oosserver.h> -#include <vespa/messagebus/testlib/receptor.h> -#include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> -#include <vespa/messagebus/testlib/simplereply.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/messagebus/testlib/testserver.h> -#include <vespa/vespalib/testkit/testapp.h> - -using namespace mbus; - -struct Handler : public IMessageHandler -{ - DestinationSession::UP session; - Handler(MessageBus &mb) : session() { - session = mb.createDestinationSession("session", true, *this); - } - ~Handler() { - session.reset(); - } - void handleMessage(Message::UP msg) override { - session->acknowledge(std::move(msg)); - } -}; - - -class Test : public vespalib::TestApp { -private: - SourceSession::UP _session; - RoutableQueue _handler; - - bool checkError(const string &dst, uint32_t error); - -public: - Test(); - ~Test(); - int Main() override; -}; - -TEST_APPHOOK(Test); - -Test::Test() : - _session(), - _handler() -{} - -Test::~Test() {} -bool -Test::checkError(const string &dst, uint32_t error) -{ - if (!EXPECT_TRUE(_session.get() != NULL)) { - return false; - } - Message::UP msg(new SimpleMessage("msg")); - msg->getTrace().setLevel(9); - if (!EXPECT_TRUE(_session->send(std::move(msg), Route::parse(dst)).isAccepted())) { - return false; - } - Routable::UP reply = _handler.dequeue(10000); - if (!EXPECT_TRUE(reply.get() != NULL)) { - return false; - } - if (!EXPECT_TRUE(reply->isReply())) { - return false; - } - Reply &ref = static_cast<Reply&>(*reply); - printf("%s", ref.getTrace().toString().c_str()); - if (error == ErrorCode::NONE) { - if (!EXPECT_TRUE(!ref.hasErrors())) { - return false; - } - } else { - if (!EXPECT_TRUE(ref.hasErrors())) { - return false; - } - if (!EXPECT_EQUAL(error, ref.getError(0).getCode())) { - return false; - } - } - return true; -} - -int -Test::Main() -{ - TEST_INIT("oos_test"); - - Slobrok slobrok; - TestServer src(Identity(""), RoutingSpec(), slobrok, "oos/*"); - TestServer dst1(Identity("dst1"), RoutingSpec(), slobrok); - TestServer dst2(Identity("dst2"), RoutingSpec(), slobrok); - TestServer dst3(Identity("dst3"), RoutingSpec(), slobrok); - TestServer dst4(Identity("dst4"), RoutingSpec(), slobrok); - TestServer dst5(Identity("dst5"), RoutingSpec(), slobrok); - Handler h1(dst1.mb); - Handler h2(dst2.mb); - Handler h3(dst3.mb); - Handler h4(dst4.mb); - Handler h5(dst5.mb); - EXPECT_TRUE(src.waitSlobrok("*/session", 5)); - - _session = src.mb.createSourceSession(_handler); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - TEST_FLUSH(); - OOSServer oosServer(slobrok, "oos/1", OOSState() - .add("dst2/session") - .add("dst3/session")); - EXPECT_TRUE(src.waitSlobrok("oos/*", 1)); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst2/session") - .add("dst3/session"))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); // test 9 - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); // return without reply?!? - EXPECT_TRUE(checkError("dst3/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - TEST_FLUSH(); - oosServer.setState(OOSState() - .add("dst2/session")); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst2/session", true) - .add("dst3/session", false))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - TEST_FLUSH(); - { - OOSServer oosServer2(slobrok, "oos/2", OOSState() - .add("dst4/session") - .add("dst5/session")); - EXPECT_TRUE(src.waitSlobrok("oos/*", 2)); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst2/session") - .add("dst4/session") - .add("dst5/session"))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::SERVICE_OOS)); - TEST_FLUSH(); - } - EXPECT_TRUE(src.waitSlobrok("oos/*", 1)); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - TEST_FLUSH(); - { - OOSServer oosServer3(slobrok, "oos/3", OOSState() - .add("dst2/session") - .add("dst4/session")); - OOSServer oosServer4(slobrok, "oos/4", OOSState() - .add("dst2/session") - .add("dst3/session") - .add("dst5/session")); - EXPECT_TRUE(src.waitSlobrok("oos/*", 3)); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst2/session") - .add("dst3/session") - .add("dst4/session") - .add("dst5/session"))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::SERVICE_OOS)); - TEST_FLUSH(); - oosServer3.setState(OOSState() - .add("dst2/session")); - oosServer4.setState(OOSState() - .add("dst1/session")); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst1/session", true) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - TEST_FLUSH(); - } - EXPECT_TRUE(src.waitSlobrok("oos/*", 1)); - EXPECT_TRUE(src.waitState(OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE)); - EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE)); - - h2.session.reset(); - EXPECT_TRUE(src.waitSlobrok("*/session", 4)); - EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); - - _session.reset(); - TEST_DONE(); -} diff --git a/messagebus/src/tests/oospolicy/.gitignore b/messagebus/src/tests/oospolicy/.gitignore deleted file mode 100644 index 3bd6e47e0bc..00000000000 --- a/messagebus/src/tests/oospolicy/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -.depend -Makefile -oospolicy_test diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt index 750ff20240f..4c8e146b8eb 100644 --- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt +++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt @@ -2,8 +2,6 @@ vespa_add_library(messagebus_network OBJECT SOURCES identity.cpp - oosclient.cpp - oosmanager.cpp rpcnetwork.cpp rpcnetworkparams.cpp rpcsend.cpp diff --git a/messagebus/src/vespa/messagebus/network/oosclient.cpp b/messagebus/src/vespa/messagebus/network/oosclient.cpp deleted file mode 100644 index d9f67101ea2..00000000000 --- a/messagebus/src/vespa/messagebus/network/oosclient.cpp +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "oosclient.h" -#include <vespa/fnet/frt/supervisor.h> - -namespace mbus { - -void -OOSClient::handleReply() -{ - if (!_req->CheckReturnTypes("Si")) { - _target->SubRef(); - _target = 0; - Schedule(1.0); - return; - } - FRT_Values &ret = *(_req->GetReturn()); - uint32_t retGen = ret[1]._intval32; - if (_reqGen != retGen) { - StringList oos; - uint32_t numNames = ret[0]._string_array._len; - FRT_StringValue *names = ret[0]._string_array._pt; - for (uint32_t idx = 0; idx < numNames; ++idx) { - oos.push_back(string(names[idx]._str)); - } - _oosList.swap(oos); - _reqGen = retGen; - _listGen = retGen; - } - Schedule(0.1); -} - -void -OOSClient::handleConnect() -{ - if (_target == 0) { - _target = _orb.GetTarget(_spec.c_str()); - _reqGen = 0; - } -} - -void -OOSClient::handleInvoke() -{ - assert(_target != 0); - _req = _orb.AllocRPCRequest(_req); - _req->SetMethodName("fleet.getOOSList"); - _req->GetParams()->AddInt32(_reqGen); // gencnt - _req->GetParams()->AddInt32(60000); // mstimeout - _target->InvokeAsync(_req, 70.0, this); -} - -void -OOSClient::PerformTask() -{ - if (_reqDone) { - _reqDone = false; - handleReply(); - return; - } - handleConnect(); - handleInvoke(); -} - -void -OOSClient::RequestDone(FRT_RPCRequest *req) -{ - assert(req == _req && !_reqDone); - (void) req; - _reqDone = true; - ScheduleNow(); -} - -OOSClient::OOSClient(FRT_Supervisor &orb, - const string &mySpec) - : FNET_Task(orb.GetScheduler()), - _orb(orb), - _spec(mySpec), - _oosList(), - _reqGen(0), - _listGen(0), - _dumpGen(0), - _reqDone(false), - _target(0), - _req(0) -{ - ScheduleNow(); -} - -OOSClient::~OOSClient() -{ - Kill(); - if (_req != 0) { - _req->Abort(); - _req->SubRef(); - } - if (_target != 0) { - _target->SubRef(); - } -} - -void -OOSClient::dumpState(StringSet &dst) -{ - dst.insert(_oosList.begin(), _oosList.end()); - _dumpGen = _listGen; -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/oosclient.h b/messagebus/src/vespa/messagebus/network/oosclient.h deleted file mode 100644 index 80cf6015783..00000000000 --- a/messagebus/src/vespa/messagebus/network/oosclient.h +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/fnet/frt/invoker.h> -#include <vespa/fnet/frt/target.h> -#include <vespa/messagebus/common.h> -#include <vector> -#include <set> - -namespace mbus { - -/** - * This class keeps track of OOS information obtained from a single - * server. This class is used by the OOSManager class. Note that since - * this class is only used inside the transport thread it has no - * synchronization. Using it directly will lead to race conditions and - * possible crashes. - **/ -class OOSClient : public FNET_Task, - public FRT_IRequestWait -{ -private: - typedef std::vector<string> StringList; - - FRT_Supervisor &_orb; - string _spec; - StringList _oosList; - uint32_t _reqGen; // server gen used for request - uint32_t _listGen; // server gen of the oosList - uint32_t _dumpGen; // server gen used for the last dump - bool _reqDone; - FRT_Target *_target; - FRT_RPCRequest *_req; - - OOSClient(const OOSClient &); - OOSClient &operator=(const OOSClient &); - - /** - * Handle a server reply. - **/ - void handleReply(); - - /** - * Handle server (re)connect. - **/ - void handleConnect(); - - /** - * Handle server invocation. - **/ - void handleInvoke(); - - /** - * From FNET_Task, performs overall server poll logic. - **/ - void PerformTask() override; - - /** - * From FRT_IRequestWait, picks up server replies. - * - * @param req the request that has completed - **/ - void RequestDone(FRT_RPCRequest *req) override; - -public: - /** - * Data structure used to aggregate OOS information - **/ - typedef std::set<string> StringSet; - - /** - * Convenience typedef for a shared pointer to a OOSClient object. - **/ - typedef std::shared_ptr<OOSClient> SP; - - /** - * Create a new OOSClient polling oos information from the given - * server. - * - * @param orb object used for RPC operations - * @param spec fnet connect spec for oos server - **/ - OOSClient(FRT_Supervisor &orb, const string &spec); - - /** - * Destructor. - **/ - virtual ~OOSClient(); - - /** - * Obtain the connect spec of the OOS server this client is - * talking to. - * - * @return OOS server connect spec - **/ - const string &getSpec() const { return _spec; } - - /** - * Check if this client has changed. A client has changed if it - * has obtain now information after the dumpState method was last - * invoked. - * - * @return true is this client has changed - **/ - bool isChanged() const { return (_listGen != _dumpGen); } - - /** - * Returns whether or not this client has receieved any reply - * at all from the server it is connected to. - * - * @return True if initial request has returned. - */ - bool isReady() const { return _listGen != 0; } - - /** - * Dump the current oos information known by this client into the - * given string set. - * - * @param dst object used to aggregate oos information - **/ - void dumpState(StringSet &dst); -}; - -} // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.cpp b/messagebus/src/vespa/messagebus/network/oosmanager.cpp deleted file mode 100644 index 250df147675..00000000000 --- a/messagebus/src/vespa/messagebus/network/oosmanager.cpp +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "oosmanager.h" -#include "rpcnetwork.h" -#include <algorithm> -#include <vespa/fnet/frt/frt.h> - -namespace mbus { - -OOSClient::SP -OOSManager::getClient(const string &spec) -{ - for (uint32_t i = 0; i < _clients.size(); ++i) { - if (_clients[i]->getSpec() == spec) { - return _clients[i]; - } - } - return OOSClient::SP(new OOSClient(_orb, spec)); -} - -void -OOSManager::PerformTask() -{ - bool changed = false; - if (_slobrokGen != _mirror.updates()) { - _slobrokGen = _mirror.updates(); - SpecList newServices = _mirror.lookup(_servicePattern); - std::sort(newServices.begin(), newServices.end()); - if (newServices != _services) { - ClientList newClients; - for (uint32_t i = 0; i < newServices.size(); ++i) { - newClients.push_back(getClient(newServices[i].second)); - } - _services.swap(newServices); - _clients.swap(newClients); - changed = true; - } - } - bool allOk = _mirror.ready(); - for (uint32_t i = 0; i < _clients.size(); ++i) { - if (_clients[i]->isChanged()) { - changed = true; - } - if (!_clients[i]->isReady()) { - allOk = false; - } - } - if (changed) { - OOSSet oos(new StringSet()); - for (uint32_t i = 0; i < _clients.size(); ++i) { - _clients[i]->dumpState(*oos); - } - vespalib::LockGuard guard(_lock); - _oosSet.swap(oos); - } - if (allOk && !_ready) { - _ready = true; - } - Schedule(_ready ? 1.0 : 0.1); -} - -OOSManager::OOSManager(FRT_Supervisor &orb, - IMirrorAPI &mirror, - const string &servicePattern) - : FNET_Task(orb.GetScheduler()), - _orb(orb), - _mirror(mirror), - _disabled(servicePattern.empty()), - _ready(_disabled), - _lock("mbus::OOSManager::_lock", false), - _servicePattern(servicePattern), - _slobrokGen(0), - _clients(), - _oosSet() -{ - if (!_disabled) { - ScheduleNow(); - } -} - -OOSManager::~OOSManager() -{ - Kill(); -} - -bool -OOSManager::isOOS(const string &service) -{ - if (_disabled) { - return false; - } - vespalib::LockGuard guard(_lock); - if (_oosSet.get() == nullptr) { - return false; - } - if (_oosSet->find(service) == _oosSet->end()) { - return false; - } - return true; -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h deleted file mode 100644 index eac00b93896..00000000000 --- a/messagebus/src/vespa/messagebus/network/oosmanager.h +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "oosclient.h" -#include <vespa/fnet/task.h> -#include <vespa/slobrok/imirrorapi.h> -#include <vespa/vespalib/util/sync.h> -#include <set> - -class FRT_Supervisor; - -namespace mbus { - -class RPCNetwork; - -/** - * This class keeps track of OOS information. A set of servers having OOS information are identified by looking up a - * service pattern in the slobrok. These servers are then polled for information. The information is compiled into a - * local repository for fast lookup. - */ -class OOSManager : public FNET_Task { -public: - using IMirrorAPI = slobrok::api::IMirrorAPI; - using SpecList = IMirrorAPI::SpecList; - using ClientList = std::vector<OOSClient::SP>; - using StringSet = std::set<string>; - using OOSSet = std::shared_ptr<StringSet>; - -private: - FRT_Supervisor &_orb; - IMirrorAPI &_mirror; - bool _disabled; - bool _ready; - vespalib::Lock _lock; - string _servicePattern; - uint32_t _slobrokGen; - SpecList _services; - ClientList _clients; - OOSSet _oosSet; - - /** - * Reuse or create a client against the given server. - * - * @param spec The connection spec of the OOS server we want to talk to. - * @return A shared oosclient object. - */ - OOSClient::SP getClient(const string &spec); - - /** - * Method invoked when this object is run as a task. This method will update the oos information held by - * this object. - */ - void PerformTask() override; - -public: - /** - * Create a new OOSManager. The given service pattern will be looked up in the given slobrok mirror. The - * resulting set of services will be polled for oos information. - * - * @param orb The supervisor used for RPC operations. - * @param mirror The slobrok mirror. - * @param servicePattern The service pattern for oos servers. - */ - OOSManager(FRT_Supervisor &orb, - IMirrorAPI &mirror, - const string &servicePattern); - - /** - * Destructor. - */ - virtual ~OOSManager(); - - /** - * Returns whether or not some initial state has been returned. - * - * @return True, if initial state has been found. - */ - bool isReady() const { return _ready; } - - /** - * Returns whether or not the given service has been marked as out of service. - * - * @param service The service to check. - * @return True if the service is out of service. - */ - bool isOOS(const string &service); -}; - -} // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 8ff7ac87edc..fae8d73d540 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpcnetwork.h" #include "rpcservicepool.h" -#include "oosmanager.h" #include "rpcsendv1.h" #include "rpcsendv2.h" #include "rpctargetpool.h" @@ -19,11 +18,13 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".rpcnetwork"); using vespalib::make_string; +using namespace std::chrono_literals; namespace { @@ -44,7 +45,7 @@ public: _gate() { ScheduleNow(); } - ~SyncTask() {} + ~SyncTask() = default; void await() { _gate.await(); @@ -106,7 +107,7 @@ RPCNetwork::TargetPoolTask::PerformTask() } RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : - _owner(0), + _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), _transport(std::make_unique<FNET_Transport>()), @@ -118,7 +119,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), - _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())), _requestedPort(params.getListenPort()), _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)), _sendV1(std::make_unique<RPCSendV1>()), @@ -177,7 +177,7 @@ RPCNetwork::getVersion() const void RPCNetwork::attach(INetworkOwner &owner) { - LOG_ASSERT(_owner == 0); + LOG_ASSERT(_owner == nullptr); _owner = &owner; _sendV1->attach(*this); @@ -239,19 +239,16 @@ RPCNetwork::waitUntilReady(double seconds) const if (configurator->poll()) { hasConfig = true; } - if (_mirror->ready() && _oosManager->isReady()) { + if (_mirror->ready()) { return true; } - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(10ms); } if (! hasConfig) { LOG(error, "failed to get config for slobroks in %d seconds", (int)seconds); } else if (! _mirror->ready()) { - std::string brokers = brokerList.logString(); - LOG(error, "mirror (of %s) failed to become ready in %d seconds", - brokers.c_str(), (int)seconds); - } else if (! _oosManager->isReady()) { - LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds); + auto brokers = brokerList.logString(); + LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds); } return false; } @@ -259,9 +256,8 @@ RPCNetwork::waitUntilReady(double seconds) const void RPCNetwork::registerSession(const string &session) { - if (_ident.getServicePrefix().size() == 0) { - LOG(warning, "The session (%s) will not be registered" - "in the Slobrok since this network has no identity.", + if (_ident.getServicePrefix().empty()) { + LOG(warning, "The session (%s) will not be registered in the Slobrok since this network has no identity.", session.c_str()); return; } @@ -274,7 +270,7 @@ RPCNetwork::registerSession(const string &session) void RPCNetwork::unregisterSession(const string &session) { - if (_ident.getServicePrefix().size() == 0) { + if (_ident.getServicePrefix().empty()) { return; } string name = _ident.getServicePrefix(); @@ -299,13 +295,8 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient) Error RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName) { - if (_oosManager->isOOS(serviceName)) { - return Error(ErrorCode::SERVICE_OOS, - make_string("The service '%s' has been marked as out of service.", - serviceName.c_str())); - } RPCServiceAddress::UP ret = _servicePool->resolve(serviceName); - if (ret.get() == nullptr) { + if ( ! ret) { return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE, make_string("The address of service '%s' could not be resolved. It is not currently " "registered with the Vespa name server. " @@ -313,7 +304,7 @@ RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceN serviceName.c_str())); } RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret); - if (target.get() == nullptr) { + if ( ! target) { return Error(ErrorCode::CONNECTION_ERROR, make_string("Failed to connect to service '%s'.", serviceName.c_str())); } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 13fab018c3b..e29d01c8b04 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -23,7 +23,6 @@ namespace slobrok { namespace mbus { -class OOSManager; class RPCServicePool; class RPCTargetPool; class RPCNetworkParams; @@ -72,7 +71,6 @@ private: std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - std::unique_ptr<OOSManager> _oosManager; int _requestedPort; std::unique_ptr<vespalib::ThreadStackExecutor> _executor; std::unique_ptr<RPCSendAdapter> _sendV1; @@ -187,14 +185,6 @@ public: FNET_Scheduler &getScheduler() { return _scheduler; } /** - * Obtain a reference to the internal OOS manager object. This will be - * mostly used for testing. - * - * @return internal OOS manager - **/ - OOSManager &getOOSManager() { return *_oosManager; } - - /** * Obtain a reference to the internal supervisor. This is used by * the request adapters to register FRT methods. * diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index df35d51cb54..2ea18b952d4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -7,7 +7,6 @@ namespace mbus { RPCNetworkParams::RPCNetworkParams() : _identity(Identity("")), _slobrokConfig("admin/slobrok.0"), - _oosServerPattern(""), _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index bfc624a6523..0a4ed806c27 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -16,7 +16,6 @@ private: using CompressionConfig = vespalib::compression::CompressionConfig; Identity _identity; config::ConfigUri _slobrokConfig; - string _oosServerPattern; int _listenPort; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; @@ -78,26 +77,6 @@ public: } /** - * Returns the config id pattern used to lookup OOS servers. - * - * @return The config id. - */ - const string &getOOSServerPattern() const { - return _oosServerPattern; - } - - /** - * Sets the config id pattern used to lookup OOS servers. - * - * @param oosServerPattern The server pattern. - * @return This, to allow chaining. - */ - RPCNetworkParams &setOOSServerPattern(const string &oosServerPattern) { - _oosServerPattern = oosServerPattern; - return *this; - } - - /** * Returns the port to listen to. * * @return The port. diff --git a/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt b/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt index 83106db43ce..cfc47b2db25 100644 --- a/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt +++ b/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt @@ -2,8 +2,6 @@ vespa_add_library(messagebus_messagebus-test SOURCES custompolicy.cpp - oosserver.cpp - oosstate.cpp receptor.cpp simplemessage.cpp simpleprotocol.cpp diff --git a/messagebus/src/vespa/messagebus/testlib/oosserver.cpp b/messagebus/src/vespa/messagebus/testlib/oosserver.cpp deleted file mode 100644 index a3d3a580666..00000000000 --- a/messagebus/src/vespa/messagebus/testlib/oosserver.cpp +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "oosserver.h" -#include "slobrok.h" - -namespace mbus { - -OOSServer::OOSServer(const Slobrok &slobrok, const string service, - const OOSState &state) - : _lock("mbus::OOSServer::_lock", false), - _orb(), - _port(0), - _regAPI(_orb, slobrok::ConfiguratorFactory(slobrok.config())), - _genCnt(1), - _state() -{ - setState(state); - { - FRT_ReflectionBuilder rb(&_orb); - //------------------------------------------------------------------- - rb.DefineMethod("fleet.getOOSList", "ii", "Si", true, - FRT_METHOD(OOSServer::rpc_poll), this); - rb.MethodDesc("fetch OOS information"); - rb.ParamDesc("gencnt", "generation already known by client"); - rb.ParamDesc("timeout", "How many milliseconds to wait for changes " - "before returning if nothing has changed (max=10000)"); - rb.ReturnDesc("names", "list of services that are OOS " - "(empty if generation has not changed)"); - rb.ReturnDesc("newgen", "generation of the returned list"); - //------------------------------------------------------------------- - } - _orb.Listen(0); - _port = _orb.GetListenPort(); - _orb.Start(); - _regAPI.registerName(service); -} - -OOSServer::~OOSServer() -{ - _orb.ShutDown(true); -} - -int -OOSServer::port() const -{ - return _port; -} - -void -OOSServer::rpc_poll(FRT_RPCRequest *req) -{ - vespalib::LockGuard guard(_lock); - FRT_Values &dst = *req->GetReturn(); - FRT_StringValue *names = dst.AddStringArray(_state.size()); - for (uint32_t i = 0; i < _state.size(); ++i) { - dst.SetString(&names[i], _state[i].c_str()); - } - dst.AddInt32(_genCnt); -} - -void -OOSServer::setState(const OOSState &state) -{ - std::vector<string> newState; - for (OOSState::ITR itr = state.begin(); - itr != state.end(); ++itr) - { - if (itr->second) { - newState.push_back(itr->first); - } - } - vespalib::LockGuard guard(_lock); - _state = newState; - ++_genCnt; - if (_genCnt == 0) { - ++_genCnt; - } -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/oosserver.h b/messagebus/src/vespa/messagebus/testlib/oosserver.h deleted file mode 100644 index 512b3c43e7a..00000000000 --- a/messagebus/src/vespa/messagebus/testlib/oosserver.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/vespalib/util/sync.h> -#include <vespa/slobrok/sbregister.h> -#include <vespa/fnet/frt/supervisor.h> -#include <string> -#include <vector> -#include "oosstate.h" - -namespace mbus { - -class Slobrok; - -class OOSServer : public FRT_Invokable -{ -private: - OOSServer(const OOSServer &); - OOSServer &operator=(const OOSServer &); - - vespalib::Lock _lock; - FRT_Supervisor _orb; - int _port; - slobrok::api::RegisterAPI _regAPI; - uint32_t _genCnt; - std::vector<string> _state; - -public: - OOSServer(const Slobrok &slobrok, const string service, - const OOSState &state = OOSState()); - ~OOSServer(); - int port() const; - void rpc_poll(FRT_RPCRequest *req); - void setState(const OOSState &state); -}; - -} // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/testlib/oosstate.cpp b/messagebus/src/vespa/messagebus/testlib/oosstate.cpp deleted file mode 100644 index d4258a31ff0..00000000000 --- a/messagebus/src/vespa/messagebus/testlib/oosstate.cpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "oosstate.h" - -namespace mbus { - -OOSState::OOSState() - : _data() -{ } - -OOSState & -OOSState::add(const string &service, bool oos) -{ - _data.push_back(std::make_pair(service, oos)); - return *this; -} - -OOSState::ITR -OOSState::begin() const -{ - return _data.begin(); -} - -OOSState::ITR -OOSState::end() const -{ - return _data.end(); -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/oosstate.h b/messagebus/src/vespa/messagebus/testlib/oosstate.h deleted file mode 100644 index 9dfc58d4070..00000000000 --- a/messagebus/src/vespa/messagebus/testlib/oosstate.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vector> -#include <vespa/messagebus/common.h> - -namespace mbus { - -class OOSState -{ -public: - typedef std::vector<std::pair<string, bool> > TYPE; - typedef TYPE::const_iterator ITR; - -private: - TYPE _data; - -public: - OOSState(); - OOSState &add(const string &service, bool oos = true); - ITR begin() const; - ITR end() const; -}; - -} // namespace mbus - diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp index dbc741f2dd4..a2489aac9ce 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp +++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp @@ -1,10 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "testserver.h" -#include "oosstate.h" #include "simpleprotocol.h" #include "slobrok.h" #include "slobrokstate.h" -#include <vespa/messagebus/network/oosmanager.h> #include <vespa/vespalib/component/vtag.h> namespace mbus { @@ -24,12 +22,10 @@ VersionedRPCNetwork::setVersion(const vespalib::Version &version) TestServer::TestServer(const Identity &ident, const RoutingSpec &spec, const Slobrok &slobrok, - const string &oosServerPattern, IProtocol::SP protocol) : net(RPCNetworkParams() .setIdentity(ident) - .setSlobrokConfig(slobrok.config()) - .setOOSServerPattern(oosServerPattern)), + .setSlobrokConfig(slobrok.config())), mb(net, ProtocolSet().add(IProtocol::SP(new SimpleProtocol())).add(protocol)) { mb.setupRouting(spec); @@ -50,12 +46,6 @@ TestServer::waitSlobrok(const string &pattern, uint32_t cnt) } bool -TestServer::waitOOS(const string &service) -{ - return waitState(OOSState().add(service, true)); -} - -bool TestServer::waitState(const SlobrokState &slobrokState) { for (uint32_t i = 0; i < 12000; ++i) { @@ -76,24 +66,4 @@ TestServer::waitState(const SlobrokState &slobrokState) return false; } -bool -TestServer::waitState(const OOSState &oosState) -{ - for (uint32_t i = 0; i < 12000; ++i) { - bool done = true; - for (OOSState::ITR itr = oosState.begin(); - itr != oosState.end(); ++itr) - { - if (net.getOOSManager().isOOS(itr->first) != itr->second) { - done = false; - } - } - if (done) { - return true; - } - FastOS_Thread::Sleep(10); - } - return false; -} - } diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h index 400e2b274c5..757e74c3554 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.h +++ b/messagebus/src/vespa/messagebus/testlib/testserver.h @@ -13,7 +13,6 @@ class Identity; class RoutingTableSpec; class Slobrok; class SlobrokState; -class OOSState; class VersionedRPCNetwork : public RPCNetwork { private: @@ -36,20 +35,13 @@ public: VersionedRPCNetwork net; MessageBus mb; - TestServer(const Identity &ident, - const RoutingSpec &spec, - const Slobrok &slobrok, - const string &oosServerPattern = "", + TestServer(const Identity &ident, const RoutingSpec &spec, const Slobrok &slobrok, IProtocol::SP protocol = IProtocol::SP()); - TestServer(const MessageBusParams &mbusParams, - const RPCNetworkParams &netParams); + TestServer(const MessageBusParams &mbusParams, const RPCNetworkParams &netParams); ~TestServer(); bool waitSlobrok(const string &pattern, uint32_t cnt = 1); - bool waitOOS(const string &service); - bool waitState(const SlobrokState &slobrokState); - bool waitState(const OOSState &oosState); }; } // namespace mbus diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp index fe4600aea58..e24a65103a7 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp @@ -142,12 +142,6 @@ Application::parseArgs() } else { throw config::InvalidConfigException("Missing value for parameter 'listenport'."); } - } else if (strcasecmp(_argv[arg], "--oosserverpattern") == 0) { - if (++arg < _argc) { - _params.getRPCNetworkParams().setOOSServerPattern(_argv[arg]); - } else { - throw config::InvalidConfigException("Missing value for parameter 'oosserverpattern'."); - } } else if (strcasecmp(_argv[arg], "--protocol") == 0) { if (++arg < _argc) { _params.setProtocol(_argv[arg]); @@ -197,7 +191,6 @@ Application::printHelp() const " --hops Prints a list of all available hops.\n" " --identity <id> Sets the identity of message bus.\n" " --listenport <num> Sets the port message bus will listen to.\n" - " --oosserverpattern <id> Sets the out-of-service server pattern for message bus.\n" " --protocol <name> Sets the name of the protocol whose routing to inspect.\n" " --route <name> Prints detailed information about route <name>.\n" " --routes Prints a list of all available routes.\n" @@ -214,14 +207,11 @@ Application::verifyRoute(const mbus::Route &route, std::set<std::string> &errors std::string str = route.getHop(i).toString(); mbus::HopBlueprint hop = getHop(str); std::set<std::string> hopErrors; - std::vector<std::string> services, oos; - if (!verifyHop(hop, services, oos, hopErrors)) { + if (!verifyHop(hop, hopErrors)) { for (std::set<std::string>::iterator err = hopErrors.begin(); err != hopErrors.end(); ++err) { - errors.insert(vespalib::make_string("for hop '%s', %s", - str.c_str(), - err->c_str())); + errors.insert(vespalib::make_string("for hop '%s', %s", str.c_str(), err->c_str())); } } } @@ -229,8 +219,7 @@ Application::verifyRoute(const mbus::Route &route, std::set<std::string> &errors } bool -Application::verifyHop(const mbus::HopBlueprint &hop, std::vector<std::string> &services, - std::vector<std::string> &oos, std::set<std::string> &errors) const +Application::verifyHop(const mbus::HopBlueprint &hop, std::set<std::string> &errors) const { // _P_A_R_A_N_O_I_A_ if (!hop.hasDirectives()) { @@ -252,9 +241,8 @@ Application::verifyHop(const mbus::HopBlueprint &hop, std::vector<std::string> & const mbus::RoutingTable &table = *_mbus->getRoutingTable(_params.getProtocol()); if (hop.getDirective(0)->getType() == mbus::IHopDirective::TYPE_ROUTE) { const mbus::RouteDirective &dir = static_cast<const mbus::RouteDirective &>(*hop.getDirective(0)); - if (table.getRoute(dir.getName()) == NULL) { - errors.insert(vespalib::make_string("route '%s' not found", - dir.getName().c_str())); + if (table.getRoute(dir.getName()) == nullptr) { + errors.insert(vespalib::make_string("route '%s' not found", dir.getName().c_str())); return false; } else { return true; @@ -262,9 +250,9 @@ Application::verifyHop(const mbus::HopBlueprint &hop, std::vector<std::string> & } std::string selector = hop.create()->toString(); - if (table.getHop(selector) != NULL) { + if (table.getHop(selector) != nullptr) { return true; - } else if (table.getRoute(selector) != NULL) { + } else if (table.getRoute(selector) != nullptr) { return true; } @@ -275,18 +263,6 @@ Application::verifyHop(const mbus::HopBlueprint &hop, std::vector<std::string> & return false; } - // Check OOS status of all matches. - for (slobrok::api::IMirrorAPI::SpecList::iterator it = lst.begin(); - it != lst.end(); ++it) - { - services.push_back(it->first); - if (_net->verifyOOS(it->first)) { - oos.push_back(it->first); - } - } - if (oos.size() == lst.size()) { - errors.insert("matching service(s) out of service"); - } return errors.empty(); } @@ -299,8 +275,7 @@ Application::printDump() const it.isValid(); it.next()) { std::set<std::string> errors; - std::vector<std::string> services, oos; - bool ok = verifyHop(it.getHop(), services, oos, errors); + bool ok = verifyHop(it.getHop(), errors); printf(" <hop name='%s' selector='%s'", it.getName().c_str(), it.getHop().create()->toString().c_str()); if (it.getHop().getIgnoreResult()) { @@ -346,9 +321,7 @@ Application::printDump() const for (slobrok::api::IMirrorAPI::SpecList::iterator it = services.begin(); it != services.end(); ++it) { - printf(" <service name='%s' spec='%s' %s/>\n", - it->first.c_str(), it->second.c_str(), - _net->verifyOOS(it->first) ? "state='oos' " : ""); + printf(" <service name='%s' spec='%s'/>\n", it->first.c_str(), it->second.c_str()); } printf("</services>\n"); } @@ -383,8 +356,7 @@ Application::printHops() const hops[i].c_str(), hop.create()->toString().c_str()); std::set<std::string> errors; - std::vector<std::string> services, oos; - if (_params.getVerify() && verifyHop(hop, services, oos, errors)) { + if (_params.getVerify() && verifyHop(hop, errors)) { printf(" (verified)\n"); } else { printf("\n"); diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.h b/vespaclient/src/vespa/vespaclient/vesparoute/application.h index 60c950bca83..697319befcc 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.h +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.h @@ -60,8 +60,7 @@ private: bool verifyRoute(const mbus::Route &route, std::set<std::string> &errors) const; /** Verifies the content of the given hop. */ - bool verifyHop(const mbus::HopBlueprint &hop, std::vector<std::string> &services, - std::vector<std::string> &oos, std::set<std::string> &errors) const; + bool verifyHop(const mbus::HopBlueprint &hop, std::set<std::string> &errors) const; public: Application(); diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.cpp index d9d8d0c4056..cc0d37a9d34 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.cpp @@ -1,9 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mynetwork.h" -#include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/sendproxy.h> -#include <vespa/messagebus/network/oosmanager.h> class MyServiceAddress : public mbus::IServiceAddress { @@ -11,22 +9,15 @@ private: std::string _address; public: - MyServiceAddress(const std::string &address) : - _address(address) { - // empty - } - - const std::string &getAddress() { - return _address; - } + MyServiceAddress(const std::string &address) : _address(address) {} + + const std::string &getAddress() { return _address; } }; MyNetwork::MyNetwork(const mbus::RPCNetworkParams ¶ms) : mbus::RPCNetwork(params), _nodes() -{ - // empty -} +{} bool @@ -42,12 +33,6 @@ MyNetwork::freeServiceAddress(mbus::RoutingNode &recipient) recipient.setServiceAddress(mbus::IServiceAddress::UP()); } -bool -MyNetwork::verifyOOS(const std::string &address) -{ - return getOOSManager().isOOS(address); -} - void MyNetwork::send(const mbus::Message &msg, const std::vector<mbus::RoutingNode*> &nodes) { diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.h b/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.h index 69b1f66ca4a..846df394db0 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.h +++ b/vespaclient/src/vespa/vespaclient/vesparoute/mynetwork.h @@ -25,14 +25,6 @@ public: void send(const mbus::Message &msg, const std::vector<mbus::RoutingNode*> &recipients) override; /** - * Returns whether or not the given address is actually out of service. - * - * @param address The address to check. - * @return True if the address is out of service. - */ - bool verifyOOS(const std::string &address); - - /** * Removes and returns the list of recipients that was most recently sent to. * * @param contexts The list to move the contexts to. diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/params.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/params.cpp index eb372863f89..45a14691648 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/params.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/params.cpp @@ -16,14 +16,9 @@ Params::Params() : _lstServices(false), _dump(false), _verify(false) -{ - _rpcParams.setOOSServerPattern("search/*/rtx/*/clustercontroller"); // magic -} +{} -Params::~Params() -{ - // empty -} +Params::~Params() = default; } |