summaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-09-29 16:36:21 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-09-29 16:36:21 +0200
commitb4656dc85fe215bf79eadedd868be0082fe70ab0 (patch)
treea3e11af2608694114a7fd5288748c7b2508e20d0 /jrt
parent59f27953f5c8cd144f31f45a82d88d21511b8e6a (diff)
Yahoo sets up mac wireless networks such that the local hostname points to an
ip which does not resolve. This works around that problem by finding a resolvable address (while still falling back to localhost if we only get ipv6 addresses, as that causes other problems in docker containers).
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Acceptor.java18
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java15
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java2
-rw-r--r--jrt/src/com/yahoo/jrt/Request.java5
-rw-r--r--jrt/src/com/yahoo/jrt/Spec.java29
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java5
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java9
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java308
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/Register.java67
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java156
10 files changed, 294 insertions, 320 deletions
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java
index 05a7591ab74..7316f8c620b 100644
--- a/jrt/src/com/yahoo/jrt/Acceptor.java
+++ b/jrt/src/com/yahoo/jrt/Acceptor.java
@@ -13,7 +13,7 @@ import java.util.logging.Logger;
* transport thread. To create an acceptor you need to invoke the
* {@link Supervisor#listen listen} method in the {@link Supervisor}
* class.
- **/
+ */
public class Acceptor {
private class Run implements Runnable {
@@ -34,15 +34,12 @@ public class Acceptor {
private ServerSocketChannel serverChannel;
- Acceptor(Transport parent, Supervisor owner,
- Spec spec) throws ListenFailedException {
-
+ Acceptor(Transport parent, Supervisor owner, Spec spec) throws ListenFailedException {
this.parent = parent;
this.owner = owner;
- if (spec.malformed()) {
- throw new ListenFailedException("Malformed spec");
- }
+ if (spec.malformed())
+ throw new ListenFailedException("Malformed spec '" + spec + "'");
try {
serverChannel = ServerSocketChannel.open();
@@ -55,7 +52,7 @@ public class Acceptor {
if (serverChannel != null) {
try { serverChannel.socket().close(); } catch (Exception x) {}
}
- throw new ListenFailedException("Listen failed", e);
+ throw new ListenFailedException("Failed to listen to " + spec, e);
}
thread.setDaemon(true);
@@ -84,7 +81,7 @@ public class Acceptor {
* @return listening spec, or null if not listening.
**/
public Spec spec() {
- if (!serverChannel.isOpen()) {
+ if ( ! serverChannel.isOpen()) {
return null;
}
return new Spec(serverChannel.socket().getInetAddress().getHostName(),
@@ -94,8 +91,7 @@ public class Acceptor {
private void run() {
while (serverChannel.isOpen()) {
try {
- parent.addConnection(new Connection(parent, owner,
- serverChannel.accept()));
+ parent.addConnection(new Connection(parent, owner, serverChannel.accept()));
parent.sync();
} catch (java.nio.channels.ClosedChannelException x) {
} catch (Exception e) {
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index 7affa875cd6..52964726eb7 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -1,7 +1,6 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
@@ -35,10 +34,8 @@ class Connection extends Target {
private Buffer output = new Buffer(WRITE_SIZE * 2);
private int maxInputSize = 64*1024;
private int maxOutputSize = 64*1024;
- private Map<Integer, ReplyHandler> replyMap
- = new HashMap<Integer, ReplyHandler>();
- private Map<TargetWatcher, TargetWatcher> watchers
- = new IdentityHashMap<TargetWatcher, TargetWatcher>();
+ private Map<Integer, ReplyHandler> replyMap = new HashMap<>();
+ private Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
private int activeReqs = 0;
private int writeWork = 0;
private Transport parent;
@@ -52,8 +49,7 @@ class Connection extends Target {
private void setState(int state) {
if (state <= this.state) {
- log.log(Level.WARNING, "Bogus state transition: "
- + this.state + "->" + state);
+ log.log(Level.WARNING, "Bogus state transition: " + this.state + "->" + state);
return;
}
boolean live = (this.state == INITIAL && state == CONNECTED);
@@ -95,8 +91,7 @@ class Connection extends Target {
owner.sessionInit(this);
}
- public Connection(Transport parent, Supervisor owner,
- Spec spec, Object context) {
+ public Connection(Transport parent, Supervisor owner, Spec spec, Object context) {
super(context);
this.parent = parent;
this.owner = owner;
@@ -400,6 +395,6 @@ class Connection extends Target {
if (channel != null) {
return "Connection { " + channel.socket() + " }";
}
- return "Connection { no socket }";
+ return "Connection { no socket, spec " + spec + " }";
}
}
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index fa43710b1f6..6778e047a8b 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -27,7 +27,7 @@ class Connector {
}
public void connectLater(Connection c) {
- if (!connectQueue.enqueue(c)) {
+ if ( ! connectQueue.enqueue(c)) {
parent.addConnection(c);
}
}
diff --git a/jrt/src/com/yahoo/jrt/Request.java b/jrt/src/com/yahoo/jrt/Request.java
index 99d7df8657e..4786124e56b 100644
--- a/jrt/src/com/yahoo/jrt/Request.java
+++ b/jrt/src/com/yahoo/jrt/Request.java
@@ -14,7 +14,7 @@ package com.yahoo.jrt;
* client/server roles are independent of connection client/server
* roles, since invocations can be performed both ways across a {@link
* Target}.
- **/
+ */
public class Request
{
private String methodName;
@@ -242,8 +242,7 @@ public class Request
if (returnValues.satisfies(returnTypes)) {
return true;
}
- setError(ErrorCode.WRONG_RETURN,
- "checkReturnValues: Wrong return values");
+ setError(ErrorCode.WRONG_RETURN, "checkReturnValues: Wrong return values");
return false;
}
diff --git a/jrt/src/com/yahoo/jrt/Spec.java b/jrt/src/com/yahoo/jrt/Spec.java
index 7ed0aa69920..4c1f07b98a2 100644
--- a/jrt/src/com/yahoo/jrt/Spec.java
+++ b/jrt/src/com/yahoo/jrt/Spec.java
@@ -2,6 +2,8 @@
package com.yahoo.jrt;
+import com.yahoo.net.HostName;
+
import java.net.SocketAddress;
import java.net.InetSocketAddress;
@@ -9,9 +11,9 @@ import java.net.InetSocketAddress;
/**
* A Spec is a network address used for either listening or
* connecting.
- **/
-public class Spec
-{
+ */
+public class Spec {
+
private SocketAddress address;
private String host;
private int port;
@@ -24,11 +26,11 @@ public class Spec
*
* @param spec input string to be parsed
* @see #malformed
- **/
+ */
public Spec(String spec) {
if (spec.startsWith("tcp/")) {
int sep = spec.indexOf(':');
- String portStr = null;
+ String portStr;
if (sep == -1) {
portStr = spec.substring(4);
} else {
@@ -52,7 +54,7 @@ public class Spec
*
* @param host host name
* @param port port number
- **/
+ */
public Spec(String host, int port) {
this.host = host;
this.port = port;
@@ -62,7 +64,7 @@ public class Spec
* Create a Spec from a port number.
*
* @param port port number
- **/
+ */
public Spec(int port) {
this.port = port;
}
@@ -71,7 +73,7 @@ public class Spec
* Obtain the host name of this address
*
* @return host name
- **/
+ */
public String host() {
return host;
}
@@ -80,7 +82,7 @@ public class Spec
* Obtain the port number if this address
*
* @return port number
- **/
+ */
public int port() {
return port;
}
@@ -90,7 +92,7 @@ public class Spec
* you whether that string was malformed.
*
* @return true if this address is malformed
- **/
+ */
public boolean malformed() {
return malformed;
}
@@ -100,7 +102,7 @@ public class Spec
* malformed, this method will return null.
*
* @return socket address
- **/
+ */
SocketAddress address() {
if (malformed) {
return null;
@@ -114,13 +116,13 @@ public class Spec
}
return address;
}
-
+
/**
* Obtain a string representation of this address. The return
* value from this method may be used to create a new Spec.
*
* @return string representation of this address
- **/
+ */
public String toString() {
if (malformed) {
return "MALFORMED";
@@ -130,4 +132,5 @@ public class Spec
}
return "tcp/" + host + ":" + port;
}
+
}
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 85bfed79732..6a9a978fb77 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -229,9 +229,8 @@ public class Transport {
* @param context application context for the new connection
* @param sync perform a synchronous connect in the calling thread
* if this flag is set
- **/
- Connection connect(Supervisor owner, Spec spec,
- Object context, boolean sync) {
+ */
+ Connection connect(Supervisor owner, Spec spec, Object context, boolean sync) {
Connection conn = new Connection(this, owner, spec, context);
if (sync) {
addConnection(conn.connect());
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java
index 3662e6ad5b9..421590e72ce 100644
--- a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java
@@ -4,8 +4,8 @@ package com.yahoo.jrt.slobrok.api;
/**
* Defines an interface for the name server lookup.
*
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- **/
+ * @author Simon Thoresen
+ */
public interface IMirror {
/**
@@ -21,7 +21,7 @@ public interface IMirror {
* @return a list of all matching services, with corresponding connect specs
* @param pattern The pattern used for matching
**/
- public Mirror.Entry[] lookup(String pattern);
+ Mirror.Entry[] lookup(String pattern);
/**
* Obtain the number of updates seen by this mirror. The value may wrap, but will never become 0 again. This can be
@@ -30,5 +30,6 @@ public interface IMirror {
*
* @return number of slobrok updates seen
**/
- public int updates();
+ int updates();
+
}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java
index 5e62cb61b76..81ec51e2b9e 100644
--- a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java
@@ -1,16 +1,14 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt.slobrok.api;
-
import com.yahoo.jrt.*;
-import java.util.Arrays;
-import java.util.Random;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Logger;
import java.util.logging.Level;
-
/**
* A Mirror object is used to keep track of the services registered
* with a slobrok cluster.
@@ -18,57 +16,19 @@ import java.util.logging.Level;
* Updates to the service repository are fetched in the
* background. Lookups against this object is done using an internal
* mirror of the service repository.
- **/
+ */
public class Mirror implements IMirror {
private static Logger log = Logger.getLogger(Mirror.class.getName());
- /**
- * An Entry contains the name and connection spec for a single
- * service.
- **/
- public static final class Entry implements Comparable<Entry> {
- private final String name;
- private final String spec;
- private final char [] nameArray;
-
- public Entry(String name, String spec) {
- this.name = name;
- this.spec = spec;
- this.nameArray = name.toCharArray();
- }
-
- public boolean equals(Object rhs) {
- if (rhs == null || !(rhs instanceof Entry)) {
- return false;
- }
- Entry e = (Entry) rhs;
- return (name.equals(e.name) && spec.equals(e.spec));
- }
-
- public int hashCode() {
- return (name.hashCode() + spec.hashCode());
- }
-
- public int compareTo(Entry b) {
- int diff = name.compareTo(b.name);
- return diff != 0
- ? diff
- : spec.compareTo(b.spec);
- }
- char [] getNameArray() { return nameArray; }
- public String getName() { return name; }
- public String getSpec() { return spec; }
- }
-
private Supervisor orb;
private SlobrokList slobroks;
private String currSlobrok;
private BackOffPolicy backOff;
private volatile int updates = 0;
- private boolean reqDone = false;
+ private boolean requestDone = false;
private volatile Entry[] specs = new Entry[0];
- private int specsGen = 0;
+ private int specsGeneration = 0;
private Task updateTask = null;
private RequestWaiter reqWait = null;
private Target target = null;
@@ -87,11 +47,11 @@ public class Mirror implements IMirror {
this.slobroks = slobroks;
this.backOff = bop;
updateTask = orb.transport().createTask(new Runnable() {
- public void run() { handleUpdate(); }
+ public void run() { checkForUpdate(); }
});
reqWait = new RequestWaiter() {
public void handleRequestDone(Request req) {
- reqDone = true;
+ requestDone = true;
updateTask.scheduleNow();
}
};
@@ -104,7 +64,7 @@ public class Mirror implements IMirror {
*
* @param orb the Supervisor to use
* @param slobroks slobrok connect spec list
- **/
+ */
public Mirror(Supervisor orb, SlobrokList slobroks) {
this(orb, slobroks, new BackOff());
}
@@ -112,7 +72,7 @@ public class Mirror implements IMirror {
/**
* Shut down the Mirror. This will close any open connections and
* stop the regular mirror updates.
- **/
+ */
public void shutdown() {
updateTask.kill();
orb.transport().perform(new Runnable() {
@@ -122,12 +82,11 @@ public class Mirror implements IMirror {
@Override
public Entry[] lookup(String pattern) {
- ArrayList<Entry> found = new ArrayList<Entry>();
- Entry [] e = specs;
- char [] p = pattern.toCharArray();
- for (int i = 0; i < e.length; i++) {
- if (match(e[i].getNameArray(), p)) {
- found.add(e[i]);
+ ArrayList<Entry> found = new ArrayList<>();
+ char[] p = pattern.toCharArray();
+ for (Entry specEntry : specs) {
+ if (match(specEntry.getNameArray(), p)) {
+ found.add(specEntry);
}
}
return found.toArray(new Entry[found.size()]);
@@ -145,7 +104,7 @@ public class Mirror implements IMirror {
* (or if it never does, time out and tell the user there was no answer from any Service Location Broker).
*
* @return true if the MirrorAPI object has asked for updates from a Slobrok and got any answer back
- **/
+ */
public boolean ready() {
return (updates != 0);
}
@@ -167,7 +126,7 @@ public class Mirror implements IMirror {
* @return true if the name matches the pattern
* @param name the name
* @param pattern the pattern
- **/
+ */
static boolean match(char [] name, char [] pattern) {
int ni = 0;
int pi = 0;
@@ -197,95 +156,58 @@ public class Mirror implements IMirror {
/**
* Invoked by the update task.
- **/
- private void handleUpdate() {
- if (reqDone) {
- reqDone = false;
-
- if (req.errorCode() == ErrorCode.NONE &&
- req.returnValues().satisfies("SSi") &&
- req.returnValues().get(0).count() == req.returnValues().get(1).count())
- {
- Values answer = req.returnValues();
-
- if (specsGen != answer.get(2).asInt32()) {
-
- int numNames = answer.get(0).count();
- String[] n = answer.get(0).asStringArray();
- String[] s = answer.get(1).asStringArray();
- Entry[] newSpecs = new Entry[numNames];
-
- for (int idx = 0; idx < numNames; idx++) {
- newSpecs[idx] = new Entry(n[idx], s[idx]);
- }
-
- specs = newSpecs;
-
- specsGen = answer.get(2).asInt32();
- int u = (updates + 1);
- if (u == 0) {
- u++;
- }
- updates = u;
+ */
+ private void checkForUpdate() {
+ if (requestDone) {
+ handleUpdate();
+ requestDone = false;
+ return;
+ }
+
+ if (target != null && ! slobroks.contains(currSlobrok)) {
+ target.close();
+ target = null;
+ }
+ if (target == null) {
+ currSlobrok = slobroks.nextSlobrokSpec();
+ if (currSlobrok == null) {
+ double delay = backOff.get();
+ updateTask.schedule(delay);
+ if (backOff.shouldWarn(delay)) {
+ log.log(Level.INFO, "no location brokers available "
+ + "(retry in " + delay + " seconds) for: " + slobroks);
}
- backOff.reset();
- updateTask.schedule(0.1); // be nice
- return;
- }
- if (!req.checkReturnTypes("iSSSi")
- || (req.returnValues().get(2).count() !=
- req.returnValues().get(3).count()))
- {
- target.close();
- target = null;
- updateTask.scheduleNow(); // try next slobrok
return;
}
+ target = orb.connect(new Spec(currSlobrok));
+ specsGeneration = 0;
+ }
+ req = new Request("slobrok.incremental.fetch");
+ req.parameters().add(new Int32Value(specsGeneration)); // gencnt
+ req.parameters().add(new Int32Value(5000)); // mstimeout
+ target.invokeAsync(req, 40.0, reqWait);
+ }
+
+ private void handleUpdate() {
+ if (req.errorCode() == ErrorCode.NONE &&
+ req.returnValues().satisfies("SSi") &&
+ req.returnValues().get(0).count() == req.returnValues().get(1).count())
+ {
+ Values answer = req.returnValues();
+ if (specsGeneration != answer.get(2).asInt32()) {
- Values answer = req.returnValues();
+ int numNames = answer.get(0).count();
+ String[] n = answer.get(0).asStringArray();
+ String[] s = answer.get(1).asStringArray();
+ Entry[] newSpecs = new Entry[numNames];
- int diffFrom = answer.get(0).asInt32();
- int diffTo = answer.get(4).asInt32();
-
- if (specsGen != diffTo) {
-
- int nRemoves = answer.get(1).count();
- String[] r = answer.get(1).asStringArray();
-
- int numNames = answer.get(2).count();
- String[] n = answer.get(2).asStringArray();
- String[] s = answer.get(3).asStringArray();
-
-
- Entry[] newSpecs;
- if (diffFrom == 0) {
- newSpecs = new Entry[numNames];
-
- for (int idx = 0; idx < numNames; idx++) {
- newSpecs[idx] = new Entry(n[idx], s[idx]);
- }
- } else {
- java.util.HashMap<String, Entry> map = new java.util.HashMap<String, Entry>();
- for (Entry e : specs) {
- map.put(e.getName(), e);
- }
- for (String rem : r) {
- map.remove(rem);
- }
- for (int idx = 0; idx < numNames; idx++) {
- map.put(n[idx], new Entry(n[idx], s[idx]));
- }
- newSpecs = new Entry[map.size()];
- int idx = 0;
- for (Entry e : map.values()) {
- newSpecs[idx++] = e;
- }
+ for (int idx = 0; idx < numNames; idx++) {
+ newSpecs[idx] = new Entry(n[idx], s[idx]);
}
-
specs = newSpecs;
- specsGen = diffTo;
+ specsGeneration = answer.get(2).asInt32();
int u = (updates + 1);
if (u == 0) {
u++;
@@ -296,34 +218,72 @@ public class Mirror implements IMirror {
updateTask.schedule(0.1); // be nice
return;
}
- if (target != null && ! slobroks.contains(currSlobrok)) {
+ if (!req.checkReturnTypes("iSSSi")
+ || (req.returnValues().get(2).count() !=
+ req.returnValues().get(3).count()))
+ {
target.close();
target = null;
+ updateTask.scheduleNow(); // try next slobrok
+ return;
}
- if (target == null) {
- currSlobrok = slobroks.nextSlobrokSpec();
- if (currSlobrok == null) {
- double delay = backOff.get();
- updateTask.schedule(delay);
- if (backOff.shouldWarn(delay)) {
- log.log(Level.INFO, "no location brokers available "
- + "(retry in " + delay + " seconds) for: " + slobroks);
+
+
+ Values answer = req.returnValues();
+
+ int diffFromGeneration = answer.get(0).asInt32();
+ int diffToGeneration = answer.get(4).asInt32();
+ if (specsGeneration != diffToGeneration) {
+
+ int nRemoves = answer.get(1).count();
+ String[] r = answer.get(1).asStringArray();
+
+ int numNames = answer.get(2).count();
+ String[] n = answer.get(2).asStringArray();
+ String[] s = answer.get(3).asStringArray();
+
+ Entry[] newSpecs;
+ if (diffFromGeneration == 0) {
+ newSpecs = new Entry[numNames];
+
+ for (int idx = 0; idx < numNames; idx++) {
+ newSpecs[idx] = new Entry(n[idx], s[idx]);
+ }
+ } else {
+ Map<String, Entry> map = new HashMap<>();
+ for (Entry e : specs) {
+ map.put(e.getName(), e);
+ }
+ for (String rem : r) {
+ map.remove(rem);
+ }
+ for (int idx = 0; idx < numNames; idx++) {
+ map.put(n[idx], new Entry(n[idx], s[idx]));
+ }
+ newSpecs = new Entry[map.size()];
+ int idx = 0;
+ for (Entry e : map.values()) {
+ newSpecs[idx++] = e;
}
- return;
}
- target = orb.connect(new Spec(currSlobrok));
- specsGen = 0;
+
+ specs = newSpecs;
+
+ specsGeneration = diffToGeneration;
+ int u = (updates + 1);
+ if (u == 0) {
+ u++;
+ }
+ updates = u;
}
- req = new Request("slobrok.incremental.fetch");
- req.parameters().add(new Int32Value(specsGen)); // gencnt
- req.parameters().add(new Int32Value(5000)); // mstimeout
- target.invokeAsync(req, 40.0, reqWait);
+ backOff.reset();
+ updateTask.schedule(0.1); // be nice
}
/**
* Invoked from the transport thread, requested by the shutdown
* method.
- **/
+ */
private void handleShutdown() {
if (req != null) {
req.abort();
@@ -334,4 +294,44 @@ public class Mirror implements IMirror {
target = null;
}
}
+
+ /**
+ * An Entry contains the name and connection spec for a single
+ * service.
+ */
+ public static final class Entry implements Comparable<Entry> {
+
+ private final String name;
+ private final String spec;
+ private final char [] nameArray;
+
+ public Entry(String name, String spec) {
+ this.name = name;
+ this.spec = spec;
+ this.nameArray = name.toCharArray();
+ }
+
+ public boolean equals(Object rhs) {
+ if (rhs == null || !(rhs instanceof Entry)) {
+ return false;
+ }
+ Entry e = (Entry) rhs;
+ return (name.equals(e.name) && spec.equals(e.spec));
+ }
+
+ public int hashCode() {
+ return (name.hashCode() + spec.hashCode());
+ }
+
+ public int compareTo(Entry b) {
+ int diff = name.compareTo(b.name);
+ return diff != 0 ? diff : spec.compareTo(b.spec);
+ }
+
+ char [] getNameArray() { return nameArray; }
+ public String getName() { return name; }
+ public String getSpec() { return spec; }
+
+ }
+
}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java
index 84720501ff8..d1ea7a7f1fa 100644
--- a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java
@@ -1,22 +1,21 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt.slobrok.api;
-
import com.yahoo.jrt.*;
import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import java.util.logging.Logger;
import java.util.logging.Level;
-
/**
* A Register object is used to register and unregister services with
* a slobrok cluster.
*
* The register/unregister operations performed against this object
- * are stored in a todo list that will be performed asynchronously
+ * are stored in a to-do list that will be performed asynchronously
* against the slobrok cluster as soon as possible.
- **/
+ */
public class Register {
private static Logger log = Logger.getLogger(Register.class.getName());
@@ -27,9 +26,9 @@ public class Register {
private String mySpec;
private BackOffPolicy backOff;
private boolean reqDone = false;
- private ArrayList<String> names = new ArrayList<String>();
- private ArrayList<String> pending = new ArrayList<String>();
- private ArrayList<String> unreg = new ArrayList<String>();
+ private List<String> names = new ArrayList<>();
+ private List<String> pending = new ArrayList<>();
+ private List<String> unreg = new ArrayList<>();
private Task updateTask = null;
private RequestWaiter reqWait = null;
private Target target = null;
@@ -39,9 +38,9 @@ public class Register {
/**
* Remove all instances of name from list.
- **/
- private void discard(ArrayList<String> list, String name) {
- ArrayList<String> tmp = new ArrayList<String>();
+ */
+ private void discard(List<String> list, String name) {
+ List<String> tmp = new ArrayList<>();
tmp.add(name);
list.removeAll(tmp);
}
@@ -54,7 +53,7 @@ public class Register {
* @param slobroks slobrok connect spec list
* @param spec the Spec representing hostname and port for this host
* @param bop custom backoff policy, mostly useful for testing
- **/
+ */
public Register(Supervisor orb, SlobrokList slobroks, Spec spec, BackOffPolicy bop) {
this.orb = orb;
this.slobroks = slobroks;
@@ -98,7 +97,7 @@ public class Register {
* @param orb the Supervisor to use
* @param slobroks slobrok connect spec list
* @param spec the Spec representing hostname and port for this host
- **/
+ */
public Register(Supervisor orb, SlobrokList slobroks, Spec spec) {
this(orb, slobroks, spec, new BackOff());
}
@@ -111,9 +110,8 @@ public class Register {
* @param slobroks slobrok connect spec list
* @param myHost the hostname of this host
* @param myPort the port number we are listening to
- **/
- public Register(Supervisor orb, SlobrokList slobroks,
- String myHost, int myPort) {
+ */
+ public Register(Supervisor orb, SlobrokList slobroks, String myHost, int myPort) {
this(orb, slobroks, new Spec(myHost, myPort));
}
@@ -121,7 +119,7 @@ public class Register {
/**
* Shut down the Register. This will close any open connections
* and stop the regular re-registration.
- **/
+ */
public void shutdown() {
updateTask.kill();
orb.transport().perform(new Runnable() {
@@ -133,7 +131,7 @@ public class Register {
* Register a service with the slobrok cluster.
*
* @param name service name
- **/
+ */
public synchronized void registerName(String name) {
if (names.indexOf(name) >= 0) {
return;
@@ -148,7 +146,7 @@ public class Register {
* Unregister a service with the slobrok cluster
*
* @param name service name
- **/
+ */
public synchronized void unregisterName(String name) {
discard(names, name);
discard(pending, name);
@@ -164,15 +162,11 @@ public class Register {
reqDone = false;
if (req.isError()) {
if (req.errorCode() != ErrorCode.METHOD_FAILED) {
- log.log(Level.FINE, "register failed: "
- + req.errorMessage()
- + " (code " + req.errorCode() + ")");
+ log.log(Level.FINE, "register failed: " + req.errorMessage() + " (code " + req.errorCode() + ")");
target.close();
target = null;
} else {
- log.log(Level.WARNING, "register failed: "
- + req.errorMessage()
- + " (code " + req.errorCode() + ")");
+ log.log(Level.WARNING, "register failed: " + req.errorMessage() + " (code " + req.errorCode() + ")");
}
} else {
backOff.reset();
@@ -192,13 +186,10 @@ public class Register {
if (currSlobrok == null) {
double delay = backOff.get();
updateTask.schedule(delay);
- if (backOff.shouldWarn(delay)) {
- log.log(Level.WARNING, "slobrok connection problems "
- + "(retry in " + delay + " seconds) to: " + slobroks);
- } else {
- log.log(Level.FINE, "slobrok retry in " + delay
- + " seconds");
- }
+ if (backOff.shouldWarn(delay))
+ log.log(Level.WARNING, "slobrok connection problems (retry in " + delay + " seconds) to: " + slobroks);
+ else
+ log.log(Level.FINE, "slobrok retry in " + delay + " seconds");
return;
}
target = orb.connect(new Spec(currSlobrok));
@@ -207,16 +198,14 @@ public class Register {
pending.addAll(names);
}
}
- boolean rem = false;
- boolean reg = false;
+ boolean unregister = false;
String name;
synchronized (this) {
if (unreg.size() > 0) {
name = unreg.remove(unreg.size() - 1);
- rem = true;
+ unregister = true;
} else if (pending.size() > 0) {
name = pending.remove(pending.size() - 1);
- reg = true;
} else {
pending.addAll(names);
log.log(Level.FINE, "done, reschedule in 30s");
@@ -225,13 +214,13 @@ public class Register {
}
}
- if (rem) {
+ if (unregister) {
req = new Request("slobrok.unregisterRpcServer");
req.parameters().add(new StringValue(name));
log.log(Level.FINE, "unregister [" + name + "]");
req.parameters().add(new StringValue(mySpec));
target.invokeAsync(req, 35.0, reqWait);
- } else if (reg) {
+ } else { // register
req = new Request("slobrok.registerRpcServer");
req.parameters().add(new StringValue(name));
log.log(Level.FINE, "register [" + name + "]");
@@ -246,8 +235,7 @@ public class Register {
}
private void handleRpcUnreg(Request req) {
- log.log(Level.WARNING, "unregistered name "
- + req.parameters().get(0).asString());
+ log.log(Level.WARNING, "unregistered name " + req.parameters().get(0).asString());
}
/**
@@ -266,4 +254,5 @@ public class Register {
target = null;
}
}
+
}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
index 085489897b5..3c81f9618f8 100644
--- a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
+++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
@@ -5,79 +5,16 @@ import com.yahoo.jrt.*;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class Slobrok {
- private class RegisterCallback implements RequestWaiter {
-
- Request registerReq;
- String name;
- String spec;
- Target target;
-
- public RegisterCallback(Request req, String name, String spec) {
- req.detach();
- registerReq = req;
- this.name = name;
- this.spec = spec;
- target = orb.connect(new Spec(spec));
- Request cbReq = new Request("slobrok.callback.listNamesServed");
- target.invokeAsync(cbReq, 5.0, this);
- }
-
- public void handleRequestDone(Request req) {
- if (!req.checkReturnTypes("S")) {
- registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: "
- + req.errorMessage());
- registerReq.returnRequest();
- target.close();
- return;
- }
- String[] names = req.returnValues().get(0).asStringArray();
- boolean found = false;
- for (String n : names) {
- if (n.equals(name)) {
- found = true;
- }
- }
- if (!found) {
- registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: "
- + "served names does not contain name");
- registerReq.returnRequest();
- target.close();
- return;
- }
- handleRegisterCallbackDone(registerReq, name, spec, target);
- }
- }
-
- private class FetchMirror implements Runnable {
- public final Request req;
- public final Task task;
-
- public FetchMirror(Request req, int timeout) {
- req.detach();
- this.req = req;
- task = orb.transport().createTask(this);
- task.schedule(((double)timeout)/1000.0);
- }
- public void run() { // timeout
- handleFetchMirrorTimeout(this);
- }
- }
-
- private class TargetMonitor implements TargetWatcher {
- public void notifyTargetInvalid(Target target) {
- handleTargetDown(target);
- }
- }
-
Supervisor orb;
Acceptor listener;
- HashMap<String,String> services = new HashMap<String,String>();
- ArrayList<FetchMirror> pendingFetch = new ArrayList<FetchMirror>();
- HashMap<String,Target> targets = new HashMap<String,Target>();
+ private Map<String,String> services = new HashMap<>();
+ List<FetchMirror> pendingFetch = new ArrayList<>();
+ Map<String,Target> targets = new HashMap<>();
TargetMonitor monitor = new TargetMonitor();
int gencnt = 1;
@@ -123,15 +60,11 @@ public class Slobrok {
handleFetchMirrorFlush();
}
- private void handleRegisterCallbackDone(Request req,
- String name, String spec,
- Target target)
- {
+ private void handleRegisterCallbackDone(Request req, String name, String spec, Target target){
String stored = services.get(name);
if (stored != null) { // too late
- if (!stored.equals(spec)) {
- req.setError(ErrorCode.METHOD_FAILED,
- "service '" + name + "' registered with another spec");
+ if ( ! stored.equals(spec)) {
+ req.setError(ErrorCode.METHOD_FAILED, "service '" + name + "' registered with another spec");
}
req.returnRequest();
target.close();
@@ -153,8 +86,8 @@ public class Slobrok {
}
private void dumpServices(Request req) {
- ArrayList<String> names = new ArrayList<String>();
- ArrayList<String> specs = new ArrayList<String>();
+ List<String> names = new ArrayList<>();
+ List<String> specs = new ArrayList<>();
for (Map.Entry<String,String> entry : services.entrySet()) {
names.add(entry.getKey());
specs.add(entry.getValue());
@@ -225,12 +158,8 @@ public class Slobrok {
if (stored == null) {
new RegisterCallback(req, name, spec);
} else {
- if (stored.equals(spec)) {
- // ok, already stored
- } else {
- req.setError(ErrorCode.METHOD_FAILED,
- "service '" + name + "' registered with another spec");
- }
+ if ( ! stored.equals(spec))
+ req.setError(ErrorCode.METHOD_FAILED, "service '" + name + "' registered with another spec");
}
}
@@ -267,4 +196,67 @@ public class Slobrok {
}
}
+ private class RegisterCallback implements RequestWaiter {
+
+ Request registerReq;
+ String name;
+ String spec;
+ Target target;
+
+ public RegisterCallback(Request req, String name, String spec) {
+ req.detach();
+ registerReq = req;
+ this.name = name;
+ this.spec = spec;
+ target = orb.connect(new Spec(spec));
+ Request cbReq = new Request("slobrok.callback.listNamesServed");
+ target.invokeAsync(cbReq, 5.0, this);
+ }
+
+ @Override
+ public void handleRequestDone(Request req) {
+ if ( ! req.checkReturnTypes("S")) {
+ registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: " + req.errorMessage());
+ registerReq.returnRequest();
+ target.close();
+ return;
+ }
+ String[] names = req.returnValues().get(0).asStringArray();
+ boolean found = false;
+ for (String n : names) {
+ if (n.equals(name)) {
+ found = true;
+ }
+ }
+ if (!found) {
+ registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: served names does not contain name");
+ registerReq.returnRequest();
+ target.close();
+ return;
+ }
+ handleRegisterCallbackDone(registerReq, name, spec, target);
+ }
+ }
+
+ private class FetchMirror implements Runnable {
+ public final Request req;
+ public final Task task;
+
+ public FetchMirror(Request req, int timeout) {
+ req.detach();
+ this.req = req;
+ task = orb.transport().createTask(this);
+ task.schedule(((double)timeout)/1000.0);
+ }
+ public void run() { // timeout
+ handleFetchMirrorTimeout(this);
+ }
+ }
+
+ private class TargetMonitor implements TargetWatcher {
+ public void notifyTargetInvalid(Target target) {
+ handleTargetDown(target);
+ }
+ }
+
}