summaryrefslogtreecommitdiffstats
path: root/jrt/src
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /jrt/src
Publish
Diffstat (limited to 'jrt/src')
-rw-r--r--jrt/src/com/yahoo/jrt/Acceptor.java145
-rw-r--r--jrt/src/com/yahoo/jrt/Buffer.java126
-rw-r--r--jrt/src/com/yahoo/jrt/Closer.java54
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java405
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java76
-rw-r--r--jrt/src/com/yahoo/jrt/DataArray.java64
-rw-r--r--jrt/src/com/yahoo/jrt/DataValue.java51
-rw-r--r--jrt/src/com/yahoo/jrt/DoubleArray.java54
-rw-r--r--jrt/src/com/yahoo/jrt/DoubleValue.java48
-rw-r--r--jrt/src/com/yahoo/jrt/EndOfQueueException.java19
-rw-r--r--jrt/src/com/yahoo/jrt/ErrorCode.java52
-rw-r--r--jrt/src/com/yahoo/jrt/ErrorPacket.java52
-rw-r--r--jrt/src/com/yahoo/jrt/FatalErrorHandler.java21
-rw-r--r--jrt/src/com/yahoo/jrt/FloatArray.java54
-rw-r--r--jrt/src/com/yahoo/jrt/FloatValue.java49
-rw-r--r--jrt/src/com/yahoo/jrt/Int16Array.java54
-rw-r--r--jrt/src/com/yahoo/jrt/Int16Value.java49
-rw-r--r--jrt/src/com/yahoo/jrt/Int32Array.java55
-rw-r--r--jrt/src/com/yahoo/jrt/Int32Value.java48
-rw-r--r--jrt/src/com/yahoo/jrt/Int64Array.java54
-rw-r--r--jrt/src/com/yahoo/jrt/Int64Value.java49
-rw-r--r--jrt/src/com/yahoo/jrt/Int8Array.java52
-rw-r--r--jrt/src/com/yahoo/jrt/Int8Value.java48
-rw-r--r--jrt/src/com/yahoo/jrt/InvocationClient.java95
-rw-r--r--jrt/src/com/yahoo/jrt/InvocationServer.java65
-rw-r--r--jrt/src/com/yahoo/jrt/InvokeProxy.java47
-rw-r--r--jrt/src/com/yahoo/jrt/ListenFailedException.java31
-rw-r--r--jrt/src/com/yahoo/jrt/MandatoryMethods.java98
-rw-r--r--jrt/src/com/yahoo/jrt/Method.java290
-rw-r--r--jrt/src/com/yahoo/jrt/MethodCreateException.java34
-rw-r--r--jrt/src/com/yahoo/jrt/MethodHandler.java21
-rw-r--r--jrt/src/com/yahoo/jrt/Packet.java50
-rw-r--r--jrt/src/com/yahoo/jrt/PacketInfo.java109
-rw-r--r--jrt/src/com/yahoo/jrt/Queue.java132
-rw-r--r--jrt/src/com/yahoo/jrt/ReplyHandler.java9
-rw-r--r--jrt/src/com/yahoo/jrt/ReplyPacket.java42
-rw-r--r--jrt/src/com/yahoo/jrt/Request.java281
-rw-r--r--jrt/src/com/yahoo/jrt/RequestPacket.java52
-rw-r--r--jrt/src/com/yahoo/jrt/RequestWaiter.java18
-rw-r--r--jrt/src/com/yahoo/jrt/Scheduler.java136
-rw-r--r--jrt/src/com/yahoo/jrt/SessionHandler.java66
-rw-r--r--jrt/src/com/yahoo/jrt/SingleRequestWaiter.java19
-rw-r--r--jrt/src/com/yahoo/jrt/Spec.java133
-rw-r--r--jrt/src/com/yahoo/jrt/StringArray.java82
-rw-r--r--jrt/src/com/yahoo/jrt/StringValue.java68
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java320
-rw-r--r--jrt/src/com/yahoo/jrt/Target.java147
-rw-r--r--jrt/src/com/yahoo/jrt/TargetWatcher.java19
-rw-r--r--jrt/src/com/yahoo/jrt/Task.java99
-rw-r--r--jrt/src/com/yahoo/jrt/ThreadQueue.java63
-rw-r--r--jrt/src/com/yahoo/jrt/TieBreaker.java25
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java401
-rw-r--r--jrt/src/com/yahoo/jrt/Value.java290
-rw-r--r--jrt/src/com/yahoo/jrt/Values.java140
-rw-r--r--jrt/src/com/yahoo/jrt/package-info.java5
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java30
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java38
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java34
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java337
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/Register.java269
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java94
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/api/package-info.java5
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/package-info.java5
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java270
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/server/package-info.java5
-rw-r--r--jrt/src/com/yahoo/jrt/tool/RpcInvoker.java107
-rw-r--r--jrt/src/com/yahoo/jrt/tool/package-info.java5
67 files changed, 6265 insertions, 0 deletions
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java
new file mode 100644
index 00000000000..05a7591ab74
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Acceptor.java
@@ -0,0 +1,145 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.channels.ServerSocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * A class used to listen on a network socket. A separate thread is
+ * used to accept connections and register them with the underlying
+ * transport thread. To create an acceptor you need to invoke the
+ * {@link Supervisor#listen listen} method in the {@link Supervisor}
+ * class.
+ **/
+public class Acceptor {
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Acceptor.this.run();
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Acceptor.this);
+ }
+ }
+ }
+
+ private static Logger log = Logger.getLogger(Acceptor.class.getName());
+
+ private Thread thread = new Thread(new Run(), "<acceptor>");
+ private Transport parent;
+ private Supervisor owner;
+
+ private ServerSocketChannel serverChannel;
+
+ Acceptor(Transport parent, Supervisor owner,
+ Spec spec) throws ListenFailedException {
+
+ this.parent = parent;
+ this.owner = owner;
+
+ if (spec.malformed()) {
+ throw new ListenFailedException("Malformed spec");
+ }
+
+ try {
+ serverChannel = ServerSocketChannel.open();
+ serverChannel.configureBlocking(true);
+ if (spec.port() != 0) {
+ serverChannel.socket().setReuseAddress(true);
+ }
+ serverChannel.socket().bind(spec.address(), 500);
+ } catch (Exception e) {
+ if (serverChannel != null) {
+ try { serverChannel.socket().close(); } catch (Exception x) {}
+ }
+ throw new ListenFailedException("Listen failed", e);
+ }
+
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Obtain the local port number this Acceptor is listening to. If
+ * this Acceptor is no longer listening (it has been shut down),
+ * -1 will be returned.
+ *
+ * @return listening port, or -1 if not listening
+ **/
+ public int port() {
+ if (!serverChannel.isOpen()) {
+ return -1;
+ }
+ return serverChannel.socket().getLocalPort();
+ }
+
+ /**
+ * Obtain the Spec for the local port and host interface this Acceptor
+ * is listening to. If this Acceptor is no longer listening (it has
+ * been shut down), null will be returned.
+ *
+ * @return listening spec, or null if not listening.
+ **/
+ public Spec spec() {
+ if (!serverChannel.isOpen()) {
+ return null;
+ }
+ return new Spec(serverChannel.socket().getInetAddress().getHostName(),
+ serverChannel.socket().getLocalPort());
+ }
+
+ private void run() {
+ while (serverChannel.isOpen()) {
+ try {
+ parent.addConnection(new Connection(parent, owner,
+ serverChannel.accept()));
+ parent.sync();
+ } catch (java.nio.channels.ClosedChannelException x) {
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error accepting connection", e);
+ }
+ }
+ }
+
+ /**
+ * Initiate controlled shutdown of the acceptor thread
+ *
+ * @return this object, to enable chaining with {@link #join join}
+ **/
+ public Acceptor shutdown() {
+ try {
+ serverChannel.socket().close();
+ } catch (Exception e1) {
+ log.log(Level.WARNING, "Error closing server socket", e1);
+ Thread.yield(); // throw some salt over the shoulder
+ try {
+ serverChannel.socket().close();
+ } catch (Exception e2) {
+ log.log(Level.WARNING, "Error closing server socket", e2);
+ Thread.yield(); // throw some salt over the shoulder
+ try {
+ serverChannel.socket().close();
+ } catch (Exception e3) {
+ log.log(Level.WARNING, "Error closing server socket", e3);
+ throw new Error("Error closing server socket 3 times", e3);
+ }
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Wait for the acceptor thread to finish
+ **/
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Buffer.java b/jrt/src/com/yahoo/jrt/Buffer.java
new file mode 100644
index 00000000000..53e91d80b4d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Buffer.java
@@ -0,0 +1,126 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+class Buffer {
+
+ static final int MAX_IO = 65000;
+
+ private ByteBuffer buf;
+ private int readPos;
+ private int writePos;
+ private boolean readMode;
+
+ private void setReadMode() {
+ if (readMode) {
+ buf.limit(writePos);
+ return;
+ }
+ writePos = buf.position();
+ buf.position(readPos);
+ buf.limit(writePos);
+ readMode = true;
+ }
+
+ private void setWriteMode() {
+ if (!readMode) {
+ buf.limit(buf.capacity());
+ return;
+ }
+ readPos = buf.position();
+ buf.limit(buf.capacity());
+ buf.position(writePos);
+ readMode = false;
+ }
+
+ private void ensureFree(int minFree) {
+ // assumes setWriteMode called just before
+ if (buf.remaining() >= minFree) {
+ return;
+ }
+ writePos = buf.position();
+ int used = writePos - readPos;
+ int free = buf.remaining() + readPos;
+ if (free >= minFree && free >= used) {
+ buf.position(readPos);
+ buf.limit(writePos);
+ buf.compact();
+ readPos = 0;
+ } else {
+ int size = buf.capacity() * 2;
+ if (buf.capacity() + free < minFree) {
+ size = buf.capacity() + minFree;
+ }
+ ByteBuffer tmp = ByteBuffer.allocate(size);
+ tmp.order(buf.order());
+ buf.position(readPos);
+ buf.limit(writePos);
+ tmp.put(buf);
+ buf = tmp;
+ readPos = 0;
+ }
+ }
+
+ public Buffer(int size) {
+ buf = ByteBuffer.allocate(size);
+ readPos = 0;
+ writePos = 0;
+ readMode = false;
+ }
+
+ public boolean shrink(int size) {
+ int rpos = readMode? buf.position() : readPos;
+ int wpos = readMode? writePos : buf.position();
+ int used = wpos - rpos;
+ if (used > size || buf.capacity() <= size) {
+ return false;
+ }
+ ByteBuffer tmp = ByteBuffer.allocate(size);
+ tmp.order(buf.order());
+ buf.position(rpos);
+ buf.limit(wpos);
+ tmp.put(buf);
+ buf = tmp;
+ readPos = 0;
+ writePos = used;
+ buf.position(readMode? readPos : writePos);
+ buf.limit(readMode? writePos : buf.capacity());
+ return true;
+ }
+
+ public int bytes() {
+ return (readMode)
+ ? (writePos - buf.position())
+ : (buf.position() - readPos);
+ }
+
+ public ByteBuffer getReadable() {
+ setReadMode();
+ return buf;
+ }
+
+ public ByteBuffer getWritable(int minFree) {
+ setWriteMode();
+ ensureFree(minFree);
+ return buf;
+ }
+
+ public ByteBuffer getChannelReadable() {
+ ByteBuffer bb = getReadable();
+ if (bb.remaining() > MAX_IO) {
+ bb.limit(bb.position() + MAX_IO);
+ }
+ return bb;
+ }
+
+ public ByteBuffer getChannelWritable(int minFree) {
+ ByteBuffer bb = getWritable(minFree);
+ if (bb.remaining() > MAX_IO) {
+ bb.limit(bb.position() + MAX_IO);
+ }
+ return bb;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java
new file mode 100644
index 00000000000..2c49d0abdaa
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Closer.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class Closer {
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Closer.this.run();
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Closer.this);
+ }
+ }
+ }
+
+ private Thread thread = new Thread(new Run(), "<closer>");
+ private Transport parent;
+ private ThreadQueue closeQueue = new ThreadQueue();
+
+ public Closer(Transport parent) {
+ this.parent = parent;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void closeLater(Connection c) {
+ if (!closeQueue.enqueue(c)) {
+ c.closeSocket();
+ }
+ }
+
+ private void run() {
+ try {
+ while (true) {
+ ((Connection)closeQueue.dequeue()).closeSocket();
+ }
+ } catch (EndOfQueueException e) {}
+ }
+
+ public Closer shutdown() {
+ closeQueue.close();
+ return this;
+ }
+
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
new file mode 100644
index 00000000000..7affa875cd6
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -0,0 +1,405 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+class Connection extends Target {
+
+ private static Logger log = Logger.getLogger(Connection.class.getName());
+
+ private static final int READ_SIZE = 8192;
+ private static final int READ_REDO = 10;
+ private static final int WRITE_SIZE = 8192;
+ private static final int WRITE_REDO = 10;
+
+ private static final int INITIAL = 0;
+ private static final int CONNECTED = 1;
+ private static final int CLOSED = 2;
+
+ private int state = INITIAL;
+ private Queue queue = new Queue();
+ private Queue myQueue = new Queue();
+ private Buffer input = new Buffer(READ_SIZE * 2);
+ private Buffer output = new Buffer(WRITE_SIZE * 2);
+ private int maxInputSize = 64*1024;
+ private int maxOutputSize = 64*1024;
+ private Map<Integer, ReplyHandler> replyMap
+ = new HashMap<Integer, ReplyHandler>();
+ private Map<TargetWatcher, TargetWatcher> watchers
+ = new IdentityHashMap<TargetWatcher, TargetWatcher>();
+ private int activeReqs = 0;
+ private int writeWork = 0;
+ private Transport parent;
+ private Supervisor owner;
+ private Spec spec;
+ private SocketChannel channel;
+ private boolean server;
+ private AtomicLong requestId = new AtomicLong(0);
+ private SelectionKey selectionKey;
+ private Exception lostReason = null;
+
+ private void setState(int state) {
+ if (state <= this.state) {
+ log.log(Level.WARNING, "Bogus state transition: "
+ + this.state + "->" + state);
+ return;
+ }
+ boolean live = (this.state == INITIAL && state == CONNECTED);
+ boolean down = (this.state != CLOSED && state == CLOSED);
+ boolean fini;
+ boolean pendingWrite;
+ synchronized (this) {
+ this.state = state;
+ fini = down && (activeReqs == 0);
+ pendingWrite = (writeWork > 0);
+ }
+ if (live) {
+ if (pendingWrite) {
+ enableWrite();
+ }
+ owner.sessionLive(this);
+ }
+ if (down) {
+ for (ReplyHandler rh : replyMap.values()) {
+ rh.handleConnectionDown();
+ }
+ for (TargetWatcher watcher : watchers.values()) {
+ watcher.notifyTargetInvalid(this);
+ }
+ owner.sessionDown(this);
+ }
+ if (fini) {
+ owner.sessionFini(this);
+ }
+ }
+
+ public Connection(Transport parent, Supervisor owner,
+ SocketChannel channel) {
+
+ this.parent = parent;
+ this.owner = owner;
+ this.channel = channel;
+ server = true;
+ owner.sessionInit(this);
+ }
+
+ public Connection(Transport parent, Supervisor owner,
+ Spec spec, Object context) {
+ super(context);
+ this.parent = parent;
+ this.owner = owner;
+ this.spec = spec;
+ server = false;
+ owner.sessionInit(this);
+ }
+
+ public void setMaxInputSize(int bytes) {
+ maxInputSize = bytes;
+ }
+
+ public void setMaxOutputSize(int bytes) {
+ maxOutputSize = bytes;
+ }
+
+ public Transport transport() {
+ return parent;
+ }
+
+ public int allocateKey() {
+ long v = requestId.getAndIncrement();
+ v = v*2 + (server ? 1 : 0);
+ int i = (int)(v & 0x7fffffff);
+ return i;
+ }
+
+ public synchronized boolean cancelReply(ReplyHandler handler) {
+ if (state == CLOSED) {
+ return false;
+ }
+ ReplyHandler stored = replyMap.remove(handler.key());
+ if (stored != handler) {
+ if (stored != null) {
+ replyMap.put(handler.key(), stored);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ public boolean postPacket(Packet packet, ReplyHandler handler) {
+ boolean accepted = false;
+ boolean enableWrite = false;
+ synchronized (this) {
+ if (state <= CONNECTED) {
+ enableWrite = (writeWork == 0 && state == CONNECTED);
+ queue.enqueue(packet);
+ writeWork++;
+ accepted = true;
+ if (handler != null) {
+ replyMap.put(handler.key(), handler);
+ }
+ }
+ }
+ if (enableWrite) {
+ parent.enableWrite(this);
+ }
+ return accepted;
+ }
+
+ public boolean postPacket(Packet packet) {
+ return postPacket(packet, null);
+ }
+
+ public Connection connect() {
+ if (spec == null || spec.malformed()) {
+ setLostReason(new IllegalArgumentException("jrt: malformed or missing spec"));
+ return this;
+ }
+ try {
+ channel = SocketChannel.open(spec.address());
+ } catch (Exception e) {
+ setLostReason(e);
+ }
+ return this;
+ }
+
+ public boolean init(Selector selector) {
+ if (channel == null) {
+ return false;
+ }
+ try {
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(true);
+ selectionKey = channel.register(selector,
+ SelectionKey.OP_READ,
+ this);
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error initializing connection", e);
+ setLostReason(e);
+ return false;
+ }
+ setState(CONNECTED);
+ return true;
+ }
+
+ public void enableWrite() {
+ selectionKey.interestOps(selectionKey.interestOps()
+ | SelectionKey.OP_WRITE);
+ }
+
+ public void disableWrite() {
+ selectionKey.interestOps(selectionKey.interestOps()
+ & ~SelectionKey.OP_WRITE);
+ }
+
+ public void read() throws IOException {
+ boolean doneRead = false;
+ for (int i = 0; !doneRead && i < READ_REDO; i++) {
+ ByteBuffer wb = input.getChannelWritable(READ_SIZE);
+ if (channel.read(wb) == -1) {
+ throw new IOException("jrt: Connection closed by peer");
+ }
+ doneRead = (wb.remaining() > 0);
+ ByteBuffer rb = input.getReadable();
+ while (true) {
+ PacketInfo info = PacketInfo.getPacketInfo(rb);
+ if (info == null || info.packetLength() > rb.remaining()) {
+ break;
+ }
+ owner.readPacket(info);
+ Packet packet;
+ try {
+ packet = info.decodePacket(rb);
+ } catch (RuntimeException e) {
+ log.log(Level.WARNING, "got garbage; closing connection: " + toString());
+ throw new IOException("jrt: decode error", e);
+ }
+ ReplyHandler handler;
+ synchronized (this) {
+ handler = replyMap.remove(packet.requestId());
+ }
+ if (handler != null) {
+ handler.handleReply(packet);
+ } else {
+ owner.handlePacket(this, packet);
+ }
+ }
+ }
+ if (maxInputSize > 0) {
+ input.shrink(maxInputSize);
+ }
+ }
+
+ public void write() throws IOException {
+ synchronized (this) {
+ queue.flush(myQueue);
+ }
+ for (int i = 0; i < WRITE_REDO; i++) {
+ while (output.bytes() < WRITE_SIZE) {
+ Packet packet = (Packet) myQueue.dequeue();
+ if (packet == null) {
+ break;
+ }
+ PacketInfo info = packet.getPacketInfo();
+ ByteBuffer wb = output.getWritable(info.packetLength());
+ owner.writePacket(info);
+ info.encodePacket(packet, wb);
+ }
+ ByteBuffer rb = output.getChannelReadable();
+ if (rb.remaining() == 0) {
+ break;
+ }
+ channel.write(rb);
+ if (rb.remaining() > 0) {
+ break;
+ }
+ }
+ boolean disableWrite;
+ synchronized (this) {
+ writeWork = queue.size()
+ + myQueue.size()
+ + ((output.bytes() > 0) ? 1 : 0);
+ disableWrite = (writeWork == 0);
+ }
+ if (disableWrite) {
+ disableWrite();
+ }
+ if (maxOutputSize > 0) {
+ output.shrink(maxOutputSize);
+ }
+ }
+
+ public void fini() {
+ setState(CLOSED);
+ if (selectionKey != null) {
+ selectionKey.cancel();
+ }
+ }
+
+ public boolean isClosed() {
+ return (state == CLOSED);
+ }
+
+ public boolean hasSocket() {
+ return (channel != null);
+ }
+
+ public void closeSocket() {
+ if (channel != null) {
+ try {
+ channel.socket().close();
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error closing connection", e);
+ }
+ }
+ }
+
+ public void setLostReason(Exception e) {
+ if (lostReason == null) {
+ lostReason = e;
+ }
+ }
+
+ public TieBreaker startRequest() {
+ synchronized (this) {
+ activeReqs++;
+ }
+ return new TieBreaker();
+ }
+
+ public boolean completeRequest(TieBreaker done) {
+ boolean signalFini = false;
+ synchronized (this) {
+ if (!done.first()) {
+ return false;
+ }
+ if (--activeReqs == 0 && state == CLOSED) {
+ signalFini = true;
+ }
+ }
+ if (signalFini) {
+ owner.sessionFini(this);
+ }
+ return true;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Methods defined in the Target superclass
+ ///////////////////////////////////////////////////////////////////////////
+
+ public boolean isValid() {
+ return (state != CLOSED);
+ }
+
+ public Exception getConnectionLostReason() {
+ return lostReason;
+ }
+
+ public boolean isClient() {
+ return !server;
+ }
+
+ public boolean isServer() {
+ return server;
+ }
+
+ public void invokeSync(Request req, double timeout) {
+ SingleRequestWaiter waiter = new SingleRequestWaiter();
+ invokeAsync(req, timeout, waiter);
+ waiter.waitDone();
+ }
+
+ public void invokeAsync(Request req, double timeout,
+ RequestWaiter waiter) {
+ if (timeout < 0.0) {
+ timeout = 0.0;
+ }
+ new InvocationClient(this, req, timeout, waiter).invoke();
+ }
+
+ public boolean invokeVoid(Request req) {
+ return postPacket(new RequestPacket(Packet.FLAG_NOREPLY,
+ allocateKey(),
+ req.methodName(),
+ req.parameters()));
+ }
+
+ public synchronized boolean addWatcher(TargetWatcher watcher) {
+ if (state == CLOSED) {
+ return false;
+ }
+ watchers.put(watcher, watcher);
+ return true;
+ }
+
+ public synchronized boolean removeWatcher(TargetWatcher watcher) {
+ if (state == CLOSED) {
+ return false;
+ }
+ watchers.remove(watcher);
+ return true;
+ }
+
+ public void close() {
+ parent.closeConnection(this);
+ }
+
+ public String toString() {
+ if (channel != null) {
+ return "Connection { " + channel.socket() + " }";
+ }
+ return "Connection { no socket }";
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
new file mode 100644
index 00000000000..fa43710b1f6
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -0,0 +1,76 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class Connector {
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Connector.this.run();
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Connector.this);
+ }
+ }
+ }
+
+ private Thread thread = new Thread(new Run(), "<connector>");
+ private Transport parent;
+ private ThreadQueue connectQueue = new ThreadQueue();
+ private boolean done = false;
+ private boolean exit = false;
+
+ public Connector(Transport parent) {
+ this.parent = parent;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void connectLater(Connection c) {
+ if (!connectQueue.enqueue(c)) {
+ parent.addConnection(c);
+ }
+ }
+
+ private void run() {
+ try {
+ while (true) {
+ Connection conn = (Connection) connectQueue.dequeue();
+ parent.addConnection(conn.connect());
+ }
+ } catch (EndOfQueueException e) {}
+ synchronized (this) {
+ done = true;
+ notifyAll();
+ while (!exit) {
+ try { wait(); } catch (InterruptedException x) {}
+ }
+ }
+ }
+
+ public Connector shutdown() {
+ connectQueue.close();
+ return this;
+ }
+
+ public synchronized void waitDone() {
+ while (!done) {
+ try { wait(); } catch (InterruptedException x) {}
+ }
+ }
+
+ public synchronized Connector exit() {
+ exit = true;
+ notifyAll();
+ return this;
+ }
+
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/DataArray.java b/jrt/src/com/yahoo/jrt/DataArray.java
new file mode 100644
index 00000000000..8871b5df6da
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/DataArray.java
@@ -0,0 +1,64 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * Data array (an array of byte sequences)
+ **/
+public class DataArray extends Value
+{
+ private byte[][] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public DataArray(byte[][] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ DataArray(ByteBuffer src) {
+ int size = src.getInt();
+ value = new byte[size][];
+ for (int i = 0; i < size; i++) {
+ value[i] = new byte[src.getInt()];
+ src.get(value[i]);
+ }
+ }
+
+ /**
+ * @return DATA_ARRAY
+ **/
+ public byte type() { return DATA_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() {
+ int bytes = 4;
+ for (int i = 0; i < value.length; i++) {
+ bytes += 4 + value[i].length;
+ }
+ return bytes;
+ }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ for (int i = 0; i < value.length; i++) {
+ dst.putInt(value[i].length);
+ dst.put(value[i]);
+ }
+ }
+
+ public byte[][] asDataArray() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/DataValue.java b/jrt/src/com/yahoo/jrt/DataValue.java
new file mode 100644
index 00000000000..284bf2e6a2d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/DataValue.java
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Data value (a sequence of bytes)
+ **/
+public class DataValue extends Value
+{
+ private byte[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public DataValue(byte[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ DataValue(ByteBuffer src) {
+ int size = src.getInt();
+ value = new byte[size];
+ src.get(value);
+ }
+
+ /**
+ * @return DATA
+ **/
+ public byte type() { return DATA; }
+ public int count() { return 1; }
+
+ int bytes() { return 4 + value.length; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.put(value);
+ }
+
+ public byte[] asData() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/DoubleArray.java b/jrt/src/com/yahoo/jrt/DoubleArray.java
new file mode 100644
index 00000000000..3758abe9873
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/DoubleArray.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 64-bit floating-point array
+ **/
+public class DoubleArray extends Value
+{
+ private double[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public DoubleArray(double[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ DoubleArray(ByteBuffer src) {
+ int size = src.getInt();
+ value = new double[size];
+ src.asDoubleBuffer().get(value);
+ src.position(src.position() + size * 8);
+ }
+
+ /**
+ * @return DOUBLE_ARRAY
+ **/
+ public byte type() { return DOUBLE_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length * 8; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.asDoubleBuffer().put(value);
+ dst.position(dst.position() + value.length * 8);
+ }
+
+ public double[] asDoubleArray() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/DoubleValue.java b/jrt/src/com/yahoo/jrt/DoubleValue.java
new file mode 100644
index 00000000000..e7739e80150
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/DoubleValue.java
@@ -0,0 +1,48 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * 64-bit floating-point value
+ **/
+public class DoubleValue extends Value
+{
+ private double value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public DoubleValue(double value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ DoubleValue(ByteBuffer src) {
+ value = src.getDouble();
+ }
+
+ /**
+ * @return DOUBLE
+ **/
+ public byte type() { return DOUBLE; }
+ public int count() { return 1; }
+
+ int bytes() { return 8; }
+ void encode(ByteBuffer dst) {
+ dst.putDouble(value);
+ }
+
+ public double asDouble() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/EndOfQueueException.java b/jrt/src/com/yahoo/jrt/EndOfQueueException.java
new file mode 100644
index 00000000000..67ba53fd5d2
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/EndOfQueueException.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Checked exception thrown when someone tries to dequeue an object
+ * from a closed and empty {@link ThreadQueue}.
+ **/
+class EndOfQueueException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a EndOfQueueException.
+ **/
+ EndOfQueueException() {
+ super();
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/ErrorCode.java b/jrt/src/com/yahoo/jrt/ErrorCode.java
new file mode 100644
index 00000000000..38c60bbd6fd
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ErrorCode.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * This class contains the error codes defined by the RPC
+ * protocol. The error code associated with a request is obtained
+ * through the {@link Request#errorCode} method. Note that according to
+ * the RPC protocol, applications may define custom error codes with
+ * values 65536 (0x10000) and greater.
+ **/
+public class ErrorCode
+{
+ /** No error (0) **/
+ public static final int NONE = 0;
+
+ /** General error (100) **/
+ public static final int GENERAL_ERROR = 100;
+
+ /** Not implemented (101) **/
+ public static final int NOT_IMPLEMENTED = 101;
+
+ /** Invocation aborted (102) **/
+ public static final int ABORT = 102;
+
+ /** Invocation timed out (103) **/
+ public static final int TIMEOUT = 103;
+
+ /** Connection error (104) **/
+ public static final int CONNECTION = 104;
+
+ /** Bad request packet (105) **/
+ public static final int BAD_REQUEST = 105;
+
+ /** No such method (106) **/
+ public static final int NO_SUCH_METHOD = 106;
+
+ /** Illegal parameters (107) **/
+ public static final int WRONG_PARAMS = 107;
+
+ /** Request dropped due to server overload (108) **/
+ public static final int OVERLOAD = 108;
+
+ /** Illegal return values (109) **/
+ public static final int WRONG_RETURN = 109;
+
+ /** Bad reply packet (110) **/
+ public static final int BAD_REPLY = 110;
+
+ /** Method failed (111) **/
+ public static final int METHOD_FAILED = 111;
+}
diff --git a/jrt/src/com/yahoo/jrt/ErrorPacket.java b/jrt/src/com/yahoo/jrt/ErrorPacket.java
new file mode 100644
index 00000000000..fa0ac835826
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ErrorPacket.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+class ErrorPacket extends Packet
+{
+ private int errorCode;
+ private StringValue errorMessage;
+
+ public ErrorPacket(int flags, int reqId,
+ int errorCode,
+ String errorMessage)
+ {
+ super(flags, reqId);
+ this.errorCode = errorCode;
+ this.errorMessage = new StringValue(errorMessage);
+ }
+
+ public ErrorPacket(int flags, int reqId,
+ ByteBuffer src)
+ {
+ super(flags, reqId);
+ errorCode = src.getInt();
+ errorMessage = new StringValue(src);
+ }
+
+ public int bytes() {
+ return (headerLength +
+ 4 +
+ errorMessage.bytes());
+ }
+
+ public int packetCode() {
+ return PCODE_ERROR;
+ }
+
+ public void encode(ByteBuffer dst) {
+ dst.putInt(errorCode);
+ errorMessage.encode(dst);
+ }
+
+ public int errorCode() {
+ return errorCode;
+ }
+
+ public String errorMessage() {
+ return errorMessage.asString();
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/FatalErrorHandler.java b/jrt/src/com/yahoo/jrt/FatalErrorHandler.java
new file mode 100644
index 00000000000..e3a17ac0151
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/FatalErrorHandler.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Interface used to report fatal errors (internal thread
+ * unwinding). If the application wants to be notified of these
+ * errors, it must register a handler with the {@link Transport}
+ * constructor.
+ **/
+public interface FatalErrorHandler {
+
+ /**
+ * Invoked when an internal thread crashes due to thread
+ * unwinding.
+ *
+ * @param problem the throwable causing the problem
+ * @param context the object owning the crashed thread
+ **/
+ public void handleFailure(Throwable problem, Object context);
+}
diff --git a/jrt/src/com/yahoo/jrt/FloatArray.java b/jrt/src/com/yahoo/jrt/FloatArray.java
new file mode 100644
index 00000000000..05556e94276
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/FloatArray.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 32-bit floating-point array
+ **/
+public class FloatArray extends Value
+{
+ private float[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public FloatArray(float[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ FloatArray(ByteBuffer src) {
+ int size = src.getInt();
+ value = new float[size];
+ src.asFloatBuffer().get(value);
+ src.position(src.position() + size * 4);
+ }
+
+ /**
+ * @return FLOAT_ARRAY
+ **/
+ public byte type() { return FLOAT_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length * 4; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.asFloatBuffer().put(value);
+ dst.position(dst.position() + value.length * 4);
+ }
+
+ public float[] asFloatArray() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/FloatValue.java b/jrt/src/com/yahoo/jrt/FloatValue.java
new file mode 100644
index 00000000000..bfba69f0965
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/FloatValue.java
@@ -0,0 +1,49 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 32-bit floating-point value
+ **/
+public class FloatValue extends Value
+{
+ private float value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public FloatValue(float value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ FloatValue(ByteBuffer src) {
+ value = src.getFloat();
+ }
+
+ /**
+ * @return FLOAT
+ **/
+ public byte type() { return FLOAT; }
+ public int count() { return 1; }
+
+ int bytes() { return 4; }
+ void encode(ByteBuffer dst) {
+ dst.putFloat(value);
+ }
+
+ public float asFloat() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int16Array.java b/jrt/src/com/yahoo/jrt/Int16Array.java
new file mode 100644
index 00000000000..fbb7f304872
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int16Array.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 16-bit integer array
+ **/
+public class Int16Array extends Value
+{
+ private short[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int16Array(short[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int16Array(ByteBuffer src) {
+ int size = src.getInt();
+ value = new short[size];
+ src.asShortBuffer().get(value);
+ src.position(src.position() + size * 2);
+ }
+
+ /**
+ * @return INT16_ARRAY
+ **/
+ public byte type() { return INT16_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length * 2; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.asShortBuffer().put(value);
+ dst.position(dst.position() + value.length * 2);
+ }
+
+ public short[] asInt16Array() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int16Value.java b/jrt/src/com/yahoo/jrt/Int16Value.java
new file mode 100644
index 00000000000..45c10e09f76
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int16Value.java
@@ -0,0 +1,49 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 16-bit integer value
+ **/
+public class Int16Value extends Value
+{
+ private short value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int16Value(short value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int16Value(ByteBuffer src) {
+ value = src.getShort();
+ }
+
+ /**
+ * @return INT16
+ **/
+ public byte type() { return INT16; }
+ public int count() { return 1; }
+
+ int bytes() { return 2; }
+ void encode(ByteBuffer dst) {
+ dst.putShort(value);
+ }
+
+ public short asInt16() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int32Array.java b/jrt/src/com/yahoo/jrt/Int32Array.java
new file mode 100644
index 00000000000..dddcaf83ddc
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int32Array.java
@@ -0,0 +1,55 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 32-bit integer array
+ **/
+public class Int32Array extends Value
+{
+ private int[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int32Array(int[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int32Array(ByteBuffer src) {
+ int size = src.getInt();
+ value = new int[size];
+ src.asIntBuffer().get(value);
+ src.position(src.position() + size * 4);
+ }
+
+ /**
+ * @return INT32_ARRAY
+ **/
+ public byte type() { return INT32_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length * 4; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.asIntBuffer().put(value);
+ dst.position(dst.position() + value.length * 4);
+ }
+
+ public int[] asInt32Array() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int32Value.java b/jrt/src/com/yahoo/jrt/Int32Value.java
new file mode 100644
index 00000000000..0a57014431a
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int32Value.java
@@ -0,0 +1,48 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * 32-bit integer value
+ **/
+public class Int32Value extends Value
+{
+ private int value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int32Value(int value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int32Value(ByteBuffer src) {
+ value = src.getInt();
+ }
+
+ /**
+ * @return INT32
+ **/
+ public byte type() { return INT32; }
+ public int count() { return 1; }
+
+ int bytes() { return 4; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value);
+ }
+
+ public int asInt32() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int64Array.java b/jrt/src/com/yahoo/jrt/Int64Array.java
new file mode 100644
index 00000000000..00427b9eff4
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int64Array.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 64-bit integer array
+ **/
+public class Int64Array extends Value
+{
+ private long[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int64Array(long[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int64Array(ByteBuffer src) {
+ int size = src.getInt();
+ value = new long[size];
+ src.asLongBuffer().get(value);
+ src.position(src.position() + size * 8);
+ }
+
+ /**
+ * @return INT64_ARRAY
+ **/
+ public byte type() { return INT64_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length * 8; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.asLongBuffer().put(value);
+ dst.position(dst.position() + value.length * 8);
+ }
+
+ public long[] asInt64Array() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int64Value.java b/jrt/src/com/yahoo/jrt/Int64Value.java
new file mode 100644
index 00000000000..27c5cd5ff45
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int64Value.java
@@ -0,0 +1,49 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * 64-bit integer value
+ **/
+public class Int64Value extends Value
+{
+ private long value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int64Value(long value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int64Value(ByteBuffer src) {
+ value = src.getLong();
+ }
+
+ /**
+ * @return INT64
+ **/
+ public byte type() { return INT64; }
+ public int count() { return 1; }
+
+ int bytes() { return 8; }
+ void encode(ByteBuffer dst) {
+ dst.putLong(value);
+ }
+
+ public long asInt64() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int8Array.java b/jrt/src/com/yahoo/jrt/Int8Array.java
new file mode 100644
index 00000000000..4dc18a75d8e
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int8Array.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * 8-bit integer array
+ **/
+public class Int8Array extends Value
+{
+ private byte[] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int8Array(byte[] value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int8Array(ByteBuffer src) {
+ int size = src.getInt();
+ value = new byte[size];
+ src.get(value);
+ }
+
+ /**
+ * @return INT8_ARRAY
+ **/
+ public byte type() { return INT8_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() { return 4 + value.length; }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ dst.put(value);
+ }
+
+ public byte[] asInt8Array() { return value; }
+
+ public @Override String toString() {
+ return Arrays.toString(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Int8Value.java b/jrt/src/com/yahoo/jrt/Int8Value.java
new file mode 100644
index 00000000000..e62460f4bd8
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Int8Value.java
@@ -0,0 +1,48 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * 8-bit integer value
+ **/
+public class Int8Value extends Value
+{
+ private byte value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public Int8Value(byte value) { this.value = value; }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ Int8Value(ByteBuffer src) {
+ value = src.get();
+ }
+
+ /**
+ * @return INT8
+ **/
+ public byte type() { return INT8; }
+ public int count() { return 1; }
+
+ int bytes() { return 1; }
+ void encode(ByteBuffer dst) {
+ dst.put(value);
+ }
+
+ public byte asInt8() { return value; }
+
+ public @Override String toString() {
+ return String.valueOf(value);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/InvocationClient.java b/jrt/src/com/yahoo/jrt/InvocationClient.java
new file mode 100644
index 00000000000..3d75148793d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/InvocationClient.java
@@ -0,0 +1,95 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class InvocationClient implements ReplyHandler, Runnable {
+
+ Connection conn;
+ Request req;
+ double timeout;
+ RequestWaiter reqWaiter;
+ Integer replyKey;
+ Task timeoutTask;
+
+ public InvocationClient(Connection conn, Request req,
+ double timeout, RequestWaiter waiter) {
+
+ this.conn = conn;
+ this.req = req;
+ this.timeout = timeout;
+ this.reqWaiter = waiter;
+ req.clientHandler(this);
+
+ this.replyKey = conn.allocateKey();
+ this.timeoutTask = conn.transport().createTask(this);
+ }
+
+ public void invoke() {
+ if (!conn.postPacket(new RequestPacket(0,
+ replyKey.intValue(),
+ req.methodName(),
+ req.parameters()), this)) {
+ req.setError(ErrorCode.CONNECTION, "Connection error");
+ reqWaiter.handleRequestDone(req);
+ return;
+ }
+ timeoutTask.schedule(timeout);
+ }
+
+ public Integer key() {
+ return replyKey;
+ }
+
+ /**
+ * Handle normal packet reply. The reply may contain either return
+ * values or an error.
+ **/
+ public void handleReply(Packet packet) {
+ timeoutTask.kill();
+ if (packet == null) {
+ req.setError(ErrorCode.BAD_REPLY, "Bad reply packet");
+ } else {
+ int pcode = packet.packetCode();
+ if (pcode == Packet.PCODE_REPLY) {
+ ReplyPacket rp = (ReplyPacket) packet;
+ req.returnValues(rp.returnValues());
+ } else if (pcode == Packet.PCODE_ERROR) {
+ ErrorPacket ep = (ErrorPacket) packet;
+ req.setError(ep.errorCode(), ep.errorMessage());
+ }
+ }
+ reqWaiter.handleRequestDone(req);
+ }
+
+ /**
+ * Handle user abort.
+ **/
+ public void handleAbort() {
+ if (!conn.cancelReply(this)) {
+ return;
+ }
+ timeoutTask.kill();
+ req.setError(ErrorCode.ABORT, "Aborted by user");
+ reqWaiter.handleRequestDone(req);
+ }
+
+ /**
+ * Handle connection down.
+ **/
+ public void handleConnectionDown() {
+ timeoutTask.kill();
+ req.setError(ErrorCode.CONNECTION, "Connection error");
+ reqWaiter.handleRequestDone(req);
+ }
+
+ /**
+ * Handle timeout.
+ **/
+ public void run() {
+ if (!conn.cancelReply(this)) {
+ return;
+ }
+ req.setError(ErrorCode.TIMEOUT, "Request timed out");
+ reqWaiter.handleRequestDone(req);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/InvocationServer.java b/jrt/src/com/yahoo/jrt/InvocationServer.java
new file mode 100644
index 00000000000..5f39bdebe97
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/InvocationServer.java
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class InvocationServer {
+
+ private Connection conn;
+ private Request request;
+ private Method method;
+ private int replyKey;
+ private boolean noReply;
+ private TieBreaker done;
+
+ public InvocationServer(Connection conn, Request request, Method method,
+ int replyKey, boolean noReply) {
+
+ this.conn = conn;
+ this.request = request;
+ this.method = method;
+ this.replyKey = replyKey;
+ this.noReply = noReply;
+ request.serverHandler(this);
+
+ done = conn.startRequest();
+ }
+
+ public Target getTarget() {
+ return conn;
+ }
+
+ public void invoke() {
+ if (method != null) {
+ if (method.checkParameters(request)) {
+ method.invoke(request);
+ } else {
+ request.setError(ErrorCode.WRONG_PARAMS, "Parameters in " + request + " does not match " + method);
+ }
+ } else {
+ request.setError(ErrorCode.NO_SUCH_METHOD, "No such method");
+ }
+ if (!request.isDetached()) {
+ returnRequest();
+ }
+ }
+
+ public void returnRequest() {
+ if (!conn.completeRequest(done)) {
+ throw new IllegalStateException("Request already returned");
+ }
+ if (noReply) {
+ return;
+ }
+ if (!request.isError() && !method.checkReturnValues(request)) {
+ request.setError(ErrorCode.WRONG_RETURN, "Return values in " + request + " does not match " + method);
+ }
+ if (request.isError()) {
+ conn.postPacket(new ErrorPacket(0, replyKey,
+ request.errorCode(),
+ request.errorMessage()));
+ } else {
+ conn.postPacket(new ReplyPacket(0, replyKey,
+ request.returnValues()));
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/InvokeProxy.java b/jrt/src/com/yahoo/jrt/InvokeProxy.java
new file mode 100644
index 00000000000..944871e2ec0
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/InvokeProxy.java
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+/**
+ * Invokes asynchronous JRT requests in a blocking method, that can be aborted by calling shutdown().
+ * This can be used when one would like to use invokeSync(), but when abortion is required.
+ * <p>
+ * This class is thread safe.
+ *
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ */
+//@ThreadSafe
+public class InvokeProxy {
+
+ private boolean active = true;
+ private Request pending = null;
+
+ public Request invoke(Request req, Target target, double timeout) {
+ SingleRequestWaiter waiter = null;
+ synchronized (this) {
+ if (active) {
+ waiter = new SingleRequestWaiter();
+ target.invokeAsync(req, timeout, waiter);
+ pending = req;
+ } else {
+ req.setError(ErrorCode.ABORT, "Aborted by user");
+ }
+ }
+ if (waiter != null) {
+ waiter.waitDone();
+ synchronized (this) {
+ pending = null;
+ }
+ }
+ return req;
+ }
+
+ public void shutdown() {
+ synchronized (this) {
+ active = false;
+ if (pending != null) {
+ pending.abort();
+ }
+ }
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/ListenFailedException.java b/jrt/src/com/yahoo/jrt/ListenFailedException.java
new file mode 100644
index 00000000000..098b9761a00
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ListenFailedException.java
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Checked exception thrown when listening fails.
+ * @see Supervisor#listen
+ **/
+public class ListenFailedException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a ListenFailedException with the given message.
+ *
+ * @param msg the exception message
+ **/
+ ListenFailedException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Create a ListenFailedException with the given message and cause.
+ *
+ * @param msg the exception message
+ * @param cause what caused this exception
+ **/
+ ListenFailedException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/MandatoryMethods.java b/jrt/src/com/yahoo/jrt/MandatoryMethods.java
new file mode 100644
index 00000000000..f700a81729c
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/MandatoryMethods.java
@@ -0,0 +1,98 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.util.Iterator;
+
+
+class MandatoryMethods {
+
+ Supervisor parent;
+
+ public MandatoryMethods(Supervisor parent) {
+ this.parent = parent;
+ //---------------------------------------------------------------------
+ Method m;
+ //---------------------------------------------------------------------
+ m = new Method("frt.rpc.ping", "", "", this, "ping");
+ m.methodDesc("Method that may be used to "
+ + "check if the server is online");
+ parent.addMethod(m);
+ //---------------------------------------------------------------------
+ m = new Method("frt.rpc.getMethodList", "", "SSS", this,
+ "getMethodList");
+ m.methodDesc("Obtain a list of all available methods");
+ m.returnDesc(0, "names", "Method names");
+ m.returnDesc(1, "params", "Method parameter types");
+ m.returnDesc(2, "return", "Method return types");
+ parent.addMethod(m);
+ //---------------------------------------------------------------------
+ m = new Method("frt.rpc.getMethodInfo", "s", "sssSSSS", this,
+ "getMethodInfo");
+ m.methodDesc("Obtain detailed information about a single method");
+ m.paramDesc (0, "methodName", "The method we want information about");
+ m.returnDesc(0, "desc", "Description of what the method does");
+ m.returnDesc(1, "params", "Method parameter types");
+ m.returnDesc(2, "return", "Method return values");
+ m.returnDesc(3, "paramNames", "Method parameter names");
+ m.returnDesc(4, "paramDesc", "Method parameter descriptions");
+ m.returnDesc(5, "returnNames", "Method return value names");
+ m.returnDesc(6, "returnDesc", "Method return value descriptions");
+ parent.addMethod(m);
+ //---------------------------------------------------------------------
+ }
+
+ public void ping(Request req) {
+ // no code needed :)
+ }
+
+ public void getMethodList(Request req) {
+ int cnt = parent.methodMap().size();
+ String[] ret0_names = new String[cnt];
+ String[] ret1_params = new String[cnt];
+ String[] ret2_return = new String[cnt];
+
+ int i = 0;
+ Iterator<Method> itr = parent.methodMap().values().iterator();
+ while (itr.hasNext()) {
+ Method m = itr.next();
+ ret0_names[i] = m.name();
+ ret1_params[i] = m.paramTypes();
+ ret2_return[i] = m.returnTypes();
+ i++;
+ }
+ req.returnValues().add(new StringArray(ret0_names));
+ req.returnValues().add(new StringArray(ret1_params));
+ req.returnValues().add(new StringArray(ret2_return));
+ }
+
+ public void getMethodInfo(Request req) {
+ Method method = parent.methodMap().get(req.parameters().get(0).asString());
+ if (method == null) {
+ req.setError(ErrorCode.METHOD_FAILED, "No Such Method");
+ return;
+ }
+ req.returnValues().add(new StringValue(method.methodDesc()));
+ req.returnValues().add(new StringValue(method.paramTypes()));
+ req.returnValues().add(new StringValue(method.returnTypes()));
+
+ int paramCnt = method.paramTypes().length();
+ int returnCnt = method.returnTypes().length();
+ String[] ret3_paramName = new String[paramCnt];
+ String[] ret4_paramDesc = new String[paramCnt];
+ String[] ret5_returnName = new String[returnCnt];
+ String[] ret6_returnDesc = new String[returnCnt];
+ for (int i = 0; i < paramCnt; i++) {
+ ret3_paramName[i] = method.paramName(i);
+ ret4_paramDesc[i] = method.paramDesc(i);
+ }
+ for (int i = 0; i < returnCnt; i++) {
+ ret5_returnName[i] = method.returnName(i);
+ ret6_returnDesc[i] = method.returnDesc(i);
+ }
+ req.returnValues().add(new StringArray(ret3_paramName));
+ req.returnValues().add(new StringArray(ret4_paramDesc));
+ req.returnValues().add(new StringArray(ret5_returnName));
+ req.returnValues().add(new StringArray(ret6_returnDesc));
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Method.java b/jrt/src/com/yahoo/jrt/Method.java
new file mode 100644
index 00000000000..d752e33b0d8
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Method.java
@@ -0,0 +1,290 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * <p>A Method encapsulates the reflective information about a single RPC
+ * method.</p>
+ *
+ * <p>Method parameters and return values are declared with type
+ * strings. A <i>type string</i> denotes the concatenation of type
+ * identifiers where a string is used to represent a sequence of
+ * types. For example 'ii' is the type string for two 32-bit integers,
+ * while 'iss' is the type string for a single 32-bit integer followed
+ * by two strings. The complete list of type identifiers can be found
+ * in the {@link Value} class.</p>
+ *
+ * <p>The type strings associated with actual method parameters and
+ * return values may only contain valid type identifiers. However,
+ * when you specify the parameters accepted by- or returned from a RPC
+ * method via the Method constructor, '*' may be used as the last
+ * character in the type string. Ending a type string specification
+ * with '*' means that additional values are optional and may have any
+ * type. This feature can also be used with the {@link
+ * Request#checkReturnTypes Request.checkReturnTypes} method when
+ * verifying return types.</p>
+ *
+ * @see Supervisor#addMethod
+ **/
+public class Method {
+
+ private MethodHandler handler; // option 1: interface
+ private Object object; // option 2: reflection
+ private java.lang.reflect.Method method; // option 2: reflection
+
+ private String name;
+ private String paramTypes;
+ private String returnTypes;
+
+ private String desc;
+ private String[] paramName;
+ private String[] paramDesc;
+ private String[] returnName;
+ private String[] returnDesc;
+
+ private static final String undocumented = "???";
+ private static final Class<?>[] handlerParams = { Request.class };
+
+
+ private void init(String name, String paramTypes, String returnTypes) {
+ this.name = name;
+ this.paramTypes = paramTypes;
+ this.returnTypes = returnTypes;
+ desc = undocumented;
+ paramName = new String[this.paramTypes.length()];
+ paramDesc = new String[this.paramTypes.length()];
+ for (int i = 0; i < this.paramTypes.length(); i++) {
+ paramName[i] = undocumented;
+ paramDesc[i] = undocumented;
+ }
+ returnName = new String[this.returnTypes.length()];
+ returnDesc = new String[this.returnTypes.length()];
+ for (int i = 0; i < this.returnTypes.length(); i++) {
+ returnName[i] = undocumented;
+ returnDesc[i] = undocumented;
+ }
+ }
+
+ /**
+ * Create a new Method. The parameters define the name of the
+ * method, the parameter types, the return value types and also
+ * what method in which object should be invoked to perform the
+ * method. Please refer to the {@link Method} class description
+ * for an explanation of type strings.
+ *
+ * @param name method name
+ * @param paramTypes a type string defining the parameter types
+ * @param returnTypes a type string defining the return value types
+ * @param handler the object handling this RPC method
+ * @param handlerMethod the name of the method in the handler that
+ * should be used to handle invocations of this RPC method. This
+ * method must be public, return void and take a {@link Request}
+ * as its only parameter.
+ *
+ * @throws MethodCreateException if the handler method cannot be
+ * resolved.
+ **/
+ public Method(String name, String paramTypes, String returnTypes,
+ Object handler, String handlerMethod) {
+
+ this.handler = null;
+ this.object = handler;
+ try {
+ this.method = this.object.getClass().getMethod(handlerMethod,
+ handlerParams);
+ } catch (Exception e) {
+ throw new MethodCreateException("Method lookup failed", e);
+ }
+ init(name, paramTypes, returnTypes);
+ }
+
+ /**
+ * Create a new Method. The parameters define the name of the
+ * method, the parameter types, the return value types and also
+ * the handler for the method. Please refer to the {@link Method}
+ * class description for an explanation of type strings.
+ *
+ * @param name method name
+ * @param paramTypes a type string defining the parameter types
+ * @param returnTypes a type string defining the return value types
+ * @param handler the handler for this RPC method
+ *
+ * @throws MethodCreateException if the handler is <i>null</i>.
+ **/
+ public Method(String name, String paramTypes, String returnTypes,
+ MethodHandler handler) {
+
+ this.handler = handler;
+ this.object = null;
+ this.method = null;
+ if (this.handler == null) {
+ throw new MethodCreateException("Handler is null");
+ }
+ init(name, paramTypes, returnTypes);
+ }
+
+ /**
+ * Obtain the name of this method
+ *
+ * @return method name
+ **/
+ String name() {
+ return name;
+ }
+
+ /**
+ * Obtain the parameter types of this method
+ *
+ * @return parameter types
+ **/
+ String paramTypes() {
+ return paramTypes;
+ }
+
+ /**
+ * Obtain the return value types of this method
+ *
+ * @return return value types
+ **/
+ String returnTypes() {
+ return returnTypes;
+ }
+
+ /**
+ * Describe this method. This adds documentation that can be
+ * obtained through remote reflection.
+ *
+ * @return this Method, to allow chaining
+ **/
+ public Method methodDesc(String desc) {
+ this.desc = desc;
+ return this;
+ }
+
+ /**
+ * Obtain the method description
+ *
+ * @return method description
+ **/
+ String methodDesc() {
+ return desc;
+ }
+
+ /**
+ * Describe a parameter of this method. This adds documentation
+ * that can be obtained through remote reflection.
+ *
+ * @return this Method, to allow chaining
+ * @param index the parameter index
+ * @param name the parameter name
+ * @param desc the parameter description
+ **/
+ public Method paramDesc(int index, String name, String desc) {
+ paramName[index] = name;
+ paramDesc[index] = desc;
+ return this;
+ }
+
+ /**
+ * Obtain the name of a parameter
+ *
+ * @return parameter name
+ * @param index parameter index
+ **/
+ String paramName(int index) {
+ return paramName[index];
+ }
+
+ /**
+ * Obtain the description of a parameter
+ *
+ * @return parameter description
+ * @param index parameter index
+ **/
+ String paramDesc(int index) {
+ return paramDesc[index];
+ }
+
+ /**
+ * Describe a return value of this method. This adds documentation
+ * that can be obtained through remote reflection.
+ *
+ * @return this Method, to allow chaining
+ * @param index the return value index
+ * @param name the return value name
+ * @param desc the return value description
+ **/
+ public Method returnDesc(int index, String name, String desc) {
+ returnName[index] = name;
+ returnDesc[index] = desc;
+ return this;
+ }
+
+ /**
+ * Obtain the name of a return value
+ *
+ * @return return value name
+ * @param index return value index
+ **/
+ String returnName(int index) {
+ return returnName[index];
+ }
+
+ /**
+ * Obtain the description of a return value
+ *
+ * @return return value description
+ * @param index return value index
+ **/
+ String returnDesc(int index) {
+ return returnDesc[index];
+ }
+
+ /**
+ * Check whether the parameters of the given request satisfies the
+ * parameters of this method.
+ *
+ * @return true if the parameters of the given request satisfies
+ * the parameters of this method
+ * @param req a request
+ **/
+ boolean checkParameters(Request req) {
+ return req.parameters().satisfies(paramTypes);
+ }
+
+ /**
+ * Check whether the return values of the given request satisfies
+ * the return values of this method.
+ *
+ * @return true if the return values of the given request satisfies
+ * the return values of this method
+ * @param req a request
+ **/
+ boolean checkReturnValues(Request req) {
+ return req.returnValues().satisfies(returnTypes);
+ }
+
+ /**
+ * Invoke this method. The given request holds the parameters and
+ * will be given as parameter to the handler method.
+ *
+ * @param req the request causing this invocation
+ **/
+ void invoke(Request req) {
+ try {
+ if (handler != null) {
+ handler.invoke(req);
+ } else {
+ Object[] args = { req };
+ method.invoke(object, args);
+ }
+ } catch (Exception e) {
+ req.setError(ErrorCode.METHOD_FAILED, e.toString());
+ }
+ }
+
+ public @Override String toString() {
+ return "method " + name + "(" + paramTypes + ")" + ( returnTypes.length()>0 ? ": " + returnTypes : "");
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/MethodCreateException.java b/jrt/src/com/yahoo/jrt/MethodCreateException.java
new file mode 100644
index 00000000000..e598f0f3a54
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/MethodCreateException.java
@@ -0,0 +1,34 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Unchecked exception thrown when the {@link Method} constructor
+ * fails to resolve the method handler. The most usual reasons for
+ * this is that the method handler simply does not exist or that it
+ * has the wrong signature.
+ **/
+public class MethodCreateException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a MethodCreateException with the given message.
+ *
+ * @param msg the exception message
+ **/
+ MethodCreateException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Create a MethodCreateException with the given message and
+ * cause.
+ *
+ * @param msg the exception message
+ * @param cause what caused this exception
+ **/
+ MethodCreateException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/MethodHandler.java b/jrt/src/com/yahoo/jrt/MethodHandler.java
new file mode 100644
index 00000000000..5db43124280
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/MethodHandler.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * <p>Interface used to handle the invocation of a method.</p>
+ *
+ * <p>The {@link Method} class is used to register rpc methods. There
+ * are two ways rpc methods can be defined(bound); with this interface
+ * or with reflection. This choice is reflected by the two different
+ * constructors in the {@link Method} class.</p>
+ **/
+public interface MethodHandler {
+
+ /**
+ * Method used to dispatch an rpc request.
+ *
+ * @param req the request
+ **/
+ public void invoke(Request req);
+}
diff --git a/jrt/src/com/yahoo/jrt/Packet.java b/jrt/src/com/yahoo/jrt/Packet.java
new file mode 100644
index 00000000000..3d50d5d0f97
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Packet.java
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+abstract class Packet
+{
+ public static final int PCODE_REQUEST = 100;
+ public static final int PCODE_REPLY = 101;
+ public static final int PCODE_ERROR = 102;
+
+ public static final int FLAG_REVERSE = 0x1; // bit 0
+ public static final int FLAG_NOREPLY = 0x2; // bit 1
+
+ public static final int headerLength = 12;
+
+ public static boolean checkFlag(int flag, int flags) {
+ return (flags & flag) != 0;
+ }
+
+ private int flags;
+ private int requestId;
+
+ public Packet(int flags, int reqId) {
+ this.flags = flags;
+ this.requestId = reqId;
+ }
+
+ public int requestId() {
+ return requestId;
+ }
+
+ public boolean reverseByteOrder() {
+ return checkFlag(FLAG_REVERSE, flags);
+ }
+
+ public boolean noReply() {
+ return checkFlag(FLAG_NOREPLY, flags);
+ }
+
+ public abstract int bytes();
+ public abstract int packetCode();
+ public abstract void encode(ByteBuffer dst);
+
+ public PacketInfo getPacketInfo() {
+ return new PacketInfo(bytes(), flags, packetCode(), requestId);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/PacketInfo.java b/jrt/src/com/yahoo/jrt/PacketInfo.java
new file mode 100644
index 00000000000..82502387402
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/PacketInfo.java
@@ -0,0 +1,109 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+class PacketInfo
+{
+ private int packetLength;
+ private int flags;
+ private int packetCode;
+ private int requestId;
+
+ private PacketInfo(ByteBuffer src) {
+ packetLength = src.getInt() + 4;
+ flags = src.getShort();
+ packetCode = src.getShort();
+ requestId = src.getInt();
+ }
+
+ PacketInfo(int plen, int flags, int pcode, int reqId) {
+ this.packetLength = plen;
+ this.flags = flags;
+ this.packetCode = pcode;
+ this.requestId = reqId;
+ }
+
+ public int packetLength() {
+ return packetLength;
+ }
+
+ public int flags() {
+ return flags;
+ }
+
+ public int packetCode() {
+ return packetCode;
+ }
+
+ public int requestId() {
+ return requestId;
+ }
+
+ public boolean reverseByteOrder() {
+ return Packet.checkFlag(Packet.FLAG_REVERSE, flags);
+ }
+
+ public boolean noReply() {
+ return Packet.checkFlag(Packet.FLAG_NOREPLY, flags);
+ }
+
+ public static PacketInfo getPacketInfo(ByteBuffer src) {
+ if (src.remaining() < Packet.headerLength) {
+ return null;
+ }
+ return new PacketInfo(src.slice());
+ }
+
+ public Packet decodePacket(ByteBuffer src) {
+ int pos = src.position();
+ int end = pos + packetLength;
+ int limit = src.limit();
+ try {
+ src.limit(end);
+ src.position(src.position() + Packet.headerLength);
+ if (reverseByteOrder()) {
+ src.order(ByteOrder.LITTLE_ENDIAN);
+ }
+ switch (packetCode) {
+ case Packet.PCODE_REQUEST:
+ return new RequestPacket(flags, requestId, src);
+ case Packet.PCODE_REPLY:
+ return new ReplyPacket(flags, requestId, src);
+ case Packet.PCODE_ERROR:
+ return new ErrorPacket(flags, requestId, src);
+ }
+ throw new IllegalArgumentException();
+ } finally {
+ src.order(ByteOrder.BIG_ENDIAN);
+ src.position(end);
+ src.limit(limit);
+ }
+ }
+
+ public void encodePacket(Packet packet, ByteBuffer dst) {
+ int pos = dst.position();
+ int end = pos + packetLength;
+ int limit = dst.limit();
+ try {
+ dst.limit(end);
+ dst.putInt(packetLength - 4);
+ dst.putShort((short)flags);
+ dst.putShort((short)packetCode);
+ dst.putInt(requestId);
+ if (reverseByteOrder()) {
+ dst.order(ByteOrder.LITTLE_ENDIAN);
+ }
+ packet.encode(dst);
+ } catch (RuntimeException e) {
+ dst.position(pos);
+ throw e;
+ } finally {
+ dst.order(ByteOrder.BIG_ENDIAN);
+ dst.limit(limit);
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Queue.java b/jrt/src/com/yahoo/jrt/Queue.java
new file mode 100644
index 00000000000..5feb62c2af1
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Queue.java
@@ -0,0 +1,132 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * A queue implementation that is not thread-safe. The implementation
+ * uses a growable circular array to hold the elements.
+ **/
+class Queue
+{
+ private Object[] buf;
+ private int used;
+ private int readPos;
+ private int writePos;
+
+ /**
+ * Ensure the queue has room for the specified number of
+ * additional elements.
+ *
+ * @param need space needed on queue
+ **/
+ private void ensureFree(int need) {
+ if (buf.length < used + need) {
+ int newSize = Math.max(buf.length, 8);
+ while (newSize < used + need) {
+ newSize *= 2;
+ }
+ Object[] newBuf = new Object[newSize];
+ for (int i = 0; i < used; i++) {
+ newBuf[i] = buf[readPos++];
+ if (readPos == buf.length) {
+ readPos = 0;
+ }
+ }
+ buf = newBuf;
+ readPos = 0;
+ writePos = used; // this cannot wrap
+ }
+ }
+
+ /**
+ * Create a queue. If more elements are put on the queue than can
+ * be held by the initial capacity, the underlying structures will
+ * be grown as needed.
+ *
+ * @param capacity initial queue capacity
+ **/
+ public Queue(int capacity) {
+ buf = new Object[capacity];
+ used = 0;
+ readPos = 0;
+ writePos = 0;
+ }
+
+ /**
+ * Create a queue with an initial capacity of 64.
+ **/
+ public Queue() {
+ this(64);
+ }
+
+ /**
+ * Enqueue an object on this queue.
+ *
+ * @param obj the object to enqueue
+ **/
+ public void enqueue(Object obj) {
+ ensureFree(1);
+ buf[writePos++] = obj;
+ if (writePos == buf.length) {
+ writePos = 0;
+ }
+ used++;
+ }
+
+ /**
+ * Dequeue the next object from this queue.
+ *
+ * @return the next object from the queue or 'null' if the queue
+ * is empty
+ **/
+ public Object dequeue() {
+ if (used == 0) {
+ return null;
+ }
+ Object obj = buf[readPos];
+ buf[readPos++] = null; // enable GC of dequeued object
+ if (readPos == buf.length) {
+ readPos = 0;
+ }
+ used--;
+ return obj;
+ }
+
+ /**
+ * @return whether this queue is empty
+ **/
+ public boolean isEmpty() {
+ return (used == 0);
+ }
+
+ /**
+ * @return the number of elements in this queue
+ **/
+ public int size() {
+ return used;
+ }
+
+ /**
+ * Flush all elements currently in this queue into another
+ * queue. Note that this will clear the queue.
+ *
+ * @return the number of elements flushed
+ **/
+ public int flush(Queue dst) {
+ int cnt = used;
+ dst.ensureFree(cnt);
+ for (int i = 0; i < used; i++) {
+ dst.buf[dst.writePos++] = buf[readPos];
+ buf[readPos++] = null; // enable GC of dequeued object
+ if (dst.writePos == dst.buf.length) {
+ dst.writePos = 0;
+ }
+ if (readPos == buf.length) {
+ readPos = 0;
+ }
+ }
+ dst.used += used;
+ used = 0;
+ return cnt;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/ReplyHandler.java b/jrt/src/com/yahoo/jrt/ReplyHandler.java
new file mode 100644
index 00000000000..9c7214fb97c
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ReplyHandler.java
@@ -0,0 +1,9 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+interface ReplyHandler {
+ public Integer key();
+ public void handleReply(Packet packet);
+ public void handleConnectionDown();
+}
diff --git a/jrt/src/com/yahoo/jrt/ReplyPacket.java b/jrt/src/com/yahoo/jrt/ReplyPacket.java
new file mode 100644
index 00000000000..158c901d1f2
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ReplyPacket.java
@@ -0,0 +1,42 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+class ReplyPacket extends Packet
+{
+ private Values returnValues;
+
+ public ReplyPacket(int flags, int reqId,
+ Values returnValues)
+ {
+ super(flags, reqId);
+ this.returnValues = returnValues;
+ }
+
+ public ReplyPacket(int flags, int reqId,
+ ByteBuffer src)
+ {
+ super(flags, reqId);
+ returnValues = new Values(src);
+ }
+
+ public int bytes() {
+ return (headerLength +
+ returnValues.bytes());
+ }
+
+ public int packetCode() {
+ return PCODE_REPLY;
+ }
+
+ public void encode(ByteBuffer dst) {
+ returnValues.encode(dst);
+ }
+
+ public Values returnValues() {
+ return returnValues;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Request.java b/jrt/src/com/yahoo/jrt/Request.java
new file mode 100644
index 00000000000..99d7df8657e
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Request.java
@@ -0,0 +1,281 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * A Request bundles information about a single RPC invocation. A
+ * Request contains the name of the method, the method parameters, the
+ * method return values and also error information if something went
+ * wrong. Request objects are used by both RPC clients and RPC
+ * servers. An RPC client is the one requesting the invocation. An RPC
+ * server is the one performing the invocation. The RPC client uses a
+ * {@link Target} to invoke the request. The RPC server registers a
+ * {@link Method} with the {@link Supervisor}. Note that RPC
+ * client/server roles are independent of connection client/server
+ * roles, since invocations can be performed both ways across a {@link
+ * Target}.
+ **/
+public class Request
+{
+ private String methodName;
+ private Values parameters;
+ private Values returnValues = new Values();
+ private int errorCode = 0;
+ private String errorMessage = null;
+ private boolean detached = false;
+ private Object context = null;
+
+ private InvocationServer serverHandler;
+ private InvocationClient clientHandler;
+
+ /**
+ * Create a Request from a method name and a set of
+ * parameters. This method is used internally on the server side
+ * to create Requests for incoming invocations.
+ *
+ * @param methodName method name
+ * @param parameters method parameters
+ **/
+ Request(String methodName, Values parameters) {
+ this.methodName = methodName;
+ this.parameters = parameters;
+ }
+
+ /**
+ * Set the client invocation helper object for this request
+ *
+ * @param handler helper object
+ **/
+ void clientHandler(InvocationClient handler) {
+ clientHandler = handler;
+ }
+
+ /**
+ * Set the server invocation helper object for this request
+ *
+ * @param handler helper object
+ **/
+ void serverHandler(InvocationServer handler) {
+ serverHandler = handler;
+ }
+
+ /**
+ * Create a new Request with the given method name.
+ *
+ * @param methodName name of the method you want to invoke
+ **/
+ public Request(String methodName) {
+ this(methodName, new Values());
+ }
+
+ /**
+ * Set the application context associated with this request.
+ *
+ * @param context application context
+ **/
+ public void setContext(Object context) {
+ this.context = context;
+ }
+
+ /**
+ * Obtain the application context associated with this request.
+ *
+ * @return application context
+ **/
+ public Object getContext() {
+ return context;
+ }
+
+ /**
+ * Obtain the method name
+ *
+ * @return method name
+ **/
+ public String methodName() {
+ return methodName;
+ }
+
+ /**
+ * Obtain the parameters
+ *
+ * @return request parameters
+ **/
+ public Values parameters() {
+ return parameters;
+ }
+
+ /**
+ * Set the return values for this request. Used internally on the
+ * client side to incorporate the server response into the
+ * request.
+ *
+ * @param returnValues return values
+ **/
+ void returnValues(Values returnValues) {
+ this.returnValues = returnValues;
+ }
+
+ /**
+ * Obtain the return values
+ *
+ * @return request return values
+ **/
+ public Values returnValues() {
+ return returnValues;
+ }
+
+ /**
+ * Create a new empty set of parameters for this request. The old
+ * set of parameters will still be valid, but will no longer be
+ * part of this request. This method may be used to allow earlier
+ * garbage collection of large parameters that are no longer
+ * needed. While the obvious use of this method is to get rid of
+ * parameters when being an RPC server it can also be used after
+ * starting an RPC request on a client.
+ **/
+ public void discardParameters() {
+ parameters = new Values();
+ }
+
+ /**
+ * Obtain the Target representing our end of the connection over
+ * which this request was invoked. This method may only be invoked
+ * during method handling (RPC server aspect).
+ *
+ * @return Target representing our end of the connection over
+ * which this request was invoked
+ * @throws IllegalStateException if invoked inappropriately
+ **/
+ public Target target() {
+ if (serverHandler == null) {
+ throw new IllegalStateException("No server handler registered");
+ }
+ return serverHandler.getTarget();
+ }
+
+ /**
+ * Abort a request. This method may only be called by the RPC
+ * client after an asynchronous method invocation was requested.
+ *
+ * @throws IllegalStateException if invoked inappropriately
+ **/
+ public void abort() {
+ if (clientHandler == null) {
+ throw new IllegalStateException("No client handler registered");
+ }
+ clientHandler.handleAbort();
+ }
+
+ /**
+ * Detach a method invocation. This method may only be invoked
+ * during method handling (RPC server aspect). If this method is
+ * invoked, the method is not returned when the method handler
+ * returns. Instead, the application must invoke the {@link
+ * #returnRequest returnRequest} method when it wants the request
+ * to be returned.
+ *
+ * @throws IllegalStateException if invoked inappropriately
+ **/
+ public void detach() {
+ if (serverHandler == null) {
+ throw new IllegalStateException("No server handler registered");
+ }
+ detached = true;
+ }
+
+ /**
+ * Check whether this method was detached.
+ *
+ * @return true if this method was detached
+ **/
+ boolean isDetached() {
+ return detached;
+ }
+
+ /**
+ * Return this request. This method may only be invoked after the
+ * {@link #detach detach} method has been invoked, and only once
+ * per request. Note that if you detach a method without invoking
+ * this method, it will never be returned, causing a resource leak
+ * (NB: not good).
+ *
+ * @throws IllegalStateException if invoked inappropriately
+ **/
+ public void returnRequest() {
+ if (!detached) {
+ throw new IllegalStateException("Request not detached");
+ }
+ serverHandler.returnRequest();
+ }
+
+ /**
+ * Register the fact that an error has occurred.
+ *
+ * @param errorCode the error code (see {@link ErrorCode})
+ * @param errorMessage the error message
+ **/
+ public void setError(int errorCode, String errorMessage) {
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+ }
+
+ /**
+ * Check if this Request contains return types compatible with the
+ * given type string. If this Request contains an error it is
+ * considered incompatible with all possible type strings. If the
+ * return values are not compatible with the given type string and
+ * an error condition is not set, the {@link
+ * ErrorCode#WRONG_RETURN} error will be set. This method is
+ * intended to be used by the RPC client after a method has been
+ * invoked to verify the return value types. Please refer to the
+ * {@link Method} class description for an explanation of type
+ * strings.
+ *
+ * @return true if all is ok and the return types are compatible
+ * with 'returnTypes'
+ * @param returnTypes type string
+ **/
+ public boolean checkReturnTypes(String returnTypes) {
+ if (errorCode != ErrorCode.NONE) {
+ return false;
+ }
+ if (returnValues.satisfies(returnTypes)) {
+ return true;
+ }
+ setError(ErrorCode.WRONG_RETURN,
+ "checkReturnValues: Wrong return values");
+ return false;
+ }
+
+ /**
+ * Check if an error has occurred with this Request
+ *
+ * @return true if an error has occurred
+ **/
+ public boolean isError() {
+ return (errorCode != ErrorCode.NONE);
+ }
+
+ /**
+ * Obtain the error code associated with this Request
+ *
+ * @return error code
+ **/
+ public int errorCode() {
+ return errorCode;
+ }
+
+ /**
+ * Obtain the error message associated with this Request, if any
+ *
+ * @return error message
+ **/
+ public String errorMessage() {
+ return errorMessage;
+ }
+
+ public @Override String toString() {
+ return "request " + methodName + "(" + parameters + ")" + ( returnValues.size()>0 ? ": " + returnValues : "");
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/RequestPacket.java b/jrt/src/com/yahoo/jrt/RequestPacket.java
new file mode 100644
index 00000000000..bffcbd05f81
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/RequestPacket.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+
+
+class RequestPacket extends Packet
+{
+ private StringValue methodName;
+ private Values parameters;
+
+ public RequestPacket(int flags, int reqId,
+ String methodName,
+ Values parameters)
+ {
+ super(flags, reqId);
+ this.methodName = new StringValue(methodName);
+ this.parameters = parameters;
+ }
+
+ public RequestPacket(int flags, int reqId,
+ ByteBuffer src)
+ {
+ super(flags, reqId);
+ methodName = new StringValue(src);
+ parameters = new Values(src);
+ }
+
+ public int bytes() {
+ return (headerLength +
+ methodName.bytes() +
+ parameters.bytes());
+ }
+
+ public int packetCode() {
+ return PCODE_REQUEST;
+ }
+
+ public void encode(ByteBuffer dst) {
+ methodName.encode(dst);
+ parameters.encode(dst);
+ }
+
+ public String methodName() {
+ return methodName.asString();
+ }
+
+ public Values parameters() {
+ return parameters;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/RequestWaiter.java b/jrt/src/com/yahoo/jrt/RequestWaiter.java
new file mode 100644
index 00000000000..523f2b25223
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/RequestWaiter.java
@@ -0,0 +1,18 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Interface used to wait for the completion of a {@link
+ * Request}. This interface is used with the {@link Target#invokeAsync
+ * Target.invokeAsync} method.
+ **/
+public interface RequestWaiter {
+
+ /**
+ * Invoked when a request has completed.
+ *
+ * @param req the completed request
+ **/
+ public void handleRequestDone(Request req);
+}
diff --git a/jrt/src/com/yahoo/jrt/Scheduler.java b/jrt/src/com/yahoo/jrt/Scheduler.java
new file mode 100644
index 00000000000..403cb4cae99
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Scheduler.java
@@ -0,0 +1,136 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class Scheduler {
+ private static final int TICK = 100;
+ private static final int SLOTS = 512;
+ private static final int MASK = 511;
+ private static final int SHIFT = 9;
+
+ private Task[] slots = new Task[SLOTS + 1];
+ private int[] counts = new int[SLOTS + 1];
+ private Queue queue = new Queue(TICK);
+ private int currIter = 0;
+ private int currSlot = 0;
+ private long nextTick;
+
+ private static boolean isActive(Task task) {
+ return (task.next() != null);
+ }
+
+ private void linkIn(Task task) {
+ Task head = slots[task.slot()];
+ if (head == null) {
+ task.next(task);
+ task.prev(task);
+ slots[task.slot()] = task;
+ } else {
+ task.next(head);
+ task.prev(head.prev());
+ head.prev().next(task);
+ head.prev(task);
+ }
+ ++counts[task.slot()];
+ }
+
+ private void linkOut(Task task) {
+ Task head = slots[task.slot()];
+ if (task.next() == task) {
+ slots[task.slot()] = null;
+ } else {
+ task.prev().next(task.next());
+ task.next().prev(task.prev());
+ if (head == task) {
+ slots[task.slot()] = task.next();
+ }
+ }
+ task.next(null);
+ task.prev(null);
+ --counts[task.slot()];
+ }
+
+ public Scheduler(long now) {
+ nextTick = now + TICK;
+ }
+
+ public synchronized void schedule(Task task, double seconds) {
+ if (task.isKilled()) {
+ return;
+ }
+ if (seconds < 0.0) {
+ throw new IllegalArgumentException("cannot schedule a Task in the past");
+ }
+ int ticks = 1 + (int) (seconds * 10.0 + 0.5);
+ if (isActive(task)) {
+ linkOut(task);
+ }
+ task.slot((ticks + currSlot) & MASK);
+ task.iter(currIter + ((ticks + currSlot) >> SHIFT));
+ linkIn(task);
+ }
+
+ public synchronized void scheduleNow(Task task) {
+ if (task.isKilled()) {
+ return;
+ }
+ if (isActive(task)) {
+ linkOut(task);
+ }
+ task.slot(SLOTS);
+ task.iter(0);
+ linkIn(task);
+ }
+
+ public synchronized boolean unschedule(Task task) {
+ if (isActive(task)) {
+ linkOut(task);
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized boolean kill(Task task) {
+ task.setKilled();
+ if (isActive(task)) {
+ linkOut(task);
+ return true;
+ }
+ return false;
+ }
+
+ private void queueTasks(int slot, int iter) {
+ int cnt = counts[slot];
+ Task task = slots[slot];
+ for (int i = 0; i < cnt; i++) {
+ Task next = task.next();
+ if (task.iter() == iter) {
+ linkOut(task);
+ queue.enqueue(task);
+ }
+ task = next;
+ }
+ }
+
+ public void checkTasks(long now) {
+ if (slots[SLOTS] == null && now < nextTick) {
+ return;
+ }
+ synchronized (this) {
+ queueTasks(SLOTS, 0);
+ for (int i = 0; now >= nextTick; i++, nextTick += TICK) {
+ if (i < 3) {
+ if (++currSlot >= SLOTS) {
+ currSlot = 0;
+ currIter++;
+ }
+ queueTasks(currSlot, currIter);
+ }
+ }
+ }
+ while (!queue.isEmpty()) {
+ Task task = (Task) queue.dequeue();
+ task.perform();
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/SessionHandler.java b/jrt/src/com/yahoo/jrt/SessionHandler.java
new file mode 100644
index 00000000000..0f9411b7173
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/SessionHandler.java
@@ -0,0 +1,66 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Interface used to handle the lifetime of a {@link Target}. The word
+ * session is used to denote all the RPC activity across a single
+ * {@link Target} during its lifetime. This interface gives the
+ * application information about when different {@link Target} objects
+ * enter different stages in their lifetime. Combined with the ability
+ * to bind application specific data to a {@link Target} with the
+ * {@link Target#setContext} method, this enables method invocations
+ * in the same session to share state information. Usage of this
+ * interface is optional. It is typically useful for server
+ * applications needing state to be shared between RPC method
+ * invocation on a session. Each {@link Supervisor} can only have a
+ * single session handler. Use the {@link Supervisor#setSessionHandler
+ * Supervisor.setSessionHandler} method to set the session
+ * handler. The different callbacks may be called from several
+ * different threads, but for a single target there will be no
+ * overlapping of session callbacks, and the order will always be the
+ * same; init, live (not always called), down, fini.
+ **/
+public interface SessionHandler {
+
+ /**
+ * Invoked when a new {@link Target} is created. This is a nice
+ * place to initialize and attach application context to the
+ * {@link Target}.
+ *
+ * @param target the target
+ **/
+ public void handleSessionInit(Target target);
+
+ /**
+ * Invoked when a connection is established with the peer. Note
+ * that if a connection could not be established with the peer,
+ * this method is never invoked.
+ *
+ * @param target the target
+ **/
+ public void handleSessionLive(Target target);
+
+ /**
+ * Invoked when the target becomes invalid. This is typically
+ * caused by the network connection with the peer going down. Note
+ * that this method is invoked also when a connection with the
+ * peer could not be established at all.
+ *
+ * @param target the target
+ **/
+ public void handleSessionDown(Target target);
+
+ /**
+ * Invoked when the target is invalid and no more RPC invocations
+ * are active on our side of this target (invoked from the other
+ * side; we being the server). If you need to perform cleanup
+ * related to the application data associated with the target, you
+ * should wait until this method is invoked, to avoid cleaning up
+ * the {@link Target} application context under the feet of active
+ * invocations.
+ *
+ * @param target the target
+ **/
+ public void handleSessionFini(Target target);
+}
diff --git a/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java b/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java
new file mode 100644
index 00000000000..64f354aaf89
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class SingleRequestWaiter implements RequestWaiter {
+
+ private boolean done = false;
+
+ public synchronized void handleRequestDone(Request req) {
+ done = true;
+ notify();
+ }
+
+ public synchronized void waitDone() {
+ while (!done) {
+ try { wait(); } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Spec.java b/jrt/src/com/yahoo/jrt/Spec.java
new file mode 100644
index 00000000000..7ed0aa69920
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Spec.java
@@ -0,0 +1,133 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+
+
+/**
+ * A Spec is a network address used for either listening or
+ * connecting.
+ **/
+public class Spec
+{
+ private SocketAddress address;
+ private String host;
+ private int port;
+ private boolean malformed;
+
+ /**
+ * Create a Spec from a string. The form of the input string is
+ * 'tcp/host:port' or 'tcp/port' where 'host' is the host name and
+ * 'port' is the port number.
+ *
+ * @param spec input string to be parsed
+ * @see #malformed
+ **/
+ public Spec(String spec) {
+ if (spec.startsWith("tcp/")) {
+ int sep = spec.indexOf(':');
+ String portStr = null;
+ if (sep == -1) {
+ portStr = spec.substring(4);
+ } else {
+ host = spec.substring(4, sep);
+ portStr = spec.substring(sep + 1);
+ }
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ host = null;
+ port = 0;
+ malformed = true;
+ }
+ } else {
+ malformed = true;
+ }
+ }
+
+ /**
+ * Create a Spec from a host name and a port number.
+ *
+ * @param host host name
+ * @param port port number
+ **/
+ public Spec(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
+ * Create a Spec from a port number.
+ *
+ * @param port port number
+ **/
+ public Spec(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Obtain the host name of this address
+ *
+ * @return host name
+ **/
+ public String host() {
+ return host;
+ }
+
+ /**
+ * Obtain the port number if this address
+ *
+ * @return port number
+ **/
+ public int port() {
+ return port;
+ }
+
+ /**
+ * If this Spec was created from a string, this method will tell
+ * you whether that string was malformed.
+ *
+ * @return true if this address is malformed
+ **/
+ public boolean malformed() {
+ return malformed;
+ }
+
+ /**
+ * Resolve the socket address for this Spec. If this Spec is
+ * malformed, this method will return null.
+ *
+ * @return socket address
+ **/
+ SocketAddress address() {
+ if (malformed) {
+ return null;
+ }
+ if (address == null) {
+ if (host == null) {
+ address = new InetSocketAddress(port);
+ } else {
+ address = new InetSocketAddress(host, port);
+ }
+ }
+ return address;
+ }
+
+ /**
+ * Obtain a string representation of this address. The return
+ * value from this method may be used to create a new Spec.
+ *
+ * @return string representation of this address
+ **/
+ public String toString() {
+ if (malformed) {
+ return "MALFORMED";
+ }
+ if (host == null) {
+ return "tcp/" + port;
+ }
+ return "tcp/" + host + ":" + port;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/StringArray.java b/jrt/src/com/yahoo/jrt/StringArray.java
new file mode 100644
index 00000000000..8195a586f9d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/StringArray.java
@@ -0,0 +1,82 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * String array. The internal string representation is UTF-8 encoded
+ * bytes. This means that creating an object of this class as well as
+ * extracting the value contained with the {@link #asStringArray
+ * asStringArray} method will incur a string conversion overhead.
+ **/
+public class StringArray extends Value
+{
+ private byte[][] value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public StringArray(String[] value) {
+ this.value = new byte[value.length][];
+ for (int i = 0; i < value.length; i++) {
+ try {
+ this.value[i] = value[i].getBytes("UTF-8");
+ } catch(java.io.UnsupportedEncodingException e) {}
+ }
+ }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ StringArray(ByteBuffer src) {
+ int size = src.getInt();
+ value = new byte[size][];
+ for (int i = 0; i < size; i++) {
+ value[i] = new byte[src.getInt()];
+ src.get(value[i]);
+ }
+ }
+
+ /**
+ * @return STRING_ARRAY
+ **/
+ public byte type() { return STRING_ARRAY; }
+ public int count() { return value.length; }
+
+ int bytes() {
+ int bytes = 4;
+ for (int i = 0; i < value.length; i++) {
+ bytes += 4 + value[i].length;
+ }
+ return bytes;
+ }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.length);
+ for (int i = 0; i < value.length; i++) {
+ dst.putInt(value[i].length);
+ dst.put(value[i]);
+ }
+ }
+
+ public String[] asStringArray() {
+ String[] ret = new String[value.length];
+ for (int i = 0; i < value.length; i++) {
+ try {
+ ret[i] = new String(value[i], "UTF-8");
+ } catch(java.io.UnsupportedEncodingException e) {}
+ }
+ return ret;
+ }
+
+ public @Override String toString() {
+ return Arrays.toString(asStringArray());
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/StringValue.java b/jrt/src/com/yahoo/jrt/StringValue.java
new file mode 100644
index 00000000000..175884c435b
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/StringValue.java
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import com.yahoo.text.Utf8Array;
+import com.yahoo.text.Utf8String;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * String value. The internal string representation is UTF-8 encoded
+ * bytes. This means that creating an object of this class as well as
+ * extracting the value contained with the {@link #asString asString}
+ * method will incur a string conversion overhead.
+ **/
+public class StringValue extends Value
+{
+ private Utf8Array value;
+
+ /**
+ * Create from a Java-type value
+ *
+ * @param value the value
+ **/
+ public StringValue(String value) {
+ this.value = new Utf8String(value);
+ }
+ public StringValue(Utf8String value) {
+ this.value = value;
+ }
+ public StringValue(Utf8Array value) {
+ this.value = value;
+ }
+
+ /**
+ * Create by decoding the value from the given buffer
+ *
+ * @param src buffer where the value is stored
+ **/
+ StringValue(ByteBuffer src) {
+ int size = src.getInt();
+ value = new Utf8String(new Utf8Array(src, size));
+ }
+
+ /**
+ * @return STRING
+ **/
+ public byte type() { return STRING; }
+ public int count() { return 1; }
+
+ int bytes() { return 4 + value.getByteLength(); }
+ void encode(ByteBuffer dst) {
+ dst.putInt(value.getByteLength());
+ value.writeTo(dst);
+ }
+
+ public String asString() {
+ return value.toString();
+ }
+
+ public @Override Utf8Array asUtf8Array() { return value; }
+
+ public @Override String toString() {
+ return asString();
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
new file mode 100644
index 00000000000..89f677c04f0
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -0,0 +1,320 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.util.HashMap;
+
+
+/**
+ * A Supervisor keeps a method repository and handles dispatching of
+ * incoming invocation requests. Each end-point of a connection is
+ * represented by a {@link Target} object and each {@link Target} is
+ * associated with a single Supervisor that handles the invocation
+ * requests obtained from that {@link Target}. Note that RPC
+ * invocations can be performed both ways across a connection, so even
+ * the client side of a connection has RPC server capabilities.
+ **/
+public class Supervisor {
+
+ private class AddMethod implements Runnable {
+ private Method method;
+ AddMethod(Method method) {
+ this.method = method;
+ }
+ public void run() {
+ methodMap.put(method.name(), method);
+ }
+ }
+
+ private class RemoveMethod implements Runnable {
+ private String methodName;
+ private Method method = null;
+ RemoveMethod(String methodName) {
+ this.methodName = methodName;
+ }
+ RemoveMethod(Method method) {
+ this.methodName = method.name();
+ this.method = method;
+ }
+ public void run() {
+ Method m = methodMap.remove(methodName);
+ if (method != null && m != method) {
+ methodMap.put(method.name(), method);
+ }
+ }
+ }
+
+ private Transport transport;
+ private SessionHandler sessionHandler = null;
+ private HashMap<String, Method> methodMap = new HashMap<>();
+ private int maxInputBufferSize = 0;
+ private int maxOutputBufferSize = 0;
+
+ /**
+ * Create a new Supervisor based on the given {@link Transport}
+ *
+ * @param transport object performing low-level operations for
+ * this Supervisor
+ **/
+ public Supervisor(Transport transport) {
+ this.transport = transport;
+ new MandatoryMethods(this);
+ }
+
+ /**
+ * Set maximum input buffer size. This value will only affect
+ * connections that use a common input buffer when decoding
+ * incoming packets. Note that this value is not an absolute
+ * max. The buffer will still grow larger than this value if
+ * needed to decode big packets. However, when the buffer becomes
+ * larger than this value, it will be shrunk back when possible.
+ *
+ * @param bytes buffer size in bytes. 0 means unlimited.
+ **/
+ public void setMaxInputBufferSize(int bytes) {
+ maxInputBufferSize = bytes;
+ }
+
+ /**
+ * Set maximum output buffer size. This value will only affect
+ * connections that use a common output buffer when encoding
+ * outgoing packets. Note that this value is not an absolute
+ * max. The buffer will still grow larger than this value if needed
+ * to encode big packets. However, when the buffer becomes larger
+ * than this value, it will be shrunk back when possible.
+ *
+ * @param bytes buffer size in bytes. 0 means unlimited.
+ **/
+ public void setMaxOutputBufferSize(int bytes) {
+ maxOutputBufferSize = bytes;
+ }
+
+ /**
+ * Obtain the method map for this Supervisor
+ *
+ * @return the method map
+ **/
+ HashMap<String, Method> methodMap() {
+ return methodMap;
+ }
+
+ /**
+ * Obtain the underlying Transport object.
+ *
+ * @return underlying Transport object
+ **/
+ public Transport transport() {
+ return transport;
+ }
+
+ /**
+ * Set the session handler for this Supervisor
+ *
+ * @param handler the session handler
+ **/
+ public void setSessionHandler(SessionHandler handler) {
+ sessionHandler = handler;
+ }
+
+ /**
+ * Add a method to the set of methods held by this Supervisor
+ *
+ * @param method the method to add
+ **/
+ public void addMethod(Method method) {
+ transport.perform(new AddMethod(method));
+ }
+
+ /**
+ * Remove a method from the set of methods held by this Supervisor
+ *
+ * @param methodName name of the method to remove
+ **/
+ public void removeMethod(String methodName) {
+ transport.perform(new RemoveMethod(methodName));
+ }
+
+ /**
+ * Remove a method from the set of methods held by this
+ * Supervisor. Use this if you know exactly which method to remove
+ * and not only the name.
+ *
+ * @param method the method to remove
+ **/
+ public void removeMethod(Method method) {
+ transport.perform(new RemoveMethod(method));
+ }
+
+ /**
+ * Connect to the given address. The new {@link Target} will be
+ * associated with this Supervisor.
+ *
+ * @return Target representing our end of the connection
+ * @param spec where to connect
+ * @see #connect(com.yahoo.jrt.Spec, java.lang.Object)
+ **/
+ public Target connect(Spec spec) {
+ return transport.connect(this, spec, null, false);
+ }
+
+ /**
+ * Connect to the given address. The new {@link Target} will be
+ * associated with this Supervisor. This method will perform a
+ * synchronous connect in the calling thread.
+ *
+ * @return Target representing our end of the connection
+ * @param spec where to connect
+ * @see #connectSync(com.yahoo.jrt.Spec, java.lang.Object)
+ **/
+ public Target connectSync(Spec spec) {
+ return transport.connect(this, spec, null, true);
+ }
+
+ /**
+ * Connect to the given address. The new {@link Target} will be
+ * associated with this Supervisor and will have 'context' as
+ * application context.
+ *
+ * @return Target representing our end of the connection
+ * @param spec where to connect
+ * @param context application context for the Target
+ * @see Target#getContext
+ **/
+ public Target connect(Spec spec, Object context) {
+ return transport.connect(this, spec, context, false);
+ }
+
+ /**
+ * Connect to the given address. The new {@link Target} will be
+ * associated with this Supervisor and will have 'context' as
+ * application context. This method will perform a synchronous
+ * connect in the calling thread.
+ *
+ * @return Target representing our end of the connection
+ * @param spec where to connect
+ * @param context application context for the Target
+ * @see Target#getContext
+ **/
+ public Target connectSync(Spec spec, Object context) {
+ return transport.connect(this, spec, context, true);
+ }
+
+ /**
+ * Listen to the given address.
+ *
+ * @return active object accepting new connections that will be
+ * associated with this Supervisor
+ * @param spec the address to listen to
+ **/
+ public Acceptor listen(Spec spec) throws ListenFailedException {
+ return transport.listen(this, spec);
+ }
+
+ /**
+ * Convenience method for connecting to a peer, invoking a method
+ * and disconnecting.
+ *
+ * @param spec the address to connect to
+ * @param req the invocation request
+ * @param timeout request timeout in seconds
+ **/
+ public void invokeBatch(Spec spec, Request req, double timeout) {
+ Target target = connectSync(spec);
+ try {
+ target.invokeSync(req, timeout);
+ } finally {
+ target.close();
+ }
+ }
+
+ /**
+ * This method is invoked when a new target is created
+ *
+ * @param target the target
+ **/
+ void sessionInit(Target target) {
+ if (target instanceof Connection) {
+ Connection conn = (Connection) target;
+ conn.setMaxInputSize(maxInputBufferSize);
+ conn.setMaxOutputSize(maxOutputBufferSize);
+ }
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionInit(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target establishes a connection
+ * with its peer
+ *
+ * @param target the target
+ **/
+ void sessionLive(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionLive(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target becomes invalid
+ *
+ * @param target the target
+ **/
+ void sessionDown(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionDown(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target is invalid and no more
+ * invocations are active
+ *
+ * @param target the target
+ **/
+ void sessionFini(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionFini(target);
+ }
+ }
+
+ /**
+ * This method is invoked each time we write a packet. This method
+ * is empty and only used for testing through sub-classing.
+ *
+ * @param info information about the written packet
+ **/
+ void writePacket(PacketInfo info) {}
+
+ /**
+ * This method is invoked each time we read a packet. This method
+ * is empty and only used for testing through sub-classing.
+ *
+ * @param info information about the read packet
+ **/
+ void readPacket(PacketInfo info) {}
+
+ /**
+ * Handle a packet received on one of the connections associated
+ * with this Supervisor. This method is invoked for all packets
+ * not handled by a {@link ReplyHandler}
+ *
+ * @param conn where the packet came from
+ * @param packet the packet
+ **/
+ void handlePacket(Connection conn, Packet packet) {
+ if (packet.packetCode() != Packet.PCODE_REQUEST) {
+ return;
+ }
+ RequestPacket rp = (RequestPacket) packet;
+ Request req = new Request(rp.methodName(), rp.parameters());
+ Method method = methodMap.get(req.methodName());
+ new InvocationServer(conn, req, method,
+ packet.requestId(),
+ packet.noReply()).invoke();
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Target.java b/jrt/src/com/yahoo/jrt/Target.java
new file mode 100644
index 00000000000..eb7e2d0e38d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Target.java
@@ -0,0 +1,147 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * A Target represents a connection endpoint with RPC
+ * capabilities. Each such connection has a client and a server
+ * side. The client side is the one initiating the connection. RPC
+ * requests may be invoked across the connection from both the client
+ * and the server side.
+ **/
+public abstract class Target {
+
+ private Object context;
+
+ /**
+ * Create a Target with the given application context.
+ *
+ * @param context application context
+ **/
+ Target(Object context) {
+ this.context = context;
+ }
+
+ /**
+ * Create a Target without any application context.
+ **/
+ Target() {
+ this(null);
+ }
+
+ /**
+ * Set the application context associated with this target.
+ *
+ * @param context application context
+ **/
+ public void setContext(Object context) {
+ this.context = context;
+ }
+
+ /**
+ * Obtain the application context associated with this target.
+ *
+ * @return application context
+ **/
+ public Object getContext() {
+ return context;
+ }
+
+ /**
+ * Check if this target is still valid for invocations.
+ *
+ * @return true if this target is still valid
+ **/
+ public abstract boolean isValid();
+
+ /**
+ * Obtain the low-level reason behind losing the connection for
+ * which this target is an endpoint. If the target is still valid
+ * or if the target became invalid because it was closed, this
+ * method will return null. In other cases this method may or may
+ * not return an exception indicating why the connection was
+ * lost. Also, if an exception is returned, its nature may vary
+ * based on implementation details across platforms.
+ *
+ * @return exception causing connection loss or null
+ **/
+ public Exception getConnectionLostReason() { return null; }
+
+ /**
+ * Check if this target represents the client side of a
+ * connection.
+ *
+ * @return true if this is a client-side target
+ **/
+ public abstract boolean isClient();
+
+ /**
+ * Check if this target represents the server side of a
+ * connection.
+ *
+ * @return true if this is a server-side target
+ **/
+ public abstract boolean isServer();
+
+ /**
+ * Invoke a request on this target and wait for it to return.
+ *
+ * @param req the request
+ * @param timeout timeout in seconds
+ **/
+ public abstract void invokeSync(Request req, double timeout);
+
+ /**
+ * Invoke a request on this target and let the completion be
+ * signalled with a callback.
+ *
+ * @param req the request
+ * @param timeout timeout in seconds
+ * @param waiter callback handler
+ **/
+ public abstract void invokeAsync(Request req, double timeout,
+ RequestWaiter waiter);
+
+ /**
+ * Invoke a request on this target, but ignore the return
+ * value(s). The success or failure of the invocation is also
+ * ignored. However, the return value gives a little hint by
+ * indicating whether the invocation has been attempted at all.
+ *
+ * @return false if the invocation was not attempted due to the
+ * target being invalid
+ * @param req the request
+ **/
+ public abstract boolean invokeVoid(Request req);
+
+ /**
+ * Add a watcher to this target. A watcher is notified if the
+ * target becomes invalid. If the target is already invalid when
+ * this method is invoked, no operation is performed and false is
+ * returned. Multiple adds of the same watcher has no additional
+ * effect.
+ *
+ * @return true if the add operation was performed
+ * @param watcher the watcher to be added
+ **/
+ public abstract boolean addWatcher(TargetWatcher watcher);
+
+ /**
+ * Remove a watcher from this target. If the target is already
+ * invalid when this method is invoked, no operation is performed
+ * and false is returned. Multiple removes of the same watcher has
+ * no additional effect.
+ *
+ * @return true if the remove operation was performed
+ * @param watcher the watcher to be removed
+ * @see #addWatcher
+ **/
+ public abstract boolean removeWatcher(TargetWatcher watcher);
+
+ /**
+ * Close this target. Note that the close operation is
+ * asynchronous. If you need to wait for the target to become
+ * invalid, use the {@link Transport#sync Transport.sync} method.
+ **/
+ public abstract void close();
+}
diff --git a/jrt/src/com/yahoo/jrt/TargetWatcher.java b/jrt/src/com/yahoo/jrt/TargetWatcher.java
new file mode 100644
index 00000000000..9b75bcdede4
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/TargetWatcher.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Interface used to notify when a {@link Target} becomes
+ * invalid. Listening is controlled with the {@link Target#addWatcher
+ * Target.addWatcher} and {@link Target#removeWatcher
+ * Target.removeWatcher} methods.
+ **/
+public interface TargetWatcher {
+
+ /**
+ * Invoked when a target becomes invalid.
+ *
+ * @param target the target that has become invalid.
+ **/
+ public void notifyTargetInvalid(Target target);
+}
diff --git a/jrt/src/com/yahoo/jrt/Task.java b/jrt/src/com/yahoo/jrt/Task.java
new file mode 100644
index 00000000000..36b96d6fda7
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Task.java
@@ -0,0 +1,99 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * A Task enables a Runnable to be scheduled for execution in the
+ * transport thread some time in the future. Tasks are used internally
+ * to handle RPC timeouts. Use the {@link Transport#createTask
+ * Transport.createTask} method to create a task associated with a
+ * {@link Transport} object. Note that Task execution is designed to
+ * be low-cost, so do not expect extreme accuracy. Also note that any
+ * tasks that are pending execution when the owning {@link Transport}
+ * object is shut down will never be run.
+ **/
+public class Task {
+ private Scheduler owner;
+ private Runnable doit;
+ private int slot;
+ private int iter;
+ private Task next;
+ private Task prev;
+ private boolean killed;
+
+ // methods used by the scheduler
+ int slot() { return slot; }
+ void slot(int val) { slot = val; }
+ int iter() { return iter; }
+ void iter(int val) { iter = val; }
+ Task next() { return next; }
+ void next(Task val) { next = val; }
+ Task prev() { return prev; }
+ void prev(Task val) { prev = val; }
+ boolean isKilled() { return killed; }
+ void setKilled() { killed = true; }
+ void perform() { doit.run(); }
+
+ /**
+ * Create a Task owned by the given scheduler
+ *
+ * @param owner the scheduler owning this task
+ * @param doit what to run when the task is executed
+ **/
+ Task(Scheduler owner, Runnable doit) {
+ this.owner = owner;
+ this.doit = doit;
+ }
+
+ /**
+ * Schedule this task for execution. A task may be scheduled
+ * multiple times, but may only have a single pending execution
+ * time. Re-scheduling a task that is not yet run will move the
+ * execution time. If the task has already been executed,
+ * scheduling works just like if the task was never run.
+ *
+ * @param seconds the number of seconds until the task should be
+ * executed
+ * @see #kill
+ **/
+ public void schedule(double seconds) {
+ owner.schedule(this, seconds);
+ }
+
+ /**
+ * Schedule this task for execution as soon as possible. This will
+ * result in the task being executed the next time the reactor
+ * loop inside the owning {@link Transport} object checks for
+ * tasks to run. If you have something that is even more urgent,
+ * or something you need to be executed even if the {@link
+ * Transport} is shut down, use the {@link Transport#perform}
+ * method instead.
+ * @see #kill
+ **/
+ public void scheduleNow() {
+ owner.scheduleNow(this);
+ }
+
+ /**
+ * Cancel the execution of this task.
+ *
+ * @return true if the task was scheduled and we managed to avoid
+ * execution
+ **/
+ public boolean unschedule() {
+ return owner.unschedule(this);
+ }
+
+ /**
+ * Cancel the execution of this task and make sure it can never be
+ * scheduled for execution again. After this method is invoked,
+ * invoking the {@link #schedule schedule} and {@link #scheduleNow
+ * scheduleNow} methods will have no effect.
+ *
+ * @return true if the task was scheduled and we managed to avoid
+ * execution
+ **/
+ public boolean kill() {
+ return owner.kill(this);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java
new file mode 100644
index 00000000000..e5a2a28b5d9
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * A thread-safe queue wrapper used to pass objects between threads.
+ **/
+class ThreadQueue
+{
+ private Queue queue = new Queue();
+ private boolean closed = false;
+
+ public ThreadQueue() {}
+
+ /**
+ * Enqueue an object on this queue. If the queue has been closed,
+ * the object will not be queued, and this method will return
+ * false.
+ *
+ * @return true if the object was enqueued, false if this queue
+ * was closed
+ * @param obj the object to enqueue
+ **/
+ public synchronized boolean enqueue(Object obj) {
+ if (closed) {
+ return false;
+ }
+ queue.enqueue(obj);
+ if (queue.size() == 1) {
+ notify(); // assume only one reader
+ }
+ return true;
+ }
+
+ /**
+ * Close this queue. After this method is invoked, no more objects
+ * may be enqueued on this queue. Also, trying to dequeue an
+ * object form a queue that is both empty and closed will cause a
+ * {@link EndOfQueueException}.
+ **/
+ public synchronized void close() {
+ closed = true;
+ notify();
+ }
+
+ /**
+ * Dequeue the next object from this queue. This method will block
+ * until at least one object is available in the queue.
+ *
+ * @return the next object from the queue
+ * @throws EndOfQueueException if the queue is both empty and
+ * closed
+ **/
+ public synchronized Object dequeue() throws EndOfQueueException {
+ while (queue.isEmpty() && !closed) {
+ try { wait(); } catch (InterruptedException x) {}
+ }
+ if (queue.isEmpty()) {
+ throw new EndOfQueueException();
+ }
+ return queue.dequeue();
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/TieBreaker.java b/jrt/src/com/yahoo/jrt/TieBreaker.java
new file mode 100644
index 00000000000..b327328fae6
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/TieBreaker.java
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * When multiple actors race to come to a certain point and only one
+ * should be allowed to continue, an object of this class may be used
+ * to settle who should be allowed to continue. Note that external
+ * synchronization is needed if this class is used with multiple
+ * threads.
+ **/
+class TieBreaker {
+ private boolean first = true;
+
+ /**
+ * Are we the first to come here?
+ *
+ * @return true if you are first, false otherwise.
+ **/
+ public boolean first() {
+ boolean ret = first;
+ first = false;
+ return ret;
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
new file mode 100644
index 00000000000..85bfed79732
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -0,0 +1,401 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * The Transport class is the core needed to make your {@link
+ * Supervisor} tick. It implements the reactor pattern to perform
+ * multiplexed network IO, handles scheduled tasks and keeps track of
+ * some additional helper threads. A single Transport object can back
+ * multiple {@link Supervisor} objects.
+ **/
+public class Transport {
+
+ private static final int OPEN = 1;
+ private static final int CLOSING = 2;
+ private static final int CLOSED = 3;
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Transport.this.run();
+ } catch (Throwable problem) {
+ handleFailure(problem, Transport.this);
+ }
+ }
+ }
+
+ private class AddConnectionCmd implements Runnable {
+ private Connection conn;
+ AddConnectionCmd(Connection conn) { this.conn = conn; }
+ public void run() { handleAddConnection(conn); }
+ }
+
+ private class CloseConnectionCmd implements Runnable {
+ private Connection conn;
+ CloseConnectionCmd(Connection conn) { this.conn = conn; }
+ public void run() { handleCloseConnection(conn); }
+ }
+
+ private class EnableWriteCmd implements Runnable {
+ private Connection conn;
+ EnableWriteCmd(Connection conn) { this.conn = conn; }
+ public void run() { handleEnableWrite(conn); }
+ }
+
+ private class SyncCmd implements Runnable {
+ boolean done = false;
+ public synchronized void waitDone() {
+ while (!done) {
+ try { wait(); } catch (InterruptedException e) {}
+ }
+ }
+ public synchronized void run() {
+ done = true;
+ notify();
+ }
+ }
+
+ private static Logger log = Logger.getLogger(Transport.class.getName());
+
+ private FatalErrorHandler fatalHandler; // NB: this must be set first
+ private Thread thread;
+ private Queue queue;
+ private Queue myQueue;
+ private Connector connector;
+ private Closer closer;
+ private Scheduler scheduler;
+ private int state;
+ private Selector selector;
+
+ private void handleAddConnection(Connection conn) {
+ if (conn.isClosed()) {
+ if (conn.hasSocket()) {
+ closer.closeLater(conn);
+ }
+ return;
+ }
+ if (!conn.init(selector)) {
+ handleCloseConnection(conn);
+ }
+ }
+
+ private void handleCloseConnection(Connection conn) {
+ if (conn.isClosed()) {
+ return;
+ }
+ conn.fini();
+ if (conn.hasSocket()) {
+ closer.closeLater(conn);
+ }
+ }
+
+ private void handleEnableWrite(Connection conn) {
+ if (conn.isClosed()) {
+ return;
+ }
+ conn.enableWrite();
+ }
+
+ private boolean postCommand(Runnable cmd) {
+ boolean wakeup;
+ synchronized (this) {
+ if (state == CLOSED) {
+ return false;
+ }
+ wakeup = queue.isEmpty();
+ queue.enqueue(cmd);
+ }
+ if (wakeup) {
+ selector.wakeup();
+ }
+ return true;
+ }
+
+ private void handleEvents() {
+ synchronized (this) {
+ queue.flush(myQueue);
+ }
+ while (!myQueue.isEmpty()) {
+ ((Runnable)myQueue.dequeue()).run();
+ }
+ }
+
+ private boolean handleIOEvents(Connection conn,
+ SelectionKey key) {
+ if (conn.isClosed()) {
+ return true;
+ }
+ if (key.isReadable()) {
+ try {
+ conn.read();
+ } catch (IOException e) {
+ conn.setLostReason(e);
+ return false;
+ }
+ }
+ if (key.isWritable()) {
+ try {
+ conn.write();
+ } catch (IOException e) {
+ conn.setLostReason(e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Create a new Transport object with the given fatal error handler.
+ *
+ * @param fatalHandler fatal error handler
+ **/
+ public Transport(FatalErrorHandler fatalHandler) {
+ synchronized (this) {
+ this.fatalHandler = fatalHandler; // NB: this must be set first
+ }
+ thread = new Thread(new Run(), "<transport>");
+ queue = new Queue();
+ myQueue = new Queue();
+ connector = new Connector(this);
+ closer = new Closer(this);
+ scheduler = new Scheduler(System.currentTimeMillis());
+ state = OPEN;
+ try {
+ selector = Selector.open();
+ } catch (Exception e) {
+ throw new Error("Could not open transport selector", e);
+ }
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Create a Transport object with no fatal error handler. If a
+ * fatal error occurs when no fatal error handler is registered,
+ * the default action is to log the error and exit with exit code
+ * 1.
+ **/
+ public Transport() {
+ this(null);
+ }
+
+ /**
+ * Proxy method used to dispatch fatal errors to the fatal error
+ * handler. If no handler is registered, the default action is to
+ * log the error and halt the Java VM.
+ *
+ * @param problem the throwable causing the failure
+ * @param context the object owning the crashing thread
+ **/
+ void handleFailure(Throwable problem, Object context) {
+ if (fatalHandler != null) {
+ fatalHandler.handleFailure(problem, context);
+ return;
+ }
+ try {
+ log.log(Level.SEVERE, "fatal error in " + context, problem);
+ } catch (Throwable ignore) {}
+ Runtime.getRuntime().halt(1);
+ }
+
+ /**
+ * Listen to the given address. This method is called by a {@link
+ * Supervisor} object.
+ *
+ * @return active object accepting new connections
+ * @param owner the one calling this method
+ * @param spec the address to listen to
+ **/
+ Acceptor listen(Supervisor owner, Spec spec) throws ListenFailedException {
+ return new Acceptor(this, owner, spec);
+ }
+
+ /**
+ * Connect to the given address. This method is called by a {@link
+ * Supervisor} object.
+ *
+ * @return the new connection
+ * @param owner the one calling this method
+ * @param spec the address to connect to
+ * @param context application context for the new connection
+ * @param sync perform a synchronous connect in the calling thread
+ * if this flag is set
+ **/
+ Connection connect(Supervisor owner, Spec spec,
+ Object context, boolean sync) {
+ Connection conn = new Connection(this, owner, spec, context);
+ if (sync) {
+ addConnection(conn.connect());
+ } else {
+ connector.connectLater(conn);
+ }
+ return conn;
+ }
+
+ /**
+ * Add a connection to the set of connections handled by this
+ * Transport. Invoked by the {@link Connector} class.
+ *
+ * @param conn the connection to add
+ **/
+ void addConnection(Connection conn) {
+ if (!postCommand(new AddConnectionCmd(conn))) {
+ perform(new CloseConnectionCmd(conn));
+ }
+ }
+
+ /**
+ * Request an asynchronous close of a connection.
+ *
+ * @param conn the connection to close
+ **/
+ void closeConnection(Connection conn) {
+ postCommand(new CloseConnectionCmd(conn));
+ }
+
+ /**
+ * Request an asynchronous enabling of write events for a
+ * connection.
+ *
+ * @param conn the connection to enable write events for
+ **/
+ void enableWrite(Connection conn) {
+ if (Thread.currentThread() == thread) {
+ handleEnableWrite(conn);
+ } else {
+ postCommand(new EnableWriteCmd(conn));
+ }
+ }
+
+ /**
+ * Create a {@link Task} that can be scheduled for execution in
+ * the transport thread.
+ *
+ * @return the newly created Task
+ * @param cmd what to run when the task is executed
+ **/
+ public Task createTask(Runnable cmd) {
+ return new Task(scheduler, cmd);
+ }
+
+ /**
+ * Perform the given command in such a way that it does not run
+ * concurrently with the transport thread or other commands
+ * performed by invoking this method. This method will continue to
+ * work even after the transport thread has been shut down.
+ *
+ * @param cmd the command to perform
+ **/
+ public void perform(Runnable cmd) {
+ if (Thread.currentThread() == thread) {
+ cmd.run();
+ return;
+ }
+ if (!postCommand(cmd)) {
+ join();
+ synchronized (thread) {
+ cmd.run();
+ }
+ }
+ }
+
+ /**
+ * Synchronize with the transport thread. This method will block
+ * until all commands issued before this method was invoked has
+ * completed. If the transport thread has been shut down (or is in
+ * the progress of being shut down) this method will instead wait
+ * for the transport thread to complete, since no more commands
+ * will be performed, and waiting would be forever. Invoking this
+ * method from the transport thread is not a good idea.
+ *
+ * @return this object, to enable chaining
+ **/
+ public Transport sync() {
+ SyncCmd cmd = new SyncCmd();
+ if (postCommand(cmd)) {
+ cmd.waitDone();
+ } else {
+ join();
+ }
+ return this;
+ }
+
+ private void run() {
+ while (state == OPEN) {
+
+ // perform I/O selection
+ try {
+ selector.select(100);
+ } catch (IOException e) {
+ log.log(Level.WARNING, "error during select", e);
+ }
+
+ // handle internal events
+ handleEvents();
+
+ // handle I/O events
+ Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+ while (keys.hasNext()) {
+ SelectionKey key = keys.next();
+ Connection conn = (Connection) key.attachment();
+ keys.remove();
+ if (!handleIOEvents(conn, key)) {
+ handleCloseConnection(conn);
+ }
+ }
+
+ // check scheduled tasks
+ scheduler.checkTasks(System.currentTimeMillis());
+ }
+ connector.shutdown().waitDone();
+ synchronized (this) {
+ state = CLOSED;
+ }
+ handleEvents();
+ Iterator<SelectionKey> keys = selector.keys().iterator();
+ while (keys.hasNext()) {
+ SelectionKey key = keys.next();
+ Connection conn = (Connection) key.attachment();
+ handleCloseConnection(conn);
+ }
+ try { selector.close(); } catch (Exception e) {}
+ closer.shutdown().join();
+ connector.exit().join();
+ }
+
+ /**
+ * Initiate controlled shutdown of the transport thread.
+ *
+ * @return this object, to enable chaining with join
+ **/
+ public Transport shutdown() {
+ synchronized (this) {
+ if (state == OPEN) {
+ state = CLOSING;
+ selector.wakeup();
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Wait for the transport thread to finish.
+ **/
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Value.java b/jrt/src/com/yahoo/jrt/Value.java
new file mode 100644
index 00000000000..7effb7f6e2a
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Value.java
@@ -0,0 +1,290 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import com.yahoo.text.Utf8Array;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * <p>A single value that may be either a parameter or a return value
+ * associated with a {@link Request}. Multiple values are bundled
+ * together with the {@link Values} class. The value type identifiers
+ * are defined by the RPC protocol. Each identifier matches the value
+ * of an ASCII character (listed after the Java class for the type).</p>
+ *
+ * <p>Most Value subclasses that are constructed from a Java array
+ * will not copy the array. This enables the same data to back
+ * multiple Value objects, but it also means that the application
+ * should be careful not to change the backing data under the feet of
+ * a Value object.</p>
+ **/
+public abstract class Value
+{
+ /** type identifier for {@link Int8Value} (b) **/
+ public static final byte INT8 = 'b';
+
+ /** type identifier for {@link Int8Array} (B) **/
+ public static final byte INT8_ARRAY = 'B';
+
+ /** type identifier for {@link Int16Value} (h) **/
+ public static final byte INT16 = 'h';
+
+ /** type identifier for {@link Int16Array} (H) **/
+ public static final byte INT16_ARRAY = 'H';
+
+ /** type identifier for {@link Int32Value} (i) **/
+ public static final byte INT32 = 'i';
+
+ /** type identifier for {@link Int32Array} (I) **/
+ public static final byte INT32_ARRAY = 'I';
+
+ /** type identifier for {@link Int64Value} (l) **/
+ public static final byte INT64 = 'l';
+
+ /** type identifier for {@link Int64Array} (L) **/
+ public static final byte INT64_ARRAY = 'L';
+
+ /** type identifier for {@link FloatValue} (f) **/
+ public static final byte FLOAT = 'f';
+
+ /** type identifier for {@link FloatArray} (F) **/
+ public static final byte FLOAT_ARRAY = 'F';
+
+ /** type identifier for {@link DoubleValue} (d) **/
+ public static final byte DOUBLE = 'd';
+
+ /** type identifier for {@link DoubleArray} (D) **/
+ public static final byte DOUBLE_ARRAY = 'D';
+
+ /** type identifier for {@link StringValue} (s) **/
+ public static final byte STRING = 's';
+
+ /** type identifier for {@link StringArray} (S) **/
+ public static final byte STRING_ARRAY = 'S';
+
+ /** type identifier for {@link DataValue} (x) **/
+ public static final byte DATA = 'x';
+
+ /** type identifier for {@link DataArray} (X) **/
+ public static final byte DATA_ARRAY = 'X';
+
+ /**
+ * Obtain the type identifier for this value
+ *
+ * @return type identifier
+ **/
+ public abstract byte type();
+
+ /**
+ * Obtain the number of entries stored in this value. This is 1
+ * for basic data types and the size of the array for array types.
+ *
+ * @return the number of entries stored in this value
+ **/
+ public abstract int count();
+
+ /**
+ * Determine the number of bytes needed to store this value when
+ * encoded into a buffer
+ *
+ * @return number of bytes needed for encoding this value
+ **/
+ abstract int bytes();
+
+ /**
+ * Encode this value into the given buffer
+ *
+ * @param dst where to encode this value
+ **/
+ abstract void encode(ByteBuffer dst);
+
+ /**
+ * Decode a value from the given buffer. This method also acts as
+ * a factory for value objects
+ *
+ * @return the decoded value
+ * @param type value type identifier
+ * @param src where the value is stored
+ * @throws IllegalArgumentException if the given type identifier is illegal
+ **/
+ static Value decode(byte type, ByteBuffer src) {
+ switch (type) {
+ case INT8: return new Int8Value(src);
+ case INT8_ARRAY: return new Int8Array(src);
+ case INT16: return new Int16Value(src);
+ case INT16_ARRAY: return new Int16Array(src);
+ case INT32: return new Int32Value(src);
+ case INT32_ARRAY: return new Int32Array(src);
+ case INT64: return new Int64Value(src);
+ case INT64_ARRAY: return new Int64Array(src);
+ case FLOAT: return new FloatValue(src);
+ case FLOAT_ARRAY: return new FloatArray(src);
+ case DOUBLE: return new DoubleValue(src);
+ case DOUBLE_ARRAY: return new DoubleArray(src);
+ case STRING: return new StringValue(src);
+ case STRING_ARRAY: return new StringArray(src);
+ case DATA: return new DataValue(src);
+ case DATA_ARRAY: return new DataArray(src);
+ }
+ throw new IllegalArgumentException();
+ }
+
+ /**
+ * Interpret this value as a {@link Int8Value} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int8Value}
+ **/
+ public byte asInt8() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int8Array} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int8Array}
+ **/
+ public byte[] asInt8Array() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int16Value} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int16Value}
+ **/
+ public short asInt16() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int16Array} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int16Array}
+ **/
+ public short[] asInt16Array() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int32Value} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int32Value}
+ **/
+ public int asInt32() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int32Array} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int32Array}
+ **/
+ public int[] asInt32Array() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int64Value} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int64Value}
+ **/
+ public long asInt64() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link Int64Array} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Int64Array}
+ **/
+ public long[] asInt64Array() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link FloatValue} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link FloatValue}
+ **/
+ public float asFloat() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link FloatArray} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link FloatArray}
+ **/
+ public float[] asFloatArray() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link DoubleValue} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link DoubleValue}
+ **/
+ public double asDouble() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link DoubleArray} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link DoubleArray}
+ **/
+ public double[] asDoubleArray() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link StringValue} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link StringValue}
+ **/
+ public String asString() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link StringValue} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link Utf8Array}
+ **/
+ public Utf8Array asUtf8Array() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link StringArray} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link StringArray}
+ **/
+ public String[] asStringArray() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link DataValue} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link DataValue}
+ **/
+ public byte[] asData() { throw new ClassCastException(); }
+
+ /**
+ * Interpret this value as a {@link DataArray} and return the
+ * contents as an appropriate Java type
+ *
+ * @return the value contained in this object as a Java type
+ * @throws ClassCastException if this is not a {@link DataArray}
+ **/
+ public byte[][] asDataArray() { throw new ClassCastException(); }
+
+ /** Force a proper toString */
+ public abstract @Override String toString();
+
+}
diff --git a/jrt/src/com/yahoo/jrt/Values.java b/jrt/src/com/yahoo/jrt/Values.java
new file mode 100644
index 00000000000..05a444e4c6d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Values.java
@@ -0,0 +1,140 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.nio.ByteBuffer;
+
+
+/**
+ * A sequence of values used to represent parameters and return values
+ * associated with a {@link Request}. The individual values are
+ * represented by {@link Value} objects.
+ **/
+public class Values
+{
+ private List<Value> values = new ArrayList<Value>(16);
+
+ /**
+ * Check whether the values stored in this object satisfies the
+ * given type string.
+ *
+ * @return true if this value sequence satisfies 'types'
+ * @param types type string
+ **/
+ public boolean satisfies(String types) {
+ int off = 0;
+ int len = Math.min(types.length(), size());
+ while (off < len && types.charAt(off) == get(off).type()) {
+ off++;
+ }
+ return ((off == types.length() && off == size()) ||
+ (off + 1 == types.length() && types.charAt(off) == '*'));
+ }
+
+ /**
+ * Create an empty sequence of values
+ **/
+ public Values() {
+ }
+
+ /**
+ * Create a sequence of values by decoding them from the given
+ * buffer
+ *
+ * @param src buffer containing a contained value sequence
+ **/
+ Values(ByteBuffer src) {
+ decode(src);
+ }
+
+ /**
+ * Add a value to the end of the sequence
+ *
+ * @return this, to enable chaining
+ * @param value the value to add
+ **/
+ public Values add(Value value) {
+ values.add(value);
+ return this;
+ }
+
+ /**
+ * Obtain a specific value from this sequence
+ *
+ * @return a value from this sequence
+ * @param idx the index of the value to obtain
+ **/
+ public Value get(int idx) {
+ return values.get(idx);
+ }
+
+ /**
+ * Obtain the number of values in this sequence
+ *
+ * @return the number of values in this sequence
+ **/
+ public int size() {
+ return values.size();
+ }
+
+ /**
+ * Determine the number of bytes needed to store this value
+ * sequence when encoded into a buffer
+ *
+ * @return number of bytes needed for encoding this value sequence
+ **/
+ int bytes() {
+ int bytes = 4 + values.size();
+ for (int i = 0; i < values.size(); i++) {
+ bytes += get(i).bytes();
+ }
+ return bytes;
+ }
+
+ /**
+ * Encode this value sequence into the given buffer
+ *
+ * @param dst where to encode this value sequence
+ **/
+ void encode(ByteBuffer dst) {
+ byte[] types = new byte[values.size()];
+ for (int i = 0; i < types.length; i++) {
+ types[i] = get(i).type();
+ }
+ dst.putInt(types.length);
+ dst.put(types);
+ for (int i = 0; i < types.length; i++) {
+ get(i).encode(dst);
+ }
+ }
+
+ /**
+ * Decode a value sequence from the given buffer into this object
+ *
+ * @param src where the value sequence is stored
+ **/
+ void decode(ByteBuffer src) {
+ values.clear();
+ int cnt = src.getInt();
+ byte[] types = new byte[cnt];
+ src.get(types);
+ for (int i = 0; i < cnt; i++) {
+ values.add(Value.decode(types[i], src));
+ }
+ }
+
+ public @Override String toString() {
+ if (values.size()==0) return "";
+ if (values.size()==1) return values.get(0).toString();
+ StringBuffer buffer=new StringBuffer();
+ for (int i=0; i<values.size(); i++) {
+ buffer.append(values.get(i).toString());
+ if (i<values.size()-1)
+ buffer.append(",");
+ }
+ return buffer.toString();
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/package-info.java b/jrt/src/com/yahoo/jrt/package-info.java
new file mode 100644
index 00000000000..2af54efd92f
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.jrt;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java b/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java
new file mode 100644
index 00000000000..ead299a7a8d
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+
+class BackOff implements BackOffPolicy
+{
+ private double time = 0.50;
+
+ public void reset() {
+ time = 0.50;
+ }
+
+ public double get() {
+ double ret = time;
+ if (time < 5.0) {
+ time += 0.5;
+ } else if (time < 10.0) {
+ time += 1.0;
+ } else if (time < 30.0) {
+ time += 5;
+ } else {
+ // max retry time is 30 seconds
+ }
+ return ret;
+ }
+
+ public boolean shouldWarn(double t) {
+ return ((t > 8.1 && t < 9.9) || (t > 29.9));
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java b/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java
new file mode 100644
index 00000000000..4fcbe157c44
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+/**
+ * Interface used to control how fast the mirror and register classes
+ * will retry in case of errors. The reset method is used to indicate
+ * that all is ok. When things start failing, the get method will be
+ * invoked for each new attempt in order to get the appropriate delay
+ * between retries (typically increasing for each invocation). The
+ * shouldWarn method is used to ask if a certain delay returned from
+ * get should result in a warning being logged. When things get back
+ * to normal operation, the reset method is invoked to indicate that
+ * we are no longer in a failure state.
+ **/
+public interface BackOffPolicy
+{
+ /**
+ * Reset backoff logic.
+ **/
+ public void reset();
+
+ /**
+ * Obtain the number of seconds to wait before the next
+ * attempt.
+ *
+ * @return delay in seconds
+ **/
+ public double get();
+
+ /**
+ * Check if a certain delay should result in a warning being
+ * logged.
+ *
+ * @return true if we should log
+ * @param t current delay value
+ **/
+ public boolean shouldWarn(double t);
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java
new file mode 100644
index 00000000000..3662e6ad5b9
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java
@@ -0,0 +1,34 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+/**
+ * Defines an interface for the name server lookup.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ **/
+public interface IMirror {
+
+ /**
+ * Obtain all the services matching a given pattern.
+ *
+ * The pattern is matched against all service names in the local mirror repository. A service name may contain '/'
+ * as a separator token. A pattern may contain '*' to match anything up to the next '/' (or the end of the
+ * name). This means that the pattern 'foo/<!-- slash-star -->*<!-- star-slash -->/baz' would match the service
+ * names 'foo/bar/baz' and 'foo/xyz/baz'. The pattern 'foo/b*' would match 'foo/bar', but neither 'foo/xyz' nor
+ * 'foo/bar/baz'. The pattern 'a*b' will never match anything.
+ * As a special case, a pattern can end in '**' to match the rest of a name including '/' separators.
+ *
+ * @return a list of all matching services, with corresponding connect specs
+ * @param pattern The pattern used for matching
+ **/
+ public Mirror.Entry[] lookup(String pattern);
+
+ /**
+ * Obtain the number of updates seen by this mirror. The value may wrap, but will never become 0 again. This can be
+ * used for name lookup optimization, because the results returned by lookup() will never change unless this number
+ * also changes.
+ *
+ * @return number of slobrok updates seen
+ **/
+ public int updates();
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java
new file mode 100644
index 00000000000..5e62cb61b76
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java
@@ -0,0 +1,337 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+
+import com.yahoo.jrt.*;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.logging.Logger;
+import java.util.logging.Level;
+
+
+/**
+ * A Mirror object is used to keep track of the services registered
+ * with a slobrok cluster.
+ *
+ * Updates to the service repository are fetched in the
+ * background. Lookups against this object is done using an internal
+ * mirror of the service repository.
+ **/
+public class Mirror implements IMirror {
+
+ private static Logger log = Logger.getLogger(Mirror.class.getName());
+
+ /**
+ * An Entry contains the name and connection spec for a single
+ * service.
+ **/
+ public static final class Entry implements Comparable<Entry> {
+ private final String name;
+ private final String spec;
+ private final char [] nameArray;
+
+ public Entry(String name, String spec) {
+ this.name = name;
+ this.spec = spec;
+ this.nameArray = name.toCharArray();
+ }
+
+ public boolean equals(Object rhs) {
+ if (rhs == null || !(rhs instanceof Entry)) {
+ return false;
+ }
+ Entry e = (Entry) rhs;
+ return (name.equals(e.name) && spec.equals(e.spec));
+ }
+
+ public int hashCode() {
+ return (name.hashCode() + spec.hashCode());
+ }
+
+ public int compareTo(Entry b) {
+ int diff = name.compareTo(b.name);
+ return diff != 0
+ ? diff
+ : spec.compareTo(b.spec);
+ }
+ char [] getNameArray() { return nameArray; }
+ public String getName() { return name; }
+ public String getSpec() { return spec; }
+ }
+
+ private Supervisor orb;
+ private SlobrokList slobroks;
+ private String currSlobrok;
+ private BackOffPolicy backOff;
+ private volatile int updates = 0;
+ private boolean reqDone = false;
+ private volatile Entry[] specs = new Entry[0];
+ private int specsGen = 0;
+ private Task updateTask = null;
+ private RequestWaiter reqWait = null;
+ private Target target = null;
+ private Request req = null;
+
+ /**
+ * Create a new MirrorAPI using the given Supervisor and slobrok
+ * connect specs.
+ *
+ * @param orb the Supervisor to use
+ * @param slobroks slobrok connect spec list
+ * @param bop custom backoff policy, mostly useful for testing
+ **/
+ public Mirror(Supervisor orb, SlobrokList slobroks, BackOffPolicy bop) {
+ this.orb = orb;
+ this.slobroks = slobroks;
+ this.backOff = bop;
+ updateTask = orb.transport().createTask(new Runnable() {
+ public void run() { handleUpdate(); }
+ });
+ reqWait = new RequestWaiter() {
+ public void handleRequestDone(Request req) {
+ reqDone = true;
+ updateTask.scheduleNow();
+ }
+ };
+ updateTask.scheduleNow();
+ }
+
+ /**
+ * Create a new MirrorAPI using the given Supervisor and slobrok
+ * connect specs.
+ *
+ * @param orb the Supervisor to use
+ * @param slobroks slobrok connect spec list
+ **/
+ public Mirror(Supervisor orb, SlobrokList slobroks) {
+ this(orb, slobroks, new BackOff());
+ }
+
+ /**
+ * Shut down the Mirror. This will close any open connections and
+ * stop the regular mirror updates.
+ **/
+ public void shutdown() {
+ updateTask.kill();
+ orb.transport().perform(new Runnable() {
+ public void run() { handleShutdown(); }
+ });
+ }
+
+ @Override
+ public Entry[] lookup(String pattern) {
+ ArrayList<Entry> found = new ArrayList<Entry>();
+ Entry [] e = specs;
+ char [] p = pattern.toCharArray();
+ for (int i = 0; i < e.length; i++) {
+ if (match(e[i].getNameArray(), p)) {
+ found.add(e[i]);
+ }
+ }
+ return found.toArray(new Entry[found.size()]);
+ }
+
+ @Override
+ public int updates() {
+ return updates;
+ }
+
+ /**
+ * Ask if the MirrorAPI has got any useful information from the Slobrok.
+ *
+ * On application startup it is often useful to run the event loop for some time until this functions returns true
+ * (or if it never does, time out and tell the user there was no answer from any Service Location Broker).
+ *
+ * @return true if the MirrorAPI object has asked for updates from a Slobrok and got any answer back
+ **/
+ public boolean ready() {
+ return (updates != 0);
+ }
+
+ /**
+ * Returns whether this mirror is connected to the slobrok server at this
+ * time or not.
+ */
+ public boolean connected() {
+ return (target != null);
+ }
+
+ /**
+ * Match a single name against a pattern.
+ * A pattern can contain '*' to match until the next '/' separator,
+ * and end with '**' to match the rest of the name.
+ * Note that this isn't quite globbing, as there is no backtracking.
+ *
+ * @return true if the name matches the pattern
+ * @param name the name
+ * @param pattern the pattern
+ **/
+ static boolean match(char [] name, char [] pattern) {
+ int ni = 0;
+ int pi = 0;
+
+ while (ni < name.length && pi < pattern.length) {
+ if (name[ni] == pattern[pi]) {
+ ni++;
+ pi++;
+ } else if (pattern[pi] == '*') {
+ pi++;
+ while (ni < name.length && name[ni] != '/') {
+ ni++;
+ }
+ if (pi < pattern.length && pattern[pi] == '*') {
+ pi++;
+ ni = name.length;
+ }
+ } else {
+ return false;
+ }
+ }
+ while (pi < pattern.length && pattern[pi] == '*') {
+ pi++;
+ }
+ return ((ni == name.length) && (pi == pattern.length));
+ }
+
+ /**
+ * Invoked by the update task.
+ **/
+ private void handleUpdate() {
+ if (reqDone) {
+ reqDone = false;
+
+ if (req.errorCode() == ErrorCode.NONE &&
+ req.returnValues().satisfies("SSi") &&
+ req.returnValues().get(0).count() == req.returnValues().get(1).count())
+ {
+ Values answer = req.returnValues();
+
+ if (specsGen != answer.get(2).asInt32()) {
+
+ int numNames = answer.get(0).count();
+ String[] n = answer.get(0).asStringArray();
+ String[] s = answer.get(1).asStringArray();
+ Entry[] newSpecs = new Entry[numNames];
+
+ for (int idx = 0; idx < numNames; idx++) {
+ newSpecs[idx] = new Entry(n[idx], s[idx]);
+ }
+
+ specs = newSpecs;
+
+ specsGen = answer.get(2).asInt32();
+ int u = (updates + 1);
+ if (u == 0) {
+ u++;
+ }
+ updates = u;
+ }
+ backOff.reset();
+ updateTask.schedule(0.1); // be nice
+ return;
+ }
+ if (!req.checkReturnTypes("iSSSi")
+ || (req.returnValues().get(2).count() !=
+ req.returnValues().get(3).count()))
+ {
+ target.close();
+ target = null;
+ updateTask.scheduleNow(); // try next slobrok
+ return;
+ }
+
+
+ Values answer = req.returnValues();
+
+ int diffFrom = answer.get(0).asInt32();
+ int diffTo = answer.get(4).asInt32();
+
+ if (specsGen != diffTo) {
+
+ int nRemoves = answer.get(1).count();
+ String[] r = answer.get(1).asStringArray();
+
+ int numNames = answer.get(2).count();
+ String[] n = answer.get(2).asStringArray();
+ String[] s = answer.get(3).asStringArray();
+
+
+ Entry[] newSpecs;
+ if (diffFrom == 0) {
+ newSpecs = new Entry[numNames];
+
+ for (int idx = 0; idx < numNames; idx++) {
+ newSpecs[idx] = new Entry(n[idx], s[idx]);
+ }
+ } else {
+ java.util.HashMap<String, Entry> map = new java.util.HashMap<String, Entry>();
+ for (Entry e : specs) {
+ map.put(e.getName(), e);
+ }
+ for (String rem : r) {
+ map.remove(rem);
+ }
+ for (int idx = 0; idx < numNames; idx++) {
+ map.put(n[idx], new Entry(n[idx], s[idx]));
+ }
+ newSpecs = new Entry[map.size()];
+ int idx = 0;
+ for (Entry e : map.values()) {
+ newSpecs[idx++] = e;
+ }
+ }
+
+ specs = newSpecs;
+
+ specsGen = diffTo;
+ int u = (updates + 1);
+ if (u == 0) {
+ u++;
+ }
+ updates = u;
+ }
+ backOff.reset();
+ updateTask.schedule(0.1); // be nice
+ return;
+ }
+ if (target != null && ! slobroks.contains(currSlobrok)) {
+ target.close();
+ target = null;
+ }
+ if (target == null) {
+ currSlobrok = slobroks.nextSlobrokSpec();
+ if (currSlobrok == null) {
+ double delay = backOff.get();
+ updateTask.schedule(delay);
+ if (backOff.shouldWarn(delay)) {
+ log.log(Level.INFO, "no location brokers available "
+ + "(retry in " + delay + " seconds) for: " + slobroks);
+ }
+ return;
+ }
+ target = orb.connect(new Spec(currSlobrok));
+ specsGen = 0;
+ }
+ req = new Request("slobrok.incremental.fetch");
+ req.parameters().add(new Int32Value(specsGen)); // gencnt
+ req.parameters().add(new Int32Value(5000)); // mstimeout
+ target.invokeAsync(req, 40.0, reqWait);
+ }
+
+ /**
+ * Invoked from the transport thread, requested by the shutdown
+ * method.
+ **/
+ private void handleShutdown() {
+ if (req != null) {
+ req.abort();
+ req = null;
+ }
+ if (target != null) {
+ target.close();
+ target = null;
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java
new file mode 100644
index 00000000000..84720501ff8
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java
@@ -0,0 +1,269 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+
+import com.yahoo.jrt.*;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.logging.Logger;
+import java.util.logging.Level;
+
+
+/**
+ * A Register object is used to register and unregister services with
+ * a slobrok cluster.
+ *
+ * The register/unregister operations performed against this object
+ * are stored in a todo list that will be performed asynchronously
+ * against the slobrok cluster as soon as possible.
+ **/
+public class Register {
+
+ private static Logger log = Logger.getLogger(Register.class.getName());
+
+ private Supervisor orb;
+ private SlobrokList slobroks;
+ private String currSlobrok;
+ private String mySpec;
+ private BackOffPolicy backOff;
+ private boolean reqDone = false;
+ private ArrayList<String> names = new ArrayList<String>();
+ private ArrayList<String> pending = new ArrayList<String>();
+ private ArrayList<String> unreg = new ArrayList<String>();
+ private Task updateTask = null;
+ private RequestWaiter reqWait = null;
+ private Target target = null;
+ private Request req = null;
+ private Method m_list = null;
+ private Method m_unreg = null;
+
+ /**
+ * Remove all instances of name from list.
+ **/
+ private void discard(ArrayList<String> list, String name) {
+ ArrayList<String> tmp = new ArrayList<String>();
+ tmp.add(name);
+ list.removeAll(tmp);
+ }
+
+ /**
+ * Create a new Register using the given Supervisor, slobrok
+ * connect specs, hostname and port
+ *
+ * @param orb the Supervisor to use
+ * @param slobroks slobrok connect spec list
+ * @param spec the Spec representing hostname and port for this host
+ * @param bop custom backoff policy, mostly useful for testing
+ **/
+ public Register(Supervisor orb, SlobrokList slobroks, Spec spec, BackOffPolicy bop) {
+ this.orb = orb;
+ this.slobroks = slobroks;
+ this.backOff = bop;
+ mySpec = spec.toString();
+ updateTask = orb.transport().createTask(new Runnable() {
+ public void run() { handleUpdate(); }
+ });
+ reqWait = new RequestWaiter() {
+ public void handleRequestDone(Request req) {
+ reqDone = true;
+ updateTask.scheduleNow();
+ }
+ };
+ m_list = new Method("slobrok.callback.listNamesServed",
+ "", "S", new MethodHandler() {
+ public void invoke(Request req) {
+ handleRpcList(req);
+ }
+ })
+ .methodDesc("List rpcserver names")
+ .returnDesc(0, "names",
+ "The rpcserver names this server wants to serve");
+ orb.addMethod(m_list);
+ m_unreg = new Method("slobrok.callback.notifyUnregistered",
+ "s", "", new MethodHandler() {
+ public void invoke(Request req) {
+ handleRpcUnreg(req);
+ }
+ })
+ .methodDesc("Notify a server about removed registration")
+ .paramDesc(0, "name", "RpcServer name");
+ orb.addMethod(m_unreg);
+ updateTask.scheduleNow();
+ }
+
+ /**
+ * Create a new Register using the given Supervisor, slobrok
+ * connect specs, hostname and port
+ *
+ * @param orb the Supervisor to use
+ * @param slobroks slobrok connect spec list
+ * @param spec the Spec representing hostname and port for this host
+ **/
+ public Register(Supervisor orb, SlobrokList slobroks, Spec spec) {
+ this(orb, slobroks, spec, new BackOff());
+ }
+
+ /**
+ * Create a new Register using the given Supervisor, slobrok
+ * connect specs, hostname and port
+ *
+ * @param orb the Supervisor to use
+ * @param slobroks slobrok connect spec list
+ * @param myHost the hostname of this host
+ * @param myPort the port number we are listening to
+ **/
+ public Register(Supervisor orb, SlobrokList slobroks,
+ String myHost, int myPort) {
+ this(orb, slobroks, new Spec(myHost, myPort));
+ }
+
+
+ /**
+ * Shut down the Register. This will close any open connections
+ * and stop the regular re-registration.
+ **/
+ public void shutdown() {
+ updateTask.kill();
+ orb.transport().perform(new Runnable() {
+ public void run() { handleShutdown(); }
+ });
+ }
+
+ /**
+ * Register a service with the slobrok cluster.
+ *
+ * @param name service name
+ **/
+ public synchronized void registerName(String name) {
+ if (names.indexOf(name) >= 0) {
+ return;
+ }
+ names.add(name);
+ pending.add(name);
+ discard(unreg, name);
+ updateTask.scheduleNow();
+ }
+
+ /**
+ * Unregister a service with the slobrok cluster
+ *
+ * @param name service name
+ **/
+ public synchronized void unregisterName(String name) {
+ discard(names, name);
+ discard(pending, name);
+ unreg.add(name);
+ updateTask.scheduleNow();
+ }
+
+ /**
+ * Invoked by the update task.
+ **/
+ private void handleUpdate() {
+ if (reqDone) {
+ reqDone = false;
+ if (req.isError()) {
+ if (req.errorCode() != ErrorCode.METHOD_FAILED) {
+ log.log(Level.FINE, "register failed: "
+ + req.errorMessage()
+ + " (code " + req.errorCode() + ")");
+ target.close();
+ target = null;
+ } else {
+ log.log(Level.WARNING, "register failed: "
+ + req.errorMessage()
+ + " (code " + req.errorCode() + ")");
+ }
+ } else {
+ backOff.reset();
+ }
+ req = null;
+ }
+ if (req != null) {
+ log.log(Level.FINEST, "req in progress");
+ return; // current request still in progress
+ }
+ if (target != null && ! slobroks.contains(currSlobrok)) {
+ target.close();
+ target = null;
+ }
+ if (target == null) {
+ currSlobrok = slobroks.nextSlobrokSpec();
+ if (currSlobrok == null) {
+ double delay = backOff.get();
+ updateTask.schedule(delay);
+ if (backOff.shouldWarn(delay)) {
+ log.log(Level.WARNING, "slobrok connection problems "
+ + "(retry in " + delay + " seconds) to: " + slobroks);
+ } else {
+ log.log(Level.FINE, "slobrok retry in " + delay
+ + " seconds");
+ }
+ return;
+ }
+ target = orb.connect(new Spec(currSlobrok));
+ synchronized (this) {
+ pending.clear();
+ pending.addAll(names);
+ }
+ }
+ boolean rem = false;
+ boolean reg = false;
+ String name;
+ synchronized (this) {
+ if (unreg.size() > 0) {
+ name = unreg.remove(unreg.size() - 1);
+ rem = true;
+ } else if (pending.size() > 0) {
+ name = pending.remove(pending.size() - 1);
+ reg = true;
+ } else {
+ pending.addAll(names);
+ log.log(Level.FINE, "done, reschedule in 30s");
+ updateTask.schedule(30.0);
+ return;
+ }
+ }
+
+ if (rem) {
+ req = new Request("slobrok.unregisterRpcServer");
+ req.parameters().add(new StringValue(name));
+ log.log(Level.FINE, "unregister [" + name + "]");
+ req.parameters().add(new StringValue(mySpec));
+ target.invokeAsync(req, 35.0, reqWait);
+ } else if (reg) {
+ req = new Request("slobrok.registerRpcServer");
+ req.parameters().add(new StringValue(name));
+ log.log(Level.FINE, "register [" + name + "]");
+ req.parameters().add(new StringValue(mySpec));
+ target.invokeAsync(req, 35.0, reqWait);
+ }
+ }
+
+ private synchronized void handleRpcList(Request req) {
+ Values dst = req.returnValues();
+ dst.add(new StringArray(names.toArray(new String[names.size()])));
+ }
+
+ private void handleRpcUnreg(Request req) {
+ log.log(Level.WARNING, "unregistered name "
+ + req.parameters().get(0).asString());
+ }
+
+ /**
+ * Invoked from the transport thread, requested by the shutdown
+ * method.
+ **/
+ private void handleShutdown() {
+ orb.removeMethod(m_list);
+ orb.removeMethod(m_unreg);
+ if (req != null) {
+ req.abort();
+ req = null;
+ }
+ if (target != null) {
+ target.close();
+ target = null;
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java b/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java
new file mode 100644
index 00000000000..43c1c41faa5
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java
@@ -0,0 +1,94 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.api;
+
+import java.util.Arrays;
+import java.util.Random;
+
+public class SlobrokList {
+
+ private final Internal internal;
+ private String[] slobroks;
+ private int idx = 0;
+
+ public SlobrokList() {
+ this.internal = new Internal();
+ }
+
+ public SlobrokList(SlobrokList sibling) {
+ this.internal = sibling.internal;
+ }
+
+ private void checkUpdate() {
+ synchronized (internal) {
+ if (slobroks != internal.slobroks) {
+ slobroks = internal.slobroks;
+ idx = 0;
+ }
+ }
+ }
+
+
+ public String nextSlobrokSpec() {
+ checkUpdate();
+ if (idx < slobroks.length) {
+ return slobroks[idx++];
+ }
+ idx = 0;
+ return null;
+ }
+
+ public void setup(String[] slobroks) {
+ internal.setup(slobroks);
+ }
+
+ public int length() {
+ return internal.length();
+ }
+
+ public boolean contains(String slobrok) {
+ checkUpdate();
+ for (String s : slobroks) {
+ if (s == slobrok) return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return internal.toString();
+ }
+
+ private static class Internal {
+
+ String[] slobroks = new String[0];
+
+ void setup(String[] slobroks) {
+ String[] next = new String[slobroks.length];
+ for (int i = 0; i < slobroks.length; i++) {
+ next[i] = slobroks[i];
+ }
+ Random rnd = new Random();
+ for (int i = 0; i + 1 < next.length; i++) {
+ int lim = next.length - i;
+ int x = rnd.nextInt(lim);
+ if (x != 0) {
+ String tmp = next[i];
+ next[i] = next[i+x];
+ next[i+x] = tmp;
+ }
+ }
+ synchronized (this) {
+ this.slobroks = next;
+ }
+ }
+
+ synchronized int length() {
+ return slobroks.length;
+ }
+
+ @Override
+ public synchronized String toString() {
+ return Arrays.toString(slobroks);
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java
new file mode 100644
index 00000000000..0c9bc4d7407
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.jrt.slobrok.api;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jrt/src/com/yahoo/jrt/slobrok/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/package-info.java
new file mode 100644
index 00000000000..a1d921df174
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.jrt.slobrok;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
new file mode 100644
index 00000000000..085489897b5
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
@@ -0,0 +1,270 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.slobrok.server;
+
+import com.yahoo.jrt.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Slobrok {
+
+ private class RegisterCallback implements RequestWaiter {
+
+ Request registerReq;
+ String name;
+ String spec;
+ Target target;
+
+ public RegisterCallback(Request req, String name, String spec) {
+ req.detach();
+ registerReq = req;
+ this.name = name;
+ this.spec = spec;
+ target = orb.connect(new Spec(spec));
+ Request cbReq = new Request("slobrok.callback.listNamesServed");
+ target.invokeAsync(cbReq, 5.0, this);
+ }
+
+ public void handleRequestDone(Request req) {
+ if (!req.checkReturnTypes("S")) {
+ registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: "
+ + req.errorMessage());
+ registerReq.returnRequest();
+ target.close();
+ return;
+ }
+ String[] names = req.returnValues().get(0).asStringArray();
+ boolean found = false;
+ for (String n : names) {
+ if (n.equals(name)) {
+ found = true;
+ }
+ }
+ if (!found) {
+ registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: "
+ + "served names does not contain name");
+ registerReq.returnRequest();
+ target.close();
+ return;
+ }
+ handleRegisterCallbackDone(registerReq, name, spec, target);
+ }
+ }
+
+ private class FetchMirror implements Runnable {
+ public final Request req;
+ public final Task task;
+
+ public FetchMirror(Request req, int timeout) {
+ req.detach();
+ this.req = req;
+ task = orb.transport().createTask(this);
+ task.schedule(((double)timeout)/1000.0);
+ }
+ public void run() { // timeout
+ handleFetchMirrorTimeout(this);
+ }
+ }
+
+ private class TargetMonitor implements TargetWatcher {
+ public void notifyTargetInvalid(Target target) {
+ handleTargetDown(target);
+ }
+ }
+
+ Supervisor orb;
+ Acceptor listener;
+ HashMap<String,String> services = new HashMap<String,String>();
+ ArrayList<FetchMirror> pendingFetch = new ArrayList<FetchMirror>();
+ HashMap<String,Target> targets = new HashMap<String,Target>();
+ TargetMonitor monitor = new TargetMonitor();
+ int gencnt = 1;
+
+ public String lookup(String name) {
+ return services.get(name);
+ }
+
+
+ public Slobrok(int port) throws ListenFailedException {
+ orb = new Supervisor(new Transport());
+ registerMethods();
+ try {
+ listener = orb.listen(new Spec(port));
+ } catch (ListenFailedException e) {
+ orb.transport().shutdown().join();
+ throw e;
+ }
+ }
+
+ public Slobrok() throws ListenFailedException {
+ this(0);
+ }
+
+ public int port() {
+ return listener.port();
+ }
+
+ public void stop() {
+ orb.transport().shutdown().join();
+ listener.shutdown().join();
+ }
+
+ public String configId() {
+ return "raw:slobrok[1]\n" +
+ "slobrok[0].connectionspec \"" + new Spec("localhost", listener.port()).toString() + "\"\n";
+ }
+
+ private void updated() {
+ gencnt++;
+ if (gencnt == 0) {
+ gencnt++;
+ }
+ handleFetchMirrorFlush();
+ }
+
+ private void handleRegisterCallbackDone(Request req,
+ String name, String spec,
+ Target target)
+ {
+ String stored = services.get(name);
+ if (stored != null) { // too late
+ if (!stored.equals(spec)) {
+ req.setError(ErrorCode.METHOD_FAILED,
+ "service '" + name + "' registered with another spec");
+ }
+ req.returnRequest();
+ target.close();
+ return;
+ }
+ target.setContext(name);
+ target.addWatcher(monitor);
+ services.put(name, spec);
+ targets.put(name, target);
+ req.returnRequest();
+ updated();
+ }
+
+ private void handleTargetDown(Target target) {
+ String name = (String) target.getContext();
+ targets.remove(name);
+ services.remove(name);
+ updated();
+ }
+
+ private void dumpServices(Request req) {
+ ArrayList<String> names = new ArrayList<String>();
+ ArrayList<String> specs = new ArrayList<String>();
+ for (Map.Entry<String,String> entry : services.entrySet()) {
+ names.add(entry.getKey());
+ specs.add(entry.getValue());
+ }
+ req.returnValues().add(new StringArray(names.toArray(new String[names.size()])));
+ req.returnValues().add(new StringArray(specs.toArray(new String[specs.size()])));
+ req.returnValues().add(new Int32Value(gencnt));
+ }
+
+ private void handleFetchMirrorTimeout(FetchMirror fetch) {
+ pendingFetch.remove(fetch);
+ fetch.req.returnValues().add(new StringArray(new String[0]));
+ fetch.req.returnValues().add(new StringArray(new String[0]));
+ fetch.req.returnValues().add(new Int32Value(gencnt));
+ fetch.req.returnRequest();
+ }
+
+ private void handleFetchMirrorFlush() {
+ for (FetchMirror fetch : pendingFetch) {
+ fetch.task.kill();
+ dumpServices(fetch.req);
+ fetch.req.returnRequest();
+ }
+ pendingFetch.clear();
+ }
+
+ private void registerMethods() {
+ orb.addMethod(new Method("slobrok.registerRpcServer", "ss", "",
+ new MethodHandler() {
+ public void invoke(Request req) {
+ rpc_register(req);
+ }
+ })
+ .methodDesc("Register a rpcserver")
+ .paramDesc(0, "name", "RpcServer name")
+ .paramDesc(1, "spec", "The connection specification"));
+ orb.addMethod(new Method("slobrok.unregisterRpcServer", "ss", "",
+ new MethodHandler() {
+ public void invoke(Request req) {
+ rpc_unregister(req);
+ }
+ })
+ .methodDesc("Unregister a rpcserver")
+ .paramDesc(0, "name", "RpcServer name")
+ .paramDesc(1, "spec", "The connection specification"));
+
+ orb.addMethod(new Method("slobrok.incremental.fetch", "ii", "iSSSi",
+ new MethodHandler() {
+ public void invoke(Request req) {
+ rpc_fetchIncremental(req);
+ }
+ })
+ .methodDesc("Fetch or update mirror of name to spec map")
+ .paramDesc(0, "gencnt", "generation already known by client")
+ .paramDesc(1, "timeout", "How many milliseconds to wait for changes"
+ + "before returning if nothing has changed (max=10000)")
+ .returnDesc(0, "oldgen", "diff from generation already known by client")
+ .returnDesc(1, "removed", "Array of RpcServer names to remove")
+ .returnDesc(2, "names", "Array of RpcServer names with new values")
+ .returnDesc(3, "specs", "Array of connection specifications (same order)")
+ .returnDesc(4, "newgen", "Generation count for new version of the map"));
+ }
+
+ private void rpc_register(Request req) {
+ String name = req.parameters().get(0).asString();
+ String spec = req.parameters().get(1).asString();
+ String stored = services.get(name);
+ if (stored == null) {
+ new RegisterCallback(req, name, spec);
+ } else {
+ if (stored.equals(spec)) {
+ // ok, already stored
+ } else {
+ req.setError(ErrorCode.METHOD_FAILED,
+ "service '" + name + "' registered with another spec");
+ }
+ }
+ }
+
+ private void rpc_unregister(Request req) {
+ String name = req.parameters().get(0).asString();
+ String spec = req.parameters().get(1).asString();
+ String stored = services.get(name);
+ if (stored != null) {
+ if (stored.equals(spec)) {
+ Target target = targets.remove(name);
+ target.removeWatcher(monitor);
+ services.remove(name);
+ target.close();
+ updated();
+ } else {
+ req.setError(ErrorCode.METHOD_FAILED,
+ "service '" + name + "' registered with another spec");
+ }
+ }
+ }
+
+ private void rpc_fetchIncremental(Request req) {
+ int gencnt = req.parameters().get(0).asInt32();
+ int timeout = req.parameters().get(1).asInt32();
+
+ // for now, always make "full diff" from generation 0
+ req.returnValues().add(new Int32Value(0));
+ req.returnValues().add(new StringArray(new String[0]));
+
+ if (gencnt == this.gencnt) {
+ pendingFetch.add(new FetchMirror(req, timeout));
+ } else {
+ dumpServices(req);
+ }
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java
new file mode 100644
index 00000000000..2e77e20852b
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.jrt.slobrok.server;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java
new file mode 100644
index 00000000000..1742b365697
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java
@@ -0,0 +1,107 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt.tool;
+
+import com.yahoo.jrt.*;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A generic rpc invoker for use by command line tools
+ *
+ * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class RpcInvoker {
+
+ private Value getArgument(Request request,String parameter) {
+ if (parameter.length()<=1 || parameter.charAt(1)!=':')
+ return new StringValue(parameter);
+
+ String value=parameter.substring(2);
+ switch (parameter.charAt(0)) {
+ case 'b':
+ return new Int8Value(Byte.parseByte(value));
+ case 'h':
+ return new Int16Value(Short.parseShort(value));
+ case 'i':
+ return new Int32Value(Integer.parseInt(value));
+ case 'l':
+ return new Int64Value(Long.parseLong(value));
+ case 'f':
+ return new FloatValue(Float.parseFloat(value));
+ case 'd':
+ return new DoubleValue(Double.parseDouble(value));
+ case 's':
+ return new StringValue(value);
+ }
+
+ throw new IllegalArgumentException("The first letter in '" + parameter + "' must be a type argument. " +
+ "There is no jrt type identified by '" + parameter.charAt(0) + "'");
+ }
+
+ protected Request createRequest(String method,List<String> arguments) {
+ Request request=new Request(method);
+ if (arguments!=null) {
+ for (String argument : arguments)
+ request.parameters().add(getArgument(request,argument));
+ }
+ return request;
+ }
+
+ /**
+ * Invokes a rpc method without throwing an exception
+ *
+ * @param connectspec the rpc server connection spec
+ * @param method the name of the method to invoke
+ * @param arguments the argument to the method, or null or an empty list if there are no arguments
+ */
+ public void invoke(String connectspec,String method, List<String> arguments) {
+ Supervisor supervisor=null;
+ Target target=null;
+
+ try {
+ if (connectspec.indexOf('/')<0)
+ connectspec="tcp/" + connectspec;
+
+ supervisor=new Supervisor(new Transport());
+ target = supervisor.connect(new Spec(connectspec));
+ Request request=createRequest(method,arguments);
+ target.invokeSync(request,10.0);
+ if (request.isError()) {
+ System.err.println("error(" + request.errorCode() + "): " + request.errorMessage());
+ return;
+ }
+ Values returned=request.returnValues();
+ for (int i=0; i<returned.size(); i++) {
+ System.out.println(returned.get(i));
+ }
+ }
+ finally {
+ if (target!=null)
+ target.close();
+ if (supervisor!=null)
+ supervisor.transport().shutdown().join();
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length<1) {
+ System.err.println("usage: invoke [-h <connectspec>] <method> [arguments]");
+ System.err.println(" Connectspec: This is on the form hostname:port, or tcp/hostname:port");
+ System.err.println(" if omitted, localhost:8086 is used");
+ System.err.println(" Arguments: each argument must be a string or on the form <type>:<value>");
+ System.err.println(" supported types: {'b','h','i','l','f','d','s'}");
+ System.exit(0);
+ }
+ List<String> arguments=new ArrayList<String>(Arrays.asList(args));
+ String connectSpec="localhost:8086";
+ if ("-h".equals(arguments.get(0)) && arguments.size()>=3) {
+ arguments.remove(0); // Consume -h
+ connectSpec=arguments.remove(0);
+ }
+ String method=arguments.remove(0);
+ new RpcInvoker().invoke(connectSpec,method,arguments);
+ }
+
+}
diff --git a/jrt/src/com/yahoo/jrt/tool/package-info.java b/jrt/src/com/yahoo/jrt/tool/package-info.java
new file mode 100644
index 00000000000..ccb5473362c
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/tool/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.jrt.tool;
+
+import com.yahoo.osgi.annotation.ExportPackage;