diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-16 23:24:17 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 10:52:58 +0200 |
commit | b527d0d66226ee4a1d037df237c416e2e8ea46a0 (patch) | |
tree | 8a061db6c10f86aeb11fb0788489b1c74ebde15b /messagebus | |
parent | 8c959405273b5da2be8478cfb81fb7d29e2dd438 (diff) |
Remove some java OSS stuff too.
Diffstat (limited to 'messagebus')
5 files changed, 0 insertions, 647 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java deleted file mode 100755 index aff7a40dbe6..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -/** - * This class keeps track of OOS information obtained from a single server. This class is used by the OOSManager class. - * Note that since this class is only used inside the transport thread it has no synchronization. Using it directly will - * lead to race conditions and possible crashes. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSClient implements Runnable, RequestWaiter { - - private Supervisor orb; - private Target target = null; - private Request request = null; - private boolean requestDone = false; - private Spec spec; - private Task task; - private List<String> oosList = new ArrayList<String>(); - private int requestGen = 0; - private int listGen = 0; - private int dumpGen = 0; - private boolean shutdown = false; - - /** - * Create a new OOSClient polling oos information from the given server. - * - * @param orb The object used for RPC operations. - * @param spec The fnet connect spec for oos server. - */ - public OOSClient(Supervisor orb, Spec spec) { - this.orb = orb; - this.spec = spec; - - task = this.orb.transport().createTask(this); - task.scheduleNow(); - } - - /** - * Handle a server reply. - */ - private void handleReply() { - if (!request.checkReturnTypes("Si")) { - if (target != null) { - target.close(); - target = null; - } - task.schedule(1.0); - return; - } - - Values ret = request.returnValues(); - int retGen = ret.get(1).asInt32(); - if (requestGen != retGen) { - List<String> oos = new ArrayList<String>(); - oos.addAll(Arrays.asList(ret.get(0).asStringArray())); - oosList = oos; - requestGen = retGen; - listGen = retGen; - } - task.schedule(0.1); - } - - /** - * Handle server (re)connect. - */ - private void handleConnect() { - if (target == null) { - target = orb.connect(spec); - requestGen = 0; - } - } - - /** - * Handle server invocation. - */ - private void handleInvoke() { - if (target == null) { - throw new IllegalStateException("Attempting to invoke a request on a null target."); - } - request = new Request("fleet.getOOSList"); - request.parameters().add(new Int32Value(requestGen)); - request.parameters().add(new Int32Value(60000)); - target.invokeAsync(request, 70.0, this); - } - - /** - * Implements runnable. Performs overall server poll logic. - */ - public void run() { - if (shutdown) { - task.kill(); - if (target != null) { - target.close(); - } - } else if (requestDone) { - requestDone = false; - handleReply(); - } else { - handleConnect(); - handleInvoke(); - } - } - - /** - * Shut down this OOS client. Invoking this method will take down any active connections and block further activity - * from this object. - */ - public void shutdown() { - shutdown = true; - task.scheduleNow(); - } - - /** - * From FRT_IRequestWait, picks up server replies. - * - * @param request The request that has completed. - */ - public void handleRequestDone(Request request) { - if (request != this.request || requestDone) { - throw new IllegalStateException("Multiple invocations of RequestDone()."); - } - requestDone = true; - task.scheduleNow(); - } - - /** - * Obtain the connect spec of the OOS server this client is talking to. - * - * @return OOS server connect spec - */ - public Spec getSpec() { - return spec; - } - - /** - * Check if this client has changed. A client has changed if it has obtain now information after the dumpState - * method was last invoked. - * - * @return True is this client has changed. - */ - public boolean isChanged() { - return listGen != dumpGen; - } - - /** - * Returns whether or not this client has receieved any reply at all from the server it is connected to. - * - * @return True if initial request has returned. - */ - public boolean isReady() { - return listGen != 0; - } - - /** - * Dump the current oos information known by this client into the given string set. - * - * @param dst The object used to aggregate oos information. - */ - public void dumpState(Set<String> dst) { - dst.addAll(oosList); - dumpGen = listGen; - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java deleted file mode 100755 index b19543b20b8..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSManager.java +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.Spec; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Task; -import com.yahoo.jrt.slobrok.api.Mirror; - -import java.util.*; - -/** - * This class keeps track of OOS information. A set of servers having OOS information are identified by looking up a - * service pattern in the slobrok. These servers are then polled for information. The information is compiled into a - * local repository for fast lookup. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSManager implements Runnable { - - // An internal flag that indicates whether or not this manager is disabled. This is used to short-circuit any - // requests made when the service pattern is null. - private boolean disabled; - - // Whether or not this manager has received status information from all connected clients. - private boolean ready; - - // The JRT supervisor object. - private final Supervisor orb; - - // The JRT slobrok mirror object. - private final Mirror mirror; - - // A transport task object used for scheduling this. - private Task task; - - // The service pattern used to resolve what services registered in slobrok resolve to OOS servers. - private final String servicePattern; - - // A map of OOS clients that each poll a single OOS server. This map will contain an entry for each service that - // the service pattern resolves to. - private Map<String, OOSClient> clients = Collections.emptyMap(); - - // A set of out-of-service service names. - private volatile Set<String> oosSet; - - // The generation of the current slobrok resolve. - private int slobrokGen = 0; - - // A local copy of the services that the service pattern resolved to after the previous slobrok lookup. This is used - // to avoid updating the internal list every time slobrok's generation differs, but instead only when the service - // pattern resolves to something different. - private List<Mirror.Entry> services; - - /** - * Create a new OOSManager. The given service pattern will be looked up in the given slobrok mirror. The resulting - * set of services will be polled for oos information. - * - * @param orb The object used for RPC operations. - * @param mirror The slobrok mirror. - * @param servicePattern The service pattern for oos servers. - */ - public OOSManager(Supervisor orb, Mirror mirror, String servicePattern) { - this.orb = orb; - this.mirror = mirror; - this.servicePattern = servicePattern; - - disabled = (servicePattern == null || servicePattern.isEmpty()); - ready = disabled; - - if (!disabled) { - task = orb.transport().createTask(this); - task.scheduleNow(); - } - } - - /** - * Method invoked when this object is run as a task. This method will update the oos information held by this - * object. - */ - public void run() { - boolean changed = updateFromSlobrok(); - boolean allOk = mirror.ready(); - for (OOSClient client : clients.values()) { - if (client.isChanged()) { - changed = true; - } - if (!client.isReady()) { - allOk = false; - } - } - if (changed) { - Set<String> oos = new LinkedHashSet<String>(); - for (OOSClient client : clients.values()) { - client.dumpState(oos); - } - oosSet = oos; - } - if (allOk && !ready) { - ready = true; - } - task.schedule(ready ? 1.0 : 0.1); - } - - /** - * This method will check the local slobrok mirror to make sure that its clients are connected to the appropriate - * services. If anything changes this method returns true. - * - * @return True if anything changed. - */ - private boolean updateFromSlobrok() { - if (slobrokGen == mirror.updates()) { - return false; - } - slobrokGen = mirror.updates(); - List<Mirror.Entry> newServices = Arrays.asList(mirror.lookup(servicePattern)); - Collections.sort(newServices, new Comparator<Mirror.Entry>() { - public int compare(Mirror.Entry lhs, Mirror.Entry rhs) { - return lhs.compareTo(rhs); - } - }); - if (newServices.equals(services)) { - return false; - } - Map<String, OOSClient> newClients = new HashMap<String, OOSClient>(); - for (Mirror.Entry service : newServices) { - OOSClient client = clients.remove(service.getSpec()); - if (client == null) { - client = new OOSClient(orb, new Spec(service.getSpec())); - } - newClients.put(service.getSpec(), client); - } - for (OOSClient client : clients.values()) { - client.shutdown(); - } - services = newServices; - clients = newClients; - return true; - } - - /** - * Returns whether or not some initial state has been returned. - * - * @return True, if initial state has been found. - */ - public boolean isReady() { - return ready; - } - - /** - * Returns whether or not the given service has been marked as out of service. - * - * @param service The service to check. - * @return True if the service is out of service. - */ - @SuppressWarnings({ "RedundantIfStatement" }) - public boolean isOOS(String service) { - if (disabled) { - return false; - } - Set<String> s = oosSet; - if (s == null) { - return false; - } - if (!s.contains(service)) { - return false; - } - return true; - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java deleted file mode 100755 index ce3f5460610..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSServer.java +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc.test; - -import com.yahoo.jrt.*; -import com.yahoo.jrt.slobrok.api.SlobrokList; -import com.yahoo.jrt.slobrok.api.Register; -import com.yahoo.jrt.slobrok.server.Slobrok; - -import java.util.ArrayList; -import java.util.List; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSServer { - private int getCnt = 1; - private List<String> state = new ArrayList<String>(); - private Supervisor orb; - private Register register; - private Acceptor listener; - - public OOSServer(Slobrok slobrok, String service, OOSState state) { - orb = new Supervisor(new Transport()); - orb.addMethod(new Method("fleet.getOOSList", "ii", "Si", - new MethodHandler() { - public void invoke(Request request) { - rpc_poll(request); - } - }) - .methodDesc("Fetch OOS information.") - .paramDesc(0, "gencnt", "Generation already known by client.") - .paramDesc(1, "timeout", "How many milliseconds to wait for changes before returning if nothing has changed (max=10000).") - .returnDesc(0, "names", "List of services that are OOS (empty if generation has not changed).") - .returnDesc(1, "newgen", "Generation of the returned list.")); - try { - listener = orb.listen(new Spec(0)); - } - catch (ListenFailedException e) { - orb.transport().shutdown().join(); - throw new RuntimeException(e); - } - SlobrokList slist = new SlobrokList(); - slist.setup(new String[] { new Spec("localhost", slobrok.port()).toString() }); - register = new Register(orb, slist, "localhost", listener.port()); - register.registerName(service); - setState(state); - } - - public void shutdown() { - register.shutdown(); - listener.shutdown().join(); - orb.transport().shutdown().join(); - } - - public void setState(OOSState state) { - List<String> newState = new ArrayList<String>(); - for (String service : state.getServices()) { - if (state.isOOS(service)) { - newState.add(service); - } - } - synchronized(this) { - this.state = newState; - if (++getCnt == 0) { - getCnt = 1; - } - } - } - - private void rpc_poll(Request request) { - synchronized(this) { - request.returnValues() - .add(new StringArray(state.toArray(new String[state.size()]))) - .add(new Int32Value(getCnt)); - } - } - - public int getPort() { - return listener.port(); - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java deleted file mode 100755 index ee9de59f085..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/test/OOSState.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc.test; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSState { - private Map<String, Boolean> data = new LinkedHashMap<String, Boolean>(); - - public OOSState add(String service, boolean oos) { - data.put(service, oos); - return this; - } - - public Set<String> getServices() { - return data.keySet(); - } - - public boolean isOOS(String service) { - return data.containsKey(service) && data.get(service); - } -} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java deleted file mode 100755 index a6270f6558d..00000000000 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.rpc.test.OOSServer; -import com.yahoo.messagebus.network.rpc.test.OOSState; -import com.yahoo.messagebus.network.rpc.test.TestServer; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.test.Receptor; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.net.UnknownHostException; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class OOSTestCase extends junit.framework.TestCase { - - private static class MyServer extends TestServer implements MessageHandler { - DestinationSession session; - - public MyServer(String name, Slobrok slobrok, String oosServerPattern) - throws ListenFailedException, UnknownHostException - { - super(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()), - new RPCNetworkParams() - .setIdentity(new Identity(name)) - .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)) - .setOOSServerPattern(oosServerPattern)); - session = mb.createDestinationSession("session", true, this); - } - - public boolean destroy() { - session.destroy(); - return super.destroy(); - } - - public void handleMessage(Message msg) { - session.acknowledge(msg); - } - } - - private static void assertError(SourceSession src, String dst, int error) { - Message msg = new SimpleMessage("msg"); - msg.getTrace().setLevel(9); - assertTrue(src.send(msg, Route.parse(dst)).isAccepted()); - Reply reply = ((Receptor) src.getReplyHandler()).getReply(60); - assertNotNull(reply); - System.out.println(reply.getTrace()); - if (error == ErrorCode.NONE) { - assertFalse(reply.hasErrors()); - } else { - assertTrue(reply.hasErrors()); - assertEquals(error, reply.getError(0).getCode()); - } - } - - public void testOOS() throws ListenFailedException, UnknownHostException { - Slobrok slobrok = new Slobrok(); - TestServer srcServer = new TestServer("src", null, slobrok, "oos/*", null); - SourceSession srcSession = srcServer.mb.createSourceSession(new Receptor()); - - MyServer dst1 = new MyServer("dst1", slobrok, null); - MyServer dst2 = new MyServer("dst2", slobrok, null); - MyServer dst3 = new MyServer("dst3", slobrok, null); - MyServer dst4 = new MyServer("dst4", slobrok, null); - MyServer dst5 = new MyServer("dst5", slobrok, null); - assertTrue(srcServer.waitSlobrok("*/session", 5)); - - // Ensure that normal sending is ok. - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.NONE); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Ensure that 2 OOS services report properly. - OOSServer oosServer = new OOSServer(slobrok, "oos/1", new OOSState() - .add("dst2/session", true) - .add("dst3/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Ensure that 1 OOS service may come up while other stays down. - oosServer.setState(new OOSState().add("dst2/session", true)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Add another OOS server and make sure that it works properly. - OOSServer oosServer2 = new OOSServer(slobrok, "oos/2", new OOSState() - .add("dst4/session", true) - .add("dst5/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 2)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst4/session", true) - .add("dst5/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); - oosServer2.shutdown(); - - // Ensure that shutting down one OOS server will properly propagate. - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - // Now add two new OOS servers and make sure that works too. - OOSServer oosServer3 = new OOSServer(slobrok, "oos/3", new OOSState() - .add("dst2/session", true) - .add("dst4/session", true)); - OOSServer oosServer4 = new OOSServer(slobrok, "oos/4", new OOSState() - .add("dst2/session", true) - .add("dst3/session", true) - .add("dst5/session", true)); - assertTrue(srcServer.waitSlobrok("oos/*", 3)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst2/session", true) - .add("dst3/session", true) - .add("dst4/session", true) - .add("dst5/session", true))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); - - // Modify the state of the two new servers and make sure it propagates. - oosServer3.setState(new OOSState() - .add("dst2/session", true)); - oosServer4.setState(new OOSState() - .add("dst1/session", true)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", true) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - oosServer3.shutdown(); - oosServer4.shutdown(); - - // Ensure that shutting down the two latest OOS servers works properly. - assertTrue(srcServer.waitSlobrok("oos/*", 1)); - assertTrue(srcServer.waitState(new OOSState() - .add("dst1/session", false) - .add("dst2/session", true) - .add("dst3/session", false) - .add("dst4/session", false) - .add("dst5/session", false))); - assertError(srcSession, "dst1/session", ErrorCode.NONE); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - assertError(srcSession, "dst3/session", ErrorCode.NONE); - assertError(srcSession, "dst4/session", ErrorCode.NONE); - assertError(srcSession, "dst5/session", ErrorCode.NONE); - - dst2.destroy(); - assertTrue(srcServer.waitSlobrok("*/session", 4)); - assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); - - srcSession.destroy(); - dst1.destroy(); - dst2.destroy(); - dst3.destroy(); - dst4.destroy(); - dst5.destroy(); - } -} |