diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-29 16:36:21 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-29 16:36:21 +0200 |
commit | b4656dc85fe215bf79eadedd868be0082fe70ab0 (patch) | |
tree | a3e11af2608694114a7fd5288748c7b2508e20d0 /jrt | |
parent | 59f27953f5c8cd144f31f45a82d88d21511b8e6a (diff) |
Yahoo sets up mac wireless networks such that the local hostname points to an
ip which does not resolve. This works around that problem by finding a resolvable
address (while still falling back to localhost if we only get ipv6 addresses,
as that causes other problems in docker containers).
Diffstat (limited to 'jrt')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Acceptor.java | 18 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connection.java | 15 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 2 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Request.java | 5 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Spec.java | 29 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 5 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java | 9 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java | 308 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/slobrok/api/Register.java | 67 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java | 156 |
10 files changed, 294 insertions, 320 deletions
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java index 05a7591ab74..7316f8c620b 100644 --- a/jrt/src/com/yahoo/jrt/Acceptor.java +++ b/jrt/src/com/yahoo/jrt/Acceptor.java @@ -13,7 +13,7 @@ import java.util.logging.Logger; * transport thread. To create an acceptor you need to invoke the * {@link Supervisor#listen listen} method in the {@link Supervisor} * class. - **/ + */ public class Acceptor { private class Run implements Runnable { @@ -34,15 +34,12 @@ public class Acceptor { private ServerSocketChannel serverChannel; - Acceptor(Transport parent, Supervisor owner, - Spec spec) throws ListenFailedException { - + Acceptor(Transport parent, Supervisor owner, Spec spec) throws ListenFailedException { this.parent = parent; this.owner = owner; - if (spec.malformed()) { - throw new ListenFailedException("Malformed spec"); - } + if (spec.malformed()) + throw new ListenFailedException("Malformed spec '" + spec + "'"); try { serverChannel = ServerSocketChannel.open(); @@ -55,7 +52,7 @@ public class Acceptor { if (serverChannel != null) { try { serverChannel.socket().close(); } catch (Exception x) {} } - throw new ListenFailedException("Listen failed", e); + throw new ListenFailedException("Failed to listen to " + spec, e); } thread.setDaemon(true); @@ -84,7 +81,7 @@ public class Acceptor { * @return listening spec, or null if not listening. **/ public Spec spec() { - if (!serverChannel.isOpen()) { + if ( ! serverChannel.isOpen()) { return null; } return new Spec(serverChannel.socket().getInetAddress().getHostName(), @@ -94,8 +91,7 @@ public class Acceptor { private void run() { while (serverChannel.isOpen()) { try { - parent.addConnection(new Connection(parent, owner, - serverChannel.accept())); + parent.addConnection(new Connection(parent, owner, serverChannel.accept())); parent.sync(); } catch (java.nio.channels.ClosedChannelException x) { } catch (Exception e) { diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index 7affa875cd6..52964726eb7 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -1,7 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -35,10 +34,8 @@ class Connection extends Target { private Buffer output = new Buffer(WRITE_SIZE * 2); private int maxInputSize = 64*1024; private int maxOutputSize = 64*1024; - private Map<Integer, ReplyHandler> replyMap - = new HashMap<Integer, ReplyHandler>(); - private Map<TargetWatcher, TargetWatcher> watchers - = new IdentityHashMap<TargetWatcher, TargetWatcher>(); + private Map<Integer, ReplyHandler> replyMap = new HashMap<>(); + private Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>(); private int activeReqs = 0; private int writeWork = 0; private Transport parent; @@ -52,8 +49,7 @@ class Connection extends Target { private void setState(int state) { if (state <= this.state) { - log.log(Level.WARNING, "Bogus state transition: " - + this.state + "->" + state); + log.log(Level.WARNING, "Bogus state transition: " + this.state + "->" + state); return; } boolean live = (this.state == INITIAL && state == CONNECTED); @@ -95,8 +91,7 @@ class Connection extends Target { owner.sessionInit(this); } - public Connection(Transport parent, Supervisor owner, - Spec spec, Object context) { + public Connection(Transport parent, Supervisor owner, Spec spec, Object context) { super(context); this.parent = parent; this.owner = owner; @@ -400,6 +395,6 @@ class Connection extends Target { if (channel != null) { return "Connection { " + channel.socket() + " }"; } - return "Connection { no socket }"; + return "Connection { no socket, spec " + spec + " }"; } } diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index fa43710b1f6..6778e047a8b 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -27,7 +27,7 @@ class Connector { } public void connectLater(Connection c) { - if (!connectQueue.enqueue(c)) { + if ( ! connectQueue.enqueue(c)) { parent.addConnection(c); } } diff --git a/jrt/src/com/yahoo/jrt/Request.java b/jrt/src/com/yahoo/jrt/Request.java index 99d7df8657e..4786124e56b 100644 --- a/jrt/src/com/yahoo/jrt/Request.java +++ b/jrt/src/com/yahoo/jrt/Request.java @@ -14,7 +14,7 @@ package com.yahoo.jrt; * client/server roles are independent of connection client/server * roles, since invocations can be performed both ways across a {@link * Target}. - **/ + */ public class Request { private String methodName; @@ -242,8 +242,7 @@ public class Request if (returnValues.satisfies(returnTypes)) { return true; } - setError(ErrorCode.WRONG_RETURN, - "checkReturnValues: Wrong return values"); + setError(ErrorCode.WRONG_RETURN, "checkReturnValues: Wrong return values"); return false; } diff --git a/jrt/src/com/yahoo/jrt/Spec.java b/jrt/src/com/yahoo/jrt/Spec.java index 7ed0aa69920..4c1f07b98a2 100644 --- a/jrt/src/com/yahoo/jrt/Spec.java +++ b/jrt/src/com/yahoo/jrt/Spec.java @@ -2,6 +2,8 @@ package com.yahoo.jrt; +import com.yahoo.net.HostName; + import java.net.SocketAddress; import java.net.InetSocketAddress; @@ -9,9 +11,9 @@ import java.net.InetSocketAddress; /** * A Spec is a network address used for either listening or * connecting. - **/ -public class Spec -{ + */ +public class Spec { + private SocketAddress address; private String host; private int port; @@ -24,11 +26,11 @@ public class Spec * * @param spec input string to be parsed * @see #malformed - **/ + */ public Spec(String spec) { if (spec.startsWith("tcp/")) { int sep = spec.indexOf(':'); - String portStr = null; + String portStr; if (sep == -1) { portStr = spec.substring(4); } else { @@ -52,7 +54,7 @@ public class Spec * * @param host host name * @param port port number - **/ + */ public Spec(String host, int port) { this.host = host; this.port = port; @@ -62,7 +64,7 @@ public class Spec * Create a Spec from a port number. * * @param port port number - **/ + */ public Spec(int port) { this.port = port; } @@ -71,7 +73,7 @@ public class Spec * Obtain the host name of this address * * @return host name - **/ + */ public String host() { return host; } @@ -80,7 +82,7 @@ public class Spec * Obtain the port number if this address * * @return port number - **/ + */ public int port() { return port; } @@ -90,7 +92,7 @@ public class Spec * you whether that string was malformed. * * @return true if this address is malformed - **/ + */ public boolean malformed() { return malformed; } @@ -100,7 +102,7 @@ public class Spec * malformed, this method will return null. * * @return socket address - **/ + */ SocketAddress address() { if (malformed) { return null; @@ -114,13 +116,13 @@ public class Spec } return address; } - + /** * Obtain a string representation of this address. The return * value from this method may be used to create a new Spec. * * @return string representation of this address - **/ + */ public String toString() { if (malformed) { return "MALFORMED"; @@ -130,4 +132,5 @@ public class Spec } return "tcp/" + host + ":" + port; } + } diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 85bfed79732..6a9a978fb77 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -229,9 +229,8 @@ public class Transport { * @param context application context for the new connection * @param sync perform a synchronous connect in the calling thread * if this flag is set - **/ - Connection connect(Supervisor owner, Spec spec, - Object context, boolean sync) { + */ + Connection connect(Supervisor owner, Spec spec, Object context, boolean sync) { Connection conn = new Connection(this, owner, spec, context); if (sync) { addConnection(conn.connect()); diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java index 3662e6ad5b9..421590e72ce 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java +++ b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java @@ -4,8 +4,8 @@ package com.yahoo.jrt.slobrok.api; /** * Defines an interface for the name server lookup. * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - **/ + * @author Simon Thoresen + */ public interface IMirror { /** @@ -21,7 +21,7 @@ public interface IMirror { * @return a list of all matching services, with corresponding connect specs * @param pattern The pattern used for matching **/ - public Mirror.Entry[] lookup(String pattern); + Mirror.Entry[] lookup(String pattern); /** * Obtain the number of updates seen by this mirror. The value may wrap, but will never become 0 again. This can be @@ -30,5 +30,6 @@ public interface IMirror { * * @return number of slobrok updates seen **/ - public int updates(); + int updates(); + } diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java index 5e62cb61b76..81ec51e2b9e 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java +++ b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java @@ -1,16 +1,14 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt.slobrok.api; - import com.yahoo.jrt.*; -import java.util.Arrays; -import java.util.Random; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; import java.util.logging.Level; - /** * A Mirror object is used to keep track of the services registered * with a slobrok cluster. @@ -18,57 +16,19 @@ import java.util.logging.Level; * Updates to the service repository are fetched in the * background. Lookups against this object is done using an internal * mirror of the service repository. - **/ + */ public class Mirror implements IMirror { private static Logger log = Logger.getLogger(Mirror.class.getName()); - /** - * An Entry contains the name and connection spec for a single - * service. - **/ - public static final class Entry implements Comparable<Entry> { - private final String name; - private final String spec; - private final char [] nameArray; - - public Entry(String name, String spec) { - this.name = name; - this.spec = spec; - this.nameArray = name.toCharArray(); - } - - public boolean equals(Object rhs) { - if (rhs == null || !(rhs instanceof Entry)) { - return false; - } - Entry e = (Entry) rhs; - return (name.equals(e.name) && spec.equals(e.spec)); - } - - public int hashCode() { - return (name.hashCode() + spec.hashCode()); - } - - public int compareTo(Entry b) { - int diff = name.compareTo(b.name); - return diff != 0 - ? diff - : spec.compareTo(b.spec); - } - char [] getNameArray() { return nameArray; } - public String getName() { return name; } - public String getSpec() { return spec; } - } - private Supervisor orb; private SlobrokList slobroks; private String currSlobrok; private BackOffPolicy backOff; private volatile int updates = 0; - private boolean reqDone = false; + private boolean requestDone = false; private volatile Entry[] specs = new Entry[0]; - private int specsGen = 0; + private int specsGeneration = 0; private Task updateTask = null; private RequestWaiter reqWait = null; private Target target = null; @@ -87,11 +47,11 @@ public class Mirror implements IMirror { this.slobroks = slobroks; this.backOff = bop; updateTask = orb.transport().createTask(new Runnable() { - public void run() { handleUpdate(); } + public void run() { checkForUpdate(); } }); reqWait = new RequestWaiter() { public void handleRequestDone(Request req) { - reqDone = true; + requestDone = true; updateTask.scheduleNow(); } }; @@ -104,7 +64,7 @@ public class Mirror implements IMirror { * * @param orb the Supervisor to use * @param slobroks slobrok connect spec list - **/ + */ public Mirror(Supervisor orb, SlobrokList slobroks) { this(orb, slobroks, new BackOff()); } @@ -112,7 +72,7 @@ public class Mirror implements IMirror { /** * Shut down the Mirror. This will close any open connections and * stop the regular mirror updates. - **/ + */ public void shutdown() { updateTask.kill(); orb.transport().perform(new Runnable() { @@ -122,12 +82,11 @@ public class Mirror implements IMirror { @Override public Entry[] lookup(String pattern) { - ArrayList<Entry> found = new ArrayList<Entry>(); - Entry [] e = specs; - char [] p = pattern.toCharArray(); - for (int i = 0; i < e.length; i++) { - if (match(e[i].getNameArray(), p)) { - found.add(e[i]); + ArrayList<Entry> found = new ArrayList<>(); + char[] p = pattern.toCharArray(); + for (Entry specEntry : specs) { + if (match(specEntry.getNameArray(), p)) { + found.add(specEntry); } } return found.toArray(new Entry[found.size()]); @@ -145,7 +104,7 @@ public class Mirror implements IMirror { * (or if it never does, time out and tell the user there was no answer from any Service Location Broker). * * @return true if the MirrorAPI object has asked for updates from a Slobrok and got any answer back - **/ + */ public boolean ready() { return (updates != 0); } @@ -167,7 +126,7 @@ public class Mirror implements IMirror { * @return true if the name matches the pattern * @param name the name * @param pattern the pattern - **/ + */ static boolean match(char [] name, char [] pattern) { int ni = 0; int pi = 0; @@ -197,95 +156,58 @@ public class Mirror implements IMirror { /** * Invoked by the update task. - **/ - private void handleUpdate() { - if (reqDone) { - reqDone = false; - - if (req.errorCode() == ErrorCode.NONE && - req.returnValues().satisfies("SSi") && - req.returnValues().get(0).count() == req.returnValues().get(1).count()) - { - Values answer = req.returnValues(); - - if (specsGen != answer.get(2).asInt32()) { - - int numNames = answer.get(0).count(); - String[] n = answer.get(0).asStringArray(); - String[] s = answer.get(1).asStringArray(); - Entry[] newSpecs = new Entry[numNames]; - - for (int idx = 0; idx < numNames; idx++) { - newSpecs[idx] = new Entry(n[idx], s[idx]); - } - - specs = newSpecs; - - specsGen = answer.get(2).asInt32(); - int u = (updates + 1); - if (u == 0) { - u++; - } - updates = u; + */ + private void checkForUpdate() { + if (requestDone) { + handleUpdate(); + requestDone = false; + return; + } + + if (target != null && ! slobroks.contains(currSlobrok)) { + target.close(); + target = null; + } + if (target == null) { + currSlobrok = slobroks.nextSlobrokSpec(); + if (currSlobrok == null) { + double delay = backOff.get(); + updateTask.schedule(delay); + if (backOff.shouldWarn(delay)) { + log.log(Level.INFO, "no location brokers available " + + "(retry in " + delay + " seconds) for: " + slobroks); } - backOff.reset(); - updateTask.schedule(0.1); // be nice - return; - } - if (!req.checkReturnTypes("iSSSi") - || (req.returnValues().get(2).count() != - req.returnValues().get(3).count())) - { - target.close(); - target = null; - updateTask.scheduleNow(); // try next slobrok return; } + target = orb.connect(new Spec(currSlobrok)); + specsGeneration = 0; + } + req = new Request("slobrok.incremental.fetch"); + req.parameters().add(new Int32Value(specsGeneration)); // gencnt + req.parameters().add(new Int32Value(5000)); // mstimeout + target.invokeAsync(req, 40.0, reqWait); + } + + private void handleUpdate() { + if (req.errorCode() == ErrorCode.NONE && + req.returnValues().satisfies("SSi") && + req.returnValues().get(0).count() == req.returnValues().get(1).count()) + { + Values answer = req.returnValues(); + if (specsGeneration != answer.get(2).asInt32()) { - Values answer = req.returnValues(); + int numNames = answer.get(0).count(); + String[] n = answer.get(0).asStringArray(); + String[] s = answer.get(1).asStringArray(); + Entry[] newSpecs = new Entry[numNames]; - int diffFrom = answer.get(0).asInt32(); - int diffTo = answer.get(4).asInt32(); - - if (specsGen != diffTo) { - - int nRemoves = answer.get(1).count(); - String[] r = answer.get(1).asStringArray(); - - int numNames = answer.get(2).count(); - String[] n = answer.get(2).asStringArray(); - String[] s = answer.get(3).asStringArray(); - - - Entry[] newSpecs; - if (diffFrom == 0) { - newSpecs = new Entry[numNames]; - - for (int idx = 0; idx < numNames; idx++) { - newSpecs[idx] = new Entry(n[idx], s[idx]); - } - } else { - java.util.HashMap<String, Entry> map = new java.util.HashMap<String, Entry>(); - for (Entry e : specs) { - map.put(e.getName(), e); - } - for (String rem : r) { - map.remove(rem); - } - for (int idx = 0; idx < numNames; idx++) { - map.put(n[idx], new Entry(n[idx], s[idx])); - } - newSpecs = new Entry[map.size()]; - int idx = 0; - for (Entry e : map.values()) { - newSpecs[idx++] = e; - } + for (int idx = 0; idx < numNames; idx++) { + newSpecs[idx] = new Entry(n[idx], s[idx]); } - specs = newSpecs; - specsGen = diffTo; + specsGeneration = answer.get(2).asInt32(); int u = (updates + 1); if (u == 0) { u++; @@ -296,34 +218,72 @@ public class Mirror implements IMirror { updateTask.schedule(0.1); // be nice return; } - if (target != null && ! slobroks.contains(currSlobrok)) { + if (!req.checkReturnTypes("iSSSi") + || (req.returnValues().get(2).count() != + req.returnValues().get(3).count())) + { target.close(); target = null; + updateTask.scheduleNow(); // try next slobrok + return; } - if (target == null) { - currSlobrok = slobroks.nextSlobrokSpec(); - if (currSlobrok == null) { - double delay = backOff.get(); - updateTask.schedule(delay); - if (backOff.shouldWarn(delay)) { - log.log(Level.INFO, "no location brokers available " - + "(retry in " + delay + " seconds) for: " + slobroks); + + + Values answer = req.returnValues(); + + int diffFromGeneration = answer.get(0).asInt32(); + int diffToGeneration = answer.get(4).asInt32(); + if (specsGeneration != diffToGeneration) { + + int nRemoves = answer.get(1).count(); + String[] r = answer.get(1).asStringArray(); + + int numNames = answer.get(2).count(); + String[] n = answer.get(2).asStringArray(); + String[] s = answer.get(3).asStringArray(); + + Entry[] newSpecs; + if (diffFromGeneration == 0) { + newSpecs = new Entry[numNames]; + + for (int idx = 0; idx < numNames; idx++) { + newSpecs[idx] = new Entry(n[idx], s[idx]); + } + } else { + Map<String, Entry> map = new HashMap<>(); + for (Entry e : specs) { + map.put(e.getName(), e); + } + for (String rem : r) { + map.remove(rem); + } + for (int idx = 0; idx < numNames; idx++) { + map.put(n[idx], new Entry(n[idx], s[idx])); + } + newSpecs = new Entry[map.size()]; + int idx = 0; + for (Entry e : map.values()) { + newSpecs[idx++] = e; } - return; } - target = orb.connect(new Spec(currSlobrok)); - specsGen = 0; + + specs = newSpecs; + + specsGeneration = diffToGeneration; + int u = (updates + 1); + if (u == 0) { + u++; + } + updates = u; } - req = new Request("slobrok.incremental.fetch"); - req.parameters().add(new Int32Value(specsGen)); // gencnt - req.parameters().add(new Int32Value(5000)); // mstimeout - target.invokeAsync(req, 40.0, reqWait); + backOff.reset(); + updateTask.schedule(0.1); // be nice } /** * Invoked from the transport thread, requested by the shutdown * method. - **/ + */ private void handleShutdown() { if (req != null) { req.abort(); @@ -334,4 +294,44 @@ public class Mirror implements IMirror { target = null; } } + + /** + * An Entry contains the name and connection spec for a single + * service. + */ + public static final class Entry implements Comparable<Entry> { + + private final String name; + private final String spec; + private final char [] nameArray; + + public Entry(String name, String spec) { + this.name = name; + this.spec = spec; + this.nameArray = name.toCharArray(); + } + + public boolean equals(Object rhs) { + if (rhs == null || !(rhs instanceof Entry)) { + return false; + } + Entry e = (Entry) rhs; + return (name.equals(e.name) && spec.equals(e.spec)); + } + + public int hashCode() { + return (name.hashCode() + spec.hashCode()); + } + + public int compareTo(Entry b) { + int diff = name.compareTo(b.name); + return diff != 0 ? diff : spec.compareTo(b.spec); + } + + char [] getNameArray() { return nameArray; } + public String getName() { return name; } + public String getSpec() { return spec; } + + } + } diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java index 84720501ff8..d1ea7a7f1fa 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java +++ b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java @@ -1,22 +1,21 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt.slobrok.api; - import com.yahoo.jrt.*; import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.logging.Logger; import java.util.logging.Level; - /** * A Register object is used to register and unregister services with * a slobrok cluster. * * The register/unregister operations performed against this object - * are stored in a todo list that will be performed asynchronously + * are stored in a to-do list that will be performed asynchronously * against the slobrok cluster as soon as possible. - **/ + */ public class Register { private static Logger log = Logger.getLogger(Register.class.getName()); @@ -27,9 +26,9 @@ public class Register { private String mySpec; private BackOffPolicy backOff; private boolean reqDone = false; - private ArrayList<String> names = new ArrayList<String>(); - private ArrayList<String> pending = new ArrayList<String>(); - private ArrayList<String> unreg = new ArrayList<String>(); + private List<String> names = new ArrayList<>(); + private List<String> pending = new ArrayList<>(); + private List<String> unreg = new ArrayList<>(); private Task updateTask = null; private RequestWaiter reqWait = null; private Target target = null; @@ -39,9 +38,9 @@ public class Register { /** * Remove all instances of name from list. - **/ - private void discard(ArrayList<String> list, String name) { - ArrayList<String> tmp = new ArrayList<String>(); + */ + private void discard(List<String> list, String name) { + List<String> tmp = new ArrayList<>(); tmp.add(name); list.removeAll(tmp); } @@ -54,7 +53,7 @@ public class Register { * @param slobroks slobrok connect spec list * @param spec the Spec representing hostname and port for this host * @param bop custom backoff policy, mostly useful for testing - **/ + */ public Register(Supervisor orb, SlobrokList slobroks, Spec spec, BackOffPolicy bop) { this.orb = orb; this.slobroks = slobroks; @@ -98,7 +97,7 @@ public class Register { * @param orb the Supervisor to use * @param slobroks slobrok connect spec list * @param spec the Spec representing hostname and port for this host - **/ + */ public Register(Supervisor orb, SlobrokList slobroks, Spec spec) { this(orb, slobroks, spec, new BackOff()); } @@ -111,9 +110,8 @@ public class Register { * @param slobroks slobrok connect spec list * @param myHost the hostname of this host * @param myPort the port number we are listening to - **/ - public Register(Supervisor orb, SlobrokList slobroks, - String myHost, int myPort) { + */ + public Register(Supervisor orb, SlobrokList slobroks, String myHost, int myPort) { this(orb, slobroks, new Spec(myHost, myPort)); } @@ -121,7 +119,7 @@ public class Register { /** * Shut down the Register. This will close any open connections * and stop the regular re-registration. - **/ + */ public void shutdown() { updateTask.kill(); orb.transport().perform(new Runnable() { @@ -133,7 +131,7 @@ public class Register { * Register a service with the slobrok cluster. * * @param name service name - **/ + */ public synchronized void registerName(String name) { if (names.indexOf(name) >= 0) { return; @@ -148,7 +146,7 @@ public class Register { * Unregister a service with the slobrok cluster * * @param name service name - **/ + */ public synchronized void unregisterName(String name) { discard(names, name); discard(pending, name); @@ -164,15 +162,11 @@ public class Register { reqDone = false; if (req.isError()) { if (req.errorCode() != ErrorCode.METHOD_FAILED) { - log.log(Level.FINE, "register failed: " - + req.errorMessage() - + " (code " + req.errorCode() + ")"); + log.log(Level.FINE, "register failed: " + req.errorMessage() + " (code " + req.errorCode() + ")"); target.close(); target = null; } else { - log.log(Level.WARNING, "register failed: " - + req.errorMessage() - + " (code " + req.errorCode() + ")"); + log.log(Level.WARNING, "register failed: " + req.errorMessage() + " (code " + req.errorCode() + ")"); } } else { backOff.reset(); @@ -192,13 +186,10 @@ public class Register { if (currSlobrok == null) { double delay = backOff.get(); updateTask.schedule(delay); - if (backOff.shouldWarn(delay)) { - log.log(Level.WARNING, "slobrok connection problems " - + "(retry in " + delay + " seconds) to: " + slobroks); - } else { - log.log(Level.FINE, "slobrok retry in " + delay - + " seconds"); - } + if (backOff.shouldWarn(delay)) + log.log(Level.WARNING, "slobrok connection problems (retry in " + delay + " seconds) to: " + slobroks); + else + log.log(Level.FINE, "slobrok retry in " + delay + " seconds"); return; } target = orb.connect(new Spec(currSlobrok)); @@ -207,16 +198,14 @@ public class Register { pending.addAll(names); } } - boolean rem = false; - boolean reg = false; + boolean unregister = false; String name; synchronized (this) { if (unreg.size() > 0) { name = unreg.remove(unreg.size() - 1); - rem = true; + unregister = true; } else if (pending.size() > 0) { name = pending.remove(pending.size() - 1); - reg = true; } else { pending.addAll(names); log.log(Level.FINE, "done, reschedule in 30s"); @@ -225,13 +214,13 @@ public class Register { } } - if (rem) { + if (unregister) { req = new Request("slobrok.unregisterRpcServer"); req.parameters().add(new StringValue(name)); log.log(Level.FINE, "unregister [" + name + "]"); req.parameters().add(new StringValue(mySpec)); target.invokeAsync(req, 35.0, reqWait); - } else if (reg) { + } else { // register req = new Request("slobrok.registerRpcServer"); req.parameters().add(new StringValue(name)); log.log(Level.FINE, "register [" + name + "]"); @@ -246,8 +235,7 @@ public class Register { } private void handleRpcUnreg(Request req) { - log.log(Level.WARNING, "unregistered name " - + req.parameters().get(0).asString()); + log.log(Level.WARNING, "unregistered name " + req.parameters().get(0).asString()); } /** @@ -266,4 +254,5 @@ public class Register { target = null; } } + } diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java index 085489897b5..3c81f9618f8 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java +++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java @@ -5,79 +5,16 @@ import com.yahoo.jrt.*; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class Slobrok { - private class RegisterCallback implements RequestWaiter { - - Request registerReq; - String name; - String spec; - Target target; - - public RegisterCallback(Request req, String name, String spec) { - req.detach(); - registerReq = req; - this.name = name; - this.spec = spec; - target = orb.connect(new Spec(spec)); - Request cbReq = new Request("slobrok.callback.listNamesServed"); - target.invokeAsync(cbReq, 5.0, this); - } - - public void handleRequestDone(Request req) { - if (!req.checkReturnTypes("S")) { - registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: " - + req.errorMessage()); - registerReq.returnRequest(); - target.close(); - return; - } - String[] names = req.returnValues().get(0).asStringArray(); - boolean found = false; - for (String n : names) { - if (n.equals(name)) { - found = true; - } - } - if (!found) { - registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: " - + "served names does not contain name"); - registerReq.returnRequest(); - target.close(); - return; - } - handleRegisterCallbackDone(registerReq, name, spec, target); - } - } - - private class FetchMirror implements Runnable { - public final Request req; - public final Task task; - - public FetchMirror(Request req, int timeout) { - req.detach(); - this.req = req; - task = orb.transport().createTask(this); - task.schedule(((double)timeout)/1000.0); - } - public void run() { // timeout - handleFetchMirrorTimeout(this); - } - } - - private class TargetMonitor implements TargetWatcher { - public void notifyTargetInvalid(Target target) { - handleTargetDown(target); - } - } - Supervisor orb; Acceptor listener; - HashMap<String,String> services = new HashMap<String,String>(); - ArrayList<FetchMirror> pendingFetch = new ArrayList<FetchMirror>(); - HashMap<String,Target> targets = new HashMap<String,Target>(); + private Map<String,String> services = new HashMap<>(); + List<FetchMirror> pendingFetch = new ArrayList<>(); + Map<String,Target> targets = new HashMap<>(); TargetMonitor monitor = new TargetMonitor(); int gencnt = 1; @@ -123,15 +60,11 @@ public class Slobrok { handleFetchMirrorFlush(); } - private void handleRegisterCallbackDone(Request req, - String name, String spec, - Target target) - { + private void handleRegisterCallbackDone(Request req, String name, String spec, Target target){ String stored = services.get(name); if (stored != null) { // too late - if (!stored.equals(spec)) { - req.setError(ErrorCode.METHOD_FAILED, - "service '" + name + "' registered with another spec"); + if ( ! stored.equals(spec)) { + req.setError(ErrorCode.METHOD_FAILED, "service '" + name + "' registered with another spec"); } req.returnRequest(); target.close(); @@ -153,8 +86,8 @@ public class Slobrok { } private void dumpServices(Request req) { - ArrayList<String> names = new ArrayList<String>(); - ArrayList<String> specs = new ArrayList<String>(); + List<String> names = new ArrayList<>(); + List<String> specs = new ArrayList<>(); for (Map.Entry<String,String> entry : services.entrySet()) { names.add(entry.getKey()); specs.add(entry.getValue()); @@ -225,12 +158,8 @@ public class Slobrok { if (stored == null) { new RegisterCallback(req, name, spec); } else { - if (stored.equals(spec)) { - // ok, already stored - } else { - req.setError(ErrorCode.METHOD_FAILED, - "service '" + name + "' registered with another spec"); - } + if ( ! stored.equals(spec)) + req.setError(ErrorCode.METHOD_FAILED, "service '" + name + "' registered with another spec"); } } @@ -267,4 +196,67 @@ public class Slobrok { } } + private class RegisterCallback implements RequestWaiter { + + Request registerReq; + String name; + String spec; + Target target; + + public RegisterCallback(Request req, String name, String spec) { + req.detach(); + registerReq = req; + this.name = name; + this.spec = spec; + target = orb.connect(new Spec(spec)); + Request cbReq = new Request("slobrok.callback.listNamesServed"); + target.invokeAsync(cbReq, 5.0, this); + } + + @Override + public void handleRequestDone(Request req) { + if ( ! req.checkReturnTypes("S")) { + registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: " + req.errorMessage()); + registerReq.returnRequest(); + target.close(); + return; + } + String[] names = req.returnValues().get(0).asStringArray(); + boolean found = false; + for (String n : names) { + if (n.equals(name)) { + found = true; + } + } + if (!found) { + registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: served names does not contain name"); + registerReq.returnRequest(); + target.close(); + return; + } + handleRegisterCallbackDone(registerReq, name, spec, target); + } + } + + private class FetchMirror implements Runnable { + public final Request req; + public final Task task; + + public FetchMirror(Request req, int timeout) { + req.detach(); + this.req = req; + task = orb.transport().createTask(this); + task.schedule(((double)timeout)/1000.0); + } + public void run() { // timeout + handleFetchMirrorTimeout(this); + } + } + + private class TargetMonitor implements TargetWatcher { + public void notifyTargetInvalid(Target target) { + handleTargetDown(target); + } + } + } |