diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /jrt |
Publish
Diffstat (limited to 'jrt')
102 files changed, 10142 insertions, 0 deletions
diff --git a/jrt/.gitignore b/jrt/.gitignore new file mode 100644 index 00000000000..21c1ace700e --- /dev/null +++ b/jrt/.gitignore @@ -0,0 +1,9 @@ +*.iws +.classpath +.project +archive +doc +jrt.iml +log +target +/pom.xml.build diff --git a/jrt/OWNERS b/jrt/OWNERS new file mode 100644 index 00000000000..efd8e309424 --- /dev/null +++ b/jrt/OWNERS @@ -0,0 +1,2 @@ +havardpe +arnej27959 diff --git a/jrt/README b/jrt/README new file mode 100644 index 00000000000..5d98c969a9b --- /dev/null +++ b/jrt/README @@ -0,0 +1 @@ +Java Remote Tools; java implementation of FRT/FNET rpc. diff --git a/jrt/examples/.gitignore b/jrt/examples/.gitignore new file mode 100644 index 00000000000..90b07e9d451 --- /dev/null +++ b/jrt/examples/.gitignore @@ -0,0 +1 @@ +classes diff --git a/jrt/examples/SimpleClient.java b/jrt/examples/SimpleClient.java new file mode 100644 index 00000000000..cfacb92f8ff --- /dev/null +++ b/jrt/examples/SimpleClient.java @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.jrt.*; + +public class SimpleClient { + + public static void main(String args[]) { + if (args.length != 3) { + System.err.println("usage: SimpleClient spec n1 n2"); + System.exit(1); + } + Supervisor supervisor = new Supervisor(new Transport()); + Target target = supervisor.connect(new Spec(args[0])); + Request req = new Request("add"); + req.parameters().add(new Int32Value(Integer.parseInt(args[1]))); + req.parameters().add(new Int32Value(Integer.parseInt(args[2]))); + target.invokeSync(req, 5.0); + if (req.checkReturnTypes("i")) { + System.out.println(args[1] + " + " + args[2] + " = " + + req.returnValues().get(0).asInt32()); + } else { + System.out.println("Invocation failed: " + + req.errorCode() + ": " + req.errorMessage()); + } + target.close(); + supervisor.transport().shutdown().join(); + } +} diff --git a/jrt/examples/SimpleServer.java b/jrt/examples/SimpleServer.java new file mode 100644 index 00000000000..f387e2916c1 --- /dev/null +++ b/jrt/examples/SimpleServer.java @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.jrt.*; + +public class SimpleServer { + + public void rpc_add(Request req) { + int n1 = req.parameters().get(0).asInt32(); + int n2 = req.parameters().get(1).asInt32(); + req.returnValues().add(new Int32Value(n1 + n2)); + } + + public static void main(String args[]) { + if (args.length != 1) { + System.err.println("usage: SimpleServer spec"); + System.exit(1); + } + Supervisor supervisor = new Supervisor(new Transport()); + supervisor.addMethod(new Method("add", "ii", "i", + new SimpleServer(), "rpc_add") + .methodDesc("calculate the sum of 2 integers") + .paramDesc(0, "n1", "an integer") + .paramDesc(1, "n2", "another integer") + .returnDesc(0, "ret", "n1 + n2")); + try { + Acceptor acceptor = supervisor.listen(new Spec(args[0])); + System.out.println("Listening at " + args[0]); + supervisor.transport().join(); + acceptor.shutdown().join(); + } catch (ListenFailedException e) { + System.err.println("Could not listen at " + args[0]); + supervisor.transport().shutdown().join(); + } + } +} diff --git a/jrt/pom.xml b/jrt/pom.xml new file mode 100644 index 00000000000..967d75e8937 --- /dev/null +++ b/jrt/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0"?> +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>jrt</artifactId> + <packaging>container-plugin</packaging> + <version>6-SNAPSHOT</version> + <name>jrt</name> + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>annotations</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>vespajlib</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <sourceDirectory>src</sourceDirectory> + <testSourceDirectory>tests</testSourceDirectory> + <plugins> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>bundle-plugin</artifactId> + <extensions>true</extensions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs> + <arg>-Xlint:all</arg> + <arg>-Xlint:-serial</arg> + <arg>-Werror</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-install-plugin</artifactId> + <version>2.3.1</version> + <configuration> + <updateReleaseInfo>true</updateReleaseInfo> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/jrt/runexample.sh b/jrt/runexample.sh new file mode 100755 index 00000000000..f119ee3c4ee --- /dev/null +++ b/jrt/runexample.sh @@ -0,0 +1,9 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +if [ $# -eq 0 ]; then + echo "usage: $0 <class> [class args]" + echo " available class files:" + ls examples/classes +else + java -cp build/classes:examples/classes "$@" +fi diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java new file mode 100644 index 00000000000..05a7591ab74 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Acceptor.java @@ -0,0 +1,145 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.channels.ServerSocketChannel; +import java.util.logging.Level; +import java.util.logging.Logger; + + +/** + * A class used to listen on a network socket. A separate thread is + * used to accept connections and register them with the underlying + * transport thread. To create an acceptor you need to invoke the + * {@link Supervisor#listen listen} method in the {@link Supervisor} + * class. + **/ +public class Acceptor { + + private class Run implements Runnable { + public void run() { + try { + Acceptor.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Acceptor.this); + } + } + } + + private static Logger log = Logger.getLogger(Acceptor.class.getName()); + + private Thread thread = new Thread(new Run(), "<acceptor>"); + private Transport parent; + private Supervisor owner; + + private ServerSocketChannel serverChannel; + + Acceptor(Transport parent, Supervisor owner, + Spec spec) throws ListenFailedException { + + this.parent = parent; + this.owner = owner; + + if (spec.malformed()) { + throw new ListenFailedException("Malformed spec"); + } + + try { + serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(true); + if (spec.port() != 0) { + serverChannel.socket().setReuseAddress(true); + } + serverChannel.socket().bind(spec.address(), 500); + } catch (Exception e) { + if (serverChannel != null) { + try { serverChannel.socket().close(); } catch (Exception x) {} + } + throw new ListenFailedException("Listen failed", e); + } + + thread.setDaemon(true); + thread.start(); + } + + /** + * Obtain the local port number this Acceptor is listening to. If + * this Acceptor is no longer listening (it has been shut down), + * -1 will be returned. + * + * @return listening port, or -1 if not listening + **/ + public int port() { + if (!serverChannel.isOpen()) { + return -1; + } + return serverChannel.socket().getLocalPort(); + } + + /** + * Obtain the Spec for the local port and host interface this Acceptor + * is listening to. If this Acceptor is no longer listening (it has + * been shut down), null will be returned. + * + * @return listening spec, or null if not listening. + **/ + public Spec spec() { + if (!serverChannel.isOpen()) { + return null; + } + return new Spec(serverChannel.socket().getInetAddress().getHostName(), + serverChannel.socket().getLocalPort()); + } + + private void run() { + while (serverChannel.isOpen()) { + try { + parent.addConnection(new Connection(parent, owner, + serverChannel.accept())); + parent.sync(); + } catch (java.nio.channels.ClosedChannelException x) { + } catch (Exception e) { + log.log(Level.WARNING, "Error accepting connection", e); + } + } + } + + /** + * Initiate controlled shutdown of the acceptor thread + * + * @return this object, to enable chaining with {@link #join join} + **/ + public Acceptor shutdown() { + try { + serverChannel.socket().close(); + } catch (Exception e1) { + log.log(Level.WARNING, "Error closing server socket", e1); + Thread.yield(); // throw some salt over the shoulder + try { + serverChannel.socket().close(); + } catch (Exception e2) { + log.log(Level.WARNING, "Error closing server socket", e2); + Thread.yield(); // throw some salt over the shoulder + try { + serverChannel.socket().close(); + } catch (Exception e3) { + log.log(Level.WARNING, "Error closing server socket", e3); + throw new Error("Error closing server socket 3 times", e3); + } + } + } + return this; + } + + /** + * Wait for the acceptor thread to finish + **/ + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Buffer.java b/jrt/src/com/yahoo/jrt/Buffer.java new file mode 100644 index 00000000000..53e91d80b4d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Buffer.java @@ -0,0 +1,126 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +class Buffer { + + static final int MAX_IO = 65000; + + private ByteBuffer buf; + private int readPos; + private int writePos; + private boolean readMode; + + private void setReadMode() { + if (readMode) { + buf.limit(writePos); + return; + } + writePos = buf.position(); + buf.position(readPos); + buf.limit(writePos); + readMode = true; + } + + private void setWriteMode() { + if (!readMode) { + buf.limit(buf.capacity()); + return; + } + readPos = buf.position(); + buf.limit(buf.capacity()); + buf.position(writePos); + readMode = false; + } + + private void ensureFree(int minFree) { + // assumes setWriteMode called just before + if (buf.remaining() >= minFree) { + return; + } + writePos = buf.position(); + int used = writePos - readPos; + int free = buf.remaining() + readPos; + if (free >= minFree && free >= used) { + buf.position(readPos); + buf.limit(writePos); + buf.compact(); + readPos = 0; + } else { + int size = buf.capacity() * 2; + if (buf.capacity() + free < minFree) { + size = buf.capacity() + minFree; + } + ByteBuffer tmp = ByteBuffer.allocate(size); + tmp.order(buf.order()); + buf.position(readPos); + buf.limit(writePos); + tmp.put(buf); + buf = tmp; + readPos = 0; + } + } + + public Buffer(int size) { + buf = ByteBuffer.allocate(size); + readPos = 0; + writePos = 0; + readMode = false; + } + + public boolean shrink(int size) { + int rpos = readMode? buf.position() : readPos; + int wpos = readMode? writePos : buf.position(); + int used = wpos - rpos; + if (used > size || buf.capacity() <= size) { + return false; + } + ByteBuffer tmp = ByteBuffer.allocate(size); + tmp.order(buf.order()); + buf.position(rpos); + buf.limit(wpos); + tmp.put(buf); + buf = tmp; + readPos = 0; + writePos = used; + buf.position(readMode? readPos : writePos); + buf.limit(readMode? writePos : buf.capacity()); + return true; + } + + public int bytes() { + return (readMode) + ? (writePos - buf.position()) + : (buf.position() - readPos); + } + + public ByteBuffer getReadable() { + setReadMode(); + return buf; + } + + public ByteBuffer getWritable(int minFree) { + setWriteMode(); + ensureFree(minFree); + return buf; + } + + public ByteBuffer getChannelReadable() { + ByteBuffer bb = getReadable(); + if (bb.remaining() > MAX_IO) { + bb.limit(bb.position() + MAX_IO); + } + return bb; + } + + public ByteBuffer getChannelWritable(int minFree) { + ByteBuffer bb = getWritable(minFree); + if (bb.remaining() > MAX_IO) { + bb.limit(bb.position() + MAX_IO); + } + return bb; + } +} diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java new file mode 100644 index 00000000000..2c49d0abdaa --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Closer.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Closer { + + private class Run implements Runnable { + public void run() { + try { + Closer.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Closer.this); + } + } + } + + private Thread thread = new Thread(new Run(), "<closer>"); + private Transport parent; + private ThreadQueue closeQueue = new ThreadQueue(); + + public Closer(Transport parent) { + this.parent = parent; + thread.setDaemon(true); + thread.start(); + } + + public void closeLater(Connection c) { + if (!closeQueue.enqueue(c)) { + c.closeSocket(); + } + } + + private void run() { + try { + while (true) { + ((Connection)closeQueue.dequeue()).closeSocket(); + } + } catch (EndOfQueueException e) {} + } + + public Closer shutdown() { + closeQueue.close(); + return this; + } + + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java new file mode 100644 index 00000000000..7affa875cd6 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -0,0 +1,405 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + + +class Connection extends Target { + + private static Logger log = Logger.getLogger(Connection.class.getName()); + + private static final int READ_SIZE = 8192; + private static final int READ_REDO = 10; + private static final int WRITE_SIZE = 8192; + private static final int WRITE_REDO = 10; + + private static final int INITIAL = 0; + private static final int CONNECTED = 1; + private static final int CLOSED = 2; + + private int state = INITIAL; + private Queue queue = new Queue(); + private Queue myQueue = new Queue(); + private Buffer input = new Buffer(READ_SIZE * 2); + private Buffer output = new Buffer(WRITE_SIZE * 2); + private int maxInputSize = 64*1024; + private int maxOutputSize = 64*1024; + private Map<Integer, ReplyHandler> replyMap + = new HashMap<Integer, ReplyHandler>(); + private Map<TargetWatcher, TargetWatcher> watchers + = new IdentityHashMap<TargetWatcher, TargetWatcher>(); + private int activeReqs = 0; + private int writeWork = 0; + private Transport parent; + private Supervisor owner; + private Spec spec; + private SocketChannel channel; + private boolean server; + private AtomicLong requestId = new AtomicLong(0); + private SelectionKey selectionKey; + private Exception lostReason = null; + + private void setState(int state) { + if (state <= this.state) { + log.log(Level.WARNING, "Bogus state transition: " + + this.state + "->" + state); + return; + } + boolean live = (this.state == INITIAL && state == CONNECTED); + boolean down = (this.state != CLOSED && state == CLOSED); + boolean fini; + boolean pendingWrite; + synchronized (this) { + this.state = state; + fini = down && (activeReqs == 0); + pendingWrite = (writeWork > 0); + } + if (live) { + if (pendingWrite) { + enableWrite(); + } + owner.sessionLive(this); + } + if (down) { + for (ReplyHandler rh : replyMap.values()) { + rh.handleConnectionDown(); + } + for (TargetWatcher watcher : watchers.values()) { + watcher.notifyTargetInvalid(this); + } + owner.sessionDown(this); + } + if (fini) { + owner.sessionFini(this); + } + } + + public Connection(Transport parent, Supervisor owner, + SocketChannel channel) { + + this.parent = parent; + this.owner = owner; + this.channel = channel; + server = true; + owner.sessionInit(this); + } + + public Connection(Transport parent, Supervisor owner, + Spec spec, Object context) { + super(context); + this.parent = parent; + this.owner = owner; + this.spec = spec; + server = false; + owner.sessionInit(this); + } + + public void setMaxInputSize(int bytes) { + maxInputSize = bytes; + } + + public void setMaxOutputSize(int bytes) { + maxOutputSize = bytes; + } + + public Transport transport() { + return parent; + } + + public int allocateKey() { + long v = requestId.getAndIncrement(); + v = v*2 + (server ? 1 : 0); + int i = (int)(v & 0x7fffffff); + return i; + } + + public synchronized boolean cancelReply(ReplyHandler handler) { + if (state == CLOSED) { + return false; + } + ReplyHandler stored = replyMap.remove(handler.key()); + if (stored != handler) { + if (stored != null) { + replyMap.put(handler.key(), stored); + } + return false; + } + return true; + } + + public boolean postPacket(Packet packet, ReplyHandler handler) { + boolean accepted = false; + boolean enableWrite = false; + synchronized (this) { + if (state <= CONNECTED) { + enableWrite = (writeWork == 0 && state == CONNECTED); + queue.enqueue(packet); + writeWork++; + accepted = true; + if (handler != null) { + replyMap.put(handler.key(), handler); + } + } + } + if (enableWrite) { + parent.enableWrite(this); + } + return accepted; + } + + public boolean postPacket(Packet packet) { + return postPacket(packet, null); + } + + public Connection connect() { + if (spec == null || spec.malformed()) { + setLostReason(new IllegalArgumentException("jrt: malformed or missing spec")); + return this; + } + try { + channel = SocketChannel.open(spec.address()); + } catch (Exception e) { + setLostReason(e); + } + return this; + } + + public boolean init(Selector selector) { + if (channel == null) { + return false; + } + try { + channel.configureBlocking(false); + channel.socket().setTcpNoDelay(true); + selectionKey = channel.register(selector, + SelectionKey.OP_READ, + this); + } catch (Exception e) { + log.log(Level.WARNING, "Error initializing connection", e); + setLostReason(e); + return false; + } + setState(CONNECTED); + return true; + } + + public void enableWrite() { + selectionKey.interestOps(selectionKey.interestOps() + | SelectionKey.OP_WRITE); + } + + public void disableWrite() { + selectionKey.interestOps(selectionKey.interestOps() + & ~SelectionKey.OP_WRITE); + } + + public void read() throws IOException { + boolean doneRead = false; + for (int i = 0; !doneRead && i < READ_REDO; i++) { + ByteBuffer wb = input.getChannelWritable(READ_SIZE); + if (channel.read(wb) == -1) { + throw new IOException("jrt: Connection closed by peer"); + } + doneRead = (wb.remaining() > 0); + ByteBuffer rb = input.getReadable(); + while (true) { + PacketInfo info = PacketInfo.getPacketInfo(rb); + if (info == null || info.packetLength() > rb.remaining()) { + break; + } + owner.readPacket(info); + Packet packet; + try { + packet = info.decodePacket(rb); + } catch (RuntimeException e) { + log.log(Level.WARNING, "got garbage; closing connection: " + toString()); + throw new IOException("jrt: decode error", e); + } + ReplyHandler handler; + synchronized (this) { + handler = replyMap.remove(packet.requestId()); + } + if (handler != null) { + handler.handleReply(packet); + } else { + owner.handlePacket(this, packet); + } + } + } + if (maxInputSize > 0) { + input.shrink(maxInputSize); + } + } + + public void write() throws IOException { + synchronized (this) { + queue.flush(myQueue); + } + for (int i = 0; i < WRITE_REDO; i++) { + while (output.bytes() < WRITE_SIZE) { + Packet packet = (Packet) myQueue.dequeue(); + if (packet == null) { + break; + } + PacketInfo info = packet.getPacketInfo(); + ByteBuffer wb = output.getWritable(info.packetLength()); + owner.writePacket(info); + info.encodePacket(packet, wb); + } + ByteBuffer rb = output.getChannelReadable(); + if (rb.remaining() == 0) { + break; + } + channel.write(rb); + if (rb.remaining() > 0) { + break; + } + } + boolean disableWrite; + synchronized (this) { + writeWork = queue.size() + + myQueue.size() + + ((output.bytes() > 0) ? 1 : 0); + disableWrite = (writeWork == 0); + } + if (disableWrite) { + disableWrite(); + } + if (maxOutputSize > 0) { + output.shrink(maxOutputSize); + } + } + + public void fini() { + setState(CLOSED); + if (selectionKey != null) { + selectionKey.cancel(); + } + } + + public boolean isClosed() { + return (state == CLOSED); + } + + public boolean hasSocket() { + return (channel != null); + } + + public void closeSocket() { + if (channel != null) { + try { + channel.socket().close(); + } catch (Exception e) { + log.log(Level.WARNING, "Error closing connection", e); + } + } + } + + public void setLostReason(Exception e) { + if (lostReason == null) { + lostReason = e; + } + } + + public TieBreaker startRequest() { + synchronized (this) { + activeReqs++; + } + return new TieBreaker(); + } + + public boolean completeRequest(TieBreaker done) { + boolean signalFini = false; + synchronized (this) { + if (!done.first()) { + return false; + } + if (--activeReqs == 0 && state == CLOSED) { + signalFini = true; + } + } + if (signalFini) { + owner.sessionFini(this); + } + return true; + } + + /////////////////////////////////////////////////////////////////////////// + // Methods defined in the Target superclass + /////////////////////////////////////////////////////////////////////////// + + public boolean isValid() { + return (state != CLOSED); + } + + public Exception getConnectionLostReason() { + return lostReason; + } + + public boolean isClient() { + return !server; + } + + public boolean isServer() { + return server; + } + + public void invokeSync(Request req, double timeout) { + SingleRequestWaiter waiter = new SingleRequestWaiter(); + invokeAsync(req, timeout, waiter); + waiter.waitDone(); + } + + public void invokeAsync(Request req, double timeout, + RequestWaiter waiter) { + if (timeout < 0.0) { + timeout = 0.0; + } + new InvocationClient(this, req, timeout, waiter).invoke(); + } + + public boolean invokeVoid(Request req) { + return postPacket(new RequestPacket(Packet.FLAG_NOREPLY, + allocateKey(), + req.methodName(), + req.parameters())); + } + + public synchronized boolean addWatcher(TargetWatcher watcher) { + if (state == CLOSED) { + return false; + } + watchers.put(watcher, watcher); + return true; + } + + public synchronized boolean removeWatcher(TargetWatcher watcher) { + if (state == CLOSED) { + return false; + } + watchers.remove(watcher); + return true; + } + + public void close() { + parent.closeConnection(this); + } + + public String toString() { + if (channel != null) { + return "Connection { " + channel.socket() + " }"; + } + return "Connection { no socket }"; + } +} diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java new file mode 100644 index 00000000000..fa43710b1f6 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -0,0 +1,76 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Connector { + + private class Run implements Runnable { + public void run() { + try { + Connector.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Connector.this); + } + } + } + + private Thread thread = new Thread(new Run(), "<connector>"); + private Transport parent; + private ThreadQueue connectQueue = new ThreadQueue(); + private boolean done = false; + private boolean exit = false; + + public Connector(Transport parent) { + this.parent = parent; + thread.setDaemon(true); + thread.start(); + } + + public void connectLater(Connection c) { + if (!connectQueue.enqueue(c)) { + parent.addConnection(c); + } + } + + private void run() { + try { + while (true) { + Connection conn = (Connection) connectQueue.dequeue(); + parent.addConnection(conn.connect()); + } + } catch (EndOfQueueException e) {} + synchronized (this) { + done = true; + notifyAll(); + while (!exit) { + try { wait(); } catch (InterruptedException x) {} + } + } + } + + public Connector shutdown() { + connectQueue.close(); + return this; + } + + public synchronized void waitDone() { + while (!done) { + try { wait(); } catch (InterruptedException x) {} + } + } + + public synchronized Connector exit() { + exit = true; + notifyAll(); + return this; + } + + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/DataArray.java b/jrt/src/com/yahoo/jrt/DataArray.java new file mode 100644 index 00000000000..8871b5df6da --- /dev/null +++ b/jrt/src/com/yahoo/jrt/DataArray.java @@ -0,0 +1,64 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * Data array (an array of byte sequences) + **/ +public class DataArray extends Value +{ + private byte[][] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public DataArray(byte[][] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + DataArray(ByteBuffer src) { + int size = src.getInt(); + value = new byte[size][]; + for (int i = 0; i < size; i++) { + value[i] = new byte[src.getInt()]; + src.get(value[i]); + } + } + + /** + * @return DATA_ARRAY + **/ + public byte type() { return DATA_ARRAY; } + public int count() { return value.length; } + + int bytes() { + int bytes = 4; + for (int i = 0; i < value.length; i++) { + bytes += 4 + value[i].length; + } + return bytes; + } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + for (int i = 0; i < value.length; i++) { + dst.putInt(value[i].length); + dst.put(value[i]); + } + } + + public byte[][] asDataArray() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/DataValue.java b/jrt/src/com/yahoo/jrt/DataValue.java new file mode 100644 index 00000000000..284bf2e6a2d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/DataValue.java @@ -0,0 +1,51 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +/** + * Data value (a sequence of bytes) + **/ +public class DataValue extends Value +{ + private byte[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public DataValue(byte[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + DataValue(ByteBuffer src) { + int size = src.getInt(); + value = new byte[size]; + src.get(value); + } + + /** + * @return DATA + **/ + public byte type() { return DATA; } + public int count() { return 1; } + + int bytes() { return 4 + value.length; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.put(value); + } + + public byte[] asData() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/DoubleArray.java b/jrt/src/com/yahoo/jrt/DoubleArray.java new file mode 100644 index 00000000000..3758abe9873 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/DoubleArray.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 64-bit floating-point array + **/ +public class DoubleArray extends Value +{ + private double[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public DoubleArray(double[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + DoubleArray(ByteBuffer src) { + int size = src.getInt(); + value = new double[size]; + src.asDoubleBuffer().get(value); + src.position(src.position() + size * 8); + } + + /** + * @return DOUBLE_ARRAY + **/ + public byte type() { return DOUBLE_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length * 8; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.asDoubleBuffer().put(value); + dst.position(dst.position() + value.length * 8); + } + + public double[] asDoubleArray() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/DoubleValue.java b/jrt/src/com/yahoo/jrt/DoubleValue.java new file mode 100644 index 00000000000..e7739e80150 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/DoubleValue.java @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +/** + * 64-bit floating-point value + **/ +public class DoubleValue extends Value +{ + private double value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public DoubleValue(double value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + DoubleValue(ByteBuffer src) { + value = src.getDouble(); + } + + /** + * @return DOUBLE + **/ + public byte type() { return DOUBLE; } + public int count() { return 1; } + + int bytes() { return 8; } + void encode(ByteBuffer dst) { + dst.putDouble(value); + } + + public double asDouble() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/EndOfQueueException.java b/jrt/src/com/yahoo/jrt/EndOfQueueException.java new file mode 100644 index 00000000000..67ba53fd5d2 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/EndOfQueueException.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Checked exception thrown when someone tries to dequeue an object + * from a closed and empty {@link ThreadQueue}. + **/ +class EndOfQueueException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Create a EndOfQueueException. + **/ + EndOfQueueException() { + super(); + } +} diff --git a/jrt/src/com/yahoo/jrt/ErrorCode.java b/jrt/src/com/yahoo/jrt/ErrorCode.java new file mode 100644 index 00000000000..38c60bbd6fd --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ErrorCode.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * This class contains the error codes defined by the RPC + * protocol. The error code associated with a request is obtained + * through the {@link Request#errorCode} method. Note that according to + * the RPC protocol, applications may define custom error codes with + * values 65536 (0x10000) and greater. + **/ +public class ErrorCode +{ + /** No error (0) **/ + public static final int NONE = 0; + + /** General error (100) **/ + public static final int GENERAL_ERROR = 100; + + /** Not implemented (101) **/ + public static final int NOT_IMPLEMENTED = 101; + + /** Invocation aborted (102) **/ + public static final int ABORT = 102; + + /** Invocation timed out (103) **/ + public static final int TIMEOUT = 103; + + /** Connection error (104) **/ + public static final int CONNECTION = 104; + + /** Bad request packet (105) **/ + public static final int BAD_REQUEST = 105; + + /** No such method (106) **/ + public static final int NO_SUCH_METHOD = 106; + + /** Illegal parameters (107) **/ + public static final int WRONG_PARAMS = 107; + + /** Request dropped due to server overload (108) **/ + public static final int OVERLOAD = 108; + + /** Illegal return values (109) **/ + public static final int WRONG_RETURN = 109; + + /** Bad reply packet (110) **/ + public static final int BAD_REPLY = 110; + + /** Method failed (111) **/ + public static final int METHOD_FAILED = 111; +} diff --git a/jrt/src/com/yahoo/jrt/ErrorPacket.java b/jrt/src/com/yahoo/jrt/ErrorPacket.java new file mode 100644 index 00000000000..fa0ac835826 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ErrorPacket.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +class ErrorPacket extends Packet +{ + private int errorCode; + private StringValue errorMessage; + + public ErrorPacket(int flags, int reqId, + int errorCode, + String errorMessage) + { + super(flags, reqId); + this.errorCode = errorCode; + this.errorMessage = new StringValue(errorMessage); + } + + public ErrorPacket(int flags, int reqId, + ByteBuffer src) + { + super(flags, reqId); + errorCode = src.getInt(); + errorMessage = new StringValue(src); + } + + public int bytes() { + return (headerLength + + 4 + + errorMessage.bytes()); + } + + public int packetCode() { + return PCODE_ERROR; + } + + public void encode(ByteBuffer dst) { + dst.putInt(errorCode); + errorMessage.encode(dst); + } + + public int errorCode() { + return errorCode; + } + + public String errorMessage() { + return errorMessage.asString(); + } +} diff --git a/jrt/src/com/yahoo/jrt/FatalErrorHandler.java b/jrt/src/com/yahoo/jrt/FatalErrorHandler.java new file mode 100644 index 00000000000..e3a17ac0151 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/FatalErrorHandler.java @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Interface used to report fatal errors (internal thread + * unwinding). If the application wants to be notified of these + * errors, it must register a handler with the {@link Transport} + * constructor. + **/ +public interface FatalErrorHandler { + + /** + * Invoked when an internal thread crashes due to thread + * unwinding. + * + * @param problem the throwable causing the problem + * @param context the object owning the crashed thread + **/ + public void handleFailure(Throwable problem, Object context); +} diff --git a/jrt/src/com/yahoo/jrt/FloatArray.java b/jrt/src/com/yahoo/jrt/FloatArray.java new file mode 100644 index 00000000000..05556e94276 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/FloatArray.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 32-bit floating-point array + **/ +public class FloatArray extends Value +{ + private float[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public FloatArray(float[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + FloatArray(ByteBuffer src) { + int size = src.getInt(); + value = new float[size]; + src.asFloatBuffer().get(value); + src.position(src.position() + size * 4); + } + + /** + * @return FLOAT_ARRAY + **/ + public byte type() { return FLOAT_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length * 4; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.asFloatBuffer().put(value); + dst.position(dst.position() + value.length * 4); + } + + public float[] asFloatArray() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/FloatValue.java b/jrt/src/com/yahoo/jrt/FloatValue.java new file mode 100644 index 00000000000..bfba69f0965 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/FloatValue.java @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 32-bit floating-point value + **/ +public class FloatValue extends Value +{ + private float value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public FloatValue(float value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + FloatValue(ByteBuffer src) { + value = src.getFloat(); + } + + /** + * @return FLOAT + **/ + public byte type() { return FLOAT; } + public int count() { return 1; } + + int bytes() { return 4; } + void encode(ByteBuffer dst) { + dst.putFloat(value); + } + + public float asFloat() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int16Array.java b/jrt/src/com/yahoo/jrt/Int16Array.java new file mode 100644 index 00000000000..fbb7f304872 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int16Array.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 16-bit integer array + **/ +public class Int16Array extends Value +{ + private short[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int16Array(short[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int16Array(ByteBuffer src) { + int size = src.getInt(); + value = new short[size]; + src.asShortBuffer().get(value); + src.position(src.position() + size * 2); + } + + /** + * @return INT16_ARRAY + **/ + public byte type() { return INT16_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length * 2; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.asShortBuffer().put(value); + dst.position(dst.position() + value.length * 2); + } + + public short[] asInt16Array() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int16Value.java b/jrt/src/com/yahoo/jrt/Int16Value.java new file mode 100644 index 00000000000..45c10e09f76 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int16Value.java @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 16-bit integer value + **/ +public class Int16Value extends Value +{ + private short value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int16Value(short value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int16Value(ByteBuffer src) { + value = src.getShort(); + } + + /** + * @return INT16 + **/ + public byte type() { return INT16; } + public int count() { return 1; } + + int bytes() { return 2; } + void encode(ByteBuffer dst) { + dst.putShort(value); + } + + public short asInt16() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int32Array.java b/jrt/src/com/yahoo/jrt/Int32Array.java new file mode 100644 index 00000000000..dddcaf83ddc --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int32Array.java @@ -0,0 +1,55 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 32-bit integer array + **/ +public class Int32Array extends Value +{ + private int[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int32Array(int[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int32Array(ByteBuffer src) { + int size = src.getInt(); + value = new int[size]; + src.asIntBuffer().get(value); + src.position(src.position() + size * 4); + } + + /** + * @return INT32_ARRAY + **/ + public byte type() { return INT32_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length * 4; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.asIntBuffer().put(value); + dst.position(dst.position() + value.length * 4); + } + + public int[] asInt32Array() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + + +} diff --git a/jrt/src/com/yahoo/jrt/Int32Value.java b/jrt/src/com/yahoo/jrt/Int32Value.java new file mode 100644 index 00000000000..0a57014431a --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int32Value.java @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +/** + * 32-bit integer value + **/ +public class Int32Value extends Value +{ + private int value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int32Value(int value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int32Value(ByteBuffer src) { + value = src.getInt(); + } + + /** + * @return INT32 + **/ + public byte type() { return INT32; } + public int count() { return 1; } + + int bytes() { return 4; } + void encode(ByteBuffer dst) { + dst.putInt(value); + } + + public int asInt32() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int64Array.java b/jrt/src/com/yahoo/jrt/Int64Array.java new file mode 100644 index 00000000000..00427b9eff4 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int64Array.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 64-bit integer array + **/ +public class Int64Array extends Value +{ + private long[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int64Array(long[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int64Array(ByteBuffer src) { + int size = src.getInt(); + value = new long[size]; + src.asLongBuffer().get(value); + src.position(src.position() + size * 8); + } + + /** + * @return INT64_ARRAY + **/ + public byte type() { return INT64_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length * 8; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.asLongBuffer().put(value); + dst.position(dst.position() + value.length * 8); + } + + public long[] asInt64Array() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int64Value.java b/jrt/src/com/yahoo/jrt/Int64Value.java new file mode 100644 index 00000000000..27c5cd5ff45 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int64Value.java @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +/** + * 64-bit integer value + **/ +public class Int64Value extends Value +{ + private long value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int64Value(long value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int64Value(ByteBuffer src) { + value = src.getLong(); + } + + /** + * @return INT64 + **/ + public byte type() { return INT64; } + public int count() { return 1; } + + int bytes() { return 8; } + void encode(ByteBuffer dst) { + dst.putLong(value); + } + + public long asInt64() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + + +} diff --git a/jrt/src/com/yahoo/jrt/Int8Array.java b/jrt/src/com/yahoo/jrt/Int8Array.java new file mode 100644 index 00000000000..4dc18a75d8e --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int8Array.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * 8-bit integer array + **/ +public class Int8Array extends Value +{ + private byte[] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int8Array(byte[] value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int8Array(ByteBuffer src) { + int size = src.getInt(); + value = new byte[size]; + src.get(value); + } + + /** + * @return INT8_ARRAY + **/ + public byte type() { return INT8_ARRAY; } + public int count() { return value.length; } + + int bytes() { return 4 + value.length; } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + dst.put(value); + } + + public byte[] asInt8Array() { return value; } + + public @Override String toString() { + return Arrays.toString(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Int8Value.java b/jrt/src/com/yahoo/jrt/Int8Value.java new file mode 100644 index 00000000000..e62460f4bd8 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Int8Value.java @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +/** + * 8-bit integer value + **/ +public class Int8Value extends Value +{ + private byte value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public Int8Value(byte value) { this.value = value; } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + Int8Value(ByteBuffer src) { + value = src.get(); + } + + /** + * @return INT8 + **/ + public byte type() { return INT8; } + public int count() { return 1; } + + int bytes() { return 1; } + void encode(ByteBuffer dst) { + dst.put(value); + } + + public byte asInt8() { return value; } + + public @Override String toString() { + return String.valueOf(value); + } + +} diff --git a/jrt/src/com/yahoo/jrt/InvocationClient.java b/jrt/src/com/yahoo/jrt/InvocationClient.java new file mode 100644 index 00000000000..3d75148793d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/InvocationClient.java @@ -0,0 +1,95 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class InvocationClient implements ReplyHandler, Runnable { + + Connection conn; + Request req; + double timeout; + RequestWaiter reqWaiter; + Integer replyKey; + Task timeoutTask; + + public InvocationClient(Connection conn, Request req, + double timeout, RequestWaiter waiter) { + + this.conn = conn; + this.req = req; + this.timeout = timeout; + this.reqWaiter = waiter; + req.clientHandler(this); + + this.replyKey = conn.allocateKey(); + this.timeoutTask = conn.transport().createTask(this); + } + + public void invoke() { + if (!conn.postPacket(new RequestPacket(0, + replyKey.intValue(), + req.methodName(), + req.parameters()), this)) { + req.setError(ErrorCode.CONNECTION, "Connection error"); + reqWaiter.handleRequestDone(req); + return; + } + timeoutTask.schedule(timeout); + } + + public Integer key() { + return replyKey; + } + + /** + * Handle normal packet reply. The reply may contain either return + * values or an error. + **/ + public void handleReply(Packet packet) { + timeoutTask.kill(); + if (packet == null) { + req.setError(ErrorCode.BAD_REPLY, "Bad reply packet"); + } else { + int pcode = packet.packetCode(); + if (pcode == Packet.PCODE_REPLY) { + ReplyPacket rp = (ReplyPacket) packet; + req.returnValues(rp.returnValues()); + } else if (pcode == Packet.PCODE_ERROR) { + ErrorPacket ep = (ErrorPacket) packet; + req.setError(ep.errorCode(), ep.errorMessage()); + } + } + reqWaiter.handleRequestDone(req); + } + + /** + * Handle user abort. + **/ + public void handleAbort() { + if (!conn.cancelReply(this)) { + return; + } + timeoutTask.kill(); + req.setError(ErrorCode.ABORT, "Aborted by user"); + reqWaiter.handleRequestDone(req); + } + + /** + * Handle connection down. + **/ + public void handleConnectionDown() { + timeoutTask.kill(); + req.setError(ErrorCode.CONNECTION, "Connection error"); + reqWaiter.handleRequestDone(req); + } + + /** + * Handle timeout. + **/ + public void run() { + if (!conn.cancelReply(this)) { + return; + } + req.setError(ErrorCode.TIMEOUT, "Request timed out"); + reqWaiter.handleRequestDone(req); + } +} diff --git a/jrt/src/com/yahoo/jrt/InvocationServer.java b/jrt/src/com/yahoo/jrt/InvocationServer.java new file mode 100644 index 00000000000..5f39bdebe97 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/InvocationServer.java @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class InvocationServer { + + private Connection conn; + private Request request; + private Method method; + private int replyKey; + private boolean noReply; + private TieBreaker done; + + public InvocationServer(Connection conn, Request request, Method method, + int replyKey, boolean noReply) { + + this.conn = conn; + this.request = request; + this.method = method; + this.replyKey = replyKey; + this.noReply = noReply; + request.serverHandler(this); + + done = conn.startRequest(); + } + + public Target getTarget() { + return conn; + } + + public void invoke() { + if (method != null) { + if (method.checkParameters(request)) { + method.invoke(request); + } else { + request.setError(ErrorCode.WRONG_PARAMS, "Parameters in " + request + " does not match " + method); + } + } else { + request.setError(ErrorCode.NO_SUCH_METHOD, "No such method"); + } + if (!request.isDetached()) { + returnRequest(); + } + } + + public void returnRequest() { + if (!conn.completeRequest(done)) { + throw new IllegalStateException("Request already returned"); + } + if (noReply) { + return; + } + if (!request.isError() && !method.checkReturnValues(request)) { + request.setError(ErrorCode.WRONG_RETURN, "Return values in " + request + " does not match " + method); + } + if (request.isError()) { + conn.postPacket(new ErrorPacket(0, replyKey, + request.errorCode(), + request.errorMessage())); + } else { + conn.postPacket(new ReplyPacket(0, replyKey, + request.returnValues())); + } + } +} diff --git a/jrt/src/com/yahoo/jrt/InvokeProxy.java b/jrt/src/com/yahoo/jrt/InvokeProxy.java new file mode 100644 index 00000000000..944871e2ec0 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/InvokeProxy.java @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +/** + * Invokes asynchronous JRT requests in a blocking method, that can be aborted by calling shutdown(). + * This can be used when one would like to use invokeSync(), but when abortion is required. + * <p> + * This class is thread safe. + * + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + */ +//@ThreadSafe +public class InvokeProxy { + + private boolean active = true; + private Request pending = null; + + public Request invoke(Request req, Target target, double timeout) { + SingleRequestWaiter waiter = null; + synchronized (this) { + if (active) { + waiter = new SingleRequestWaiter(); + target.invokeAsync(req, timeout, waiter); + pending = req; + } else { + req.setError(ErrorCode.ABORT, "Aborted by user"); + } + } + if (waiter != null) { + waiter.waitDone(); + synchronized (this) { + pending = null; + } + } + return req; + } + + public void shutdown() { + synchronized (this) { + active = false; + if (pending != null) { + pending.abort(); + } + } + } + +} diff --git a/jrt/src/com/yahoo/jrt/ListenFailedException.java b/jrt/src/com/yahoo/jrt/ListenFailedException.java new file mode 100644 index 00000000000..098b9761a00 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ListenFailedException.java @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Checked exception thrown when listening fails. + * @see Supervisor#listen + **/ +public class ListenFailedException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Create a ListenFailedException with the given message. + * + * @param msg the exception message + **/ + ListenFailedException(String msg) { + super(msg); + } + + /** + * Create a ListenFailedException with the given message and cause. + * + * @param msg the exception message + * @param cause what caused this exception + **/ + ListenFailedException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/jrt/src/com/yahoo/jrt/MandatoryMethods.java b/jrt/src/com/yahoo/jrt/MandatoryMethods.java new file mode 100644 index 00000000000..f700a81729c --- /dev/null +++ b/jrt/src/com/yahoo/jrt/MandatoryMethods.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.Iterator; + + +class MandatoryMethods { + + Supervisor parent; + + public MandatoryMethods(Supervisor parent) { + this.parent = parent; + //--------------------------------------------------------------------- + Method m; + //--------------------------------------------------------------------- + m = new Method("frt.rpc.ping", "", "", this, "ping"); + m.methodDesc("Method that may be used to " + + "check if the server is online"); + parent.addMethod(m); + //--------------------------------------------------------------------- + m = new Method("frt.rpc.getMethodList", "", "SSS", this, + "getMethodList"); + m.methodDesc("Obtain a list of all available methods"); + m.returnDesc(0, "names", "Method names"); + m.returnDesc(1, "params", "Method parameter types"); + m.returnDesc(2, "return", "Method return types"); + parent.addMethod(m); + //--------------------------------------------------------------------- + m = new Method("frt.rpc.getMethodInfo", "s", "sssSSSS", this, + "getMethodInfo"); + m.methodDesc("Obtain detailed information about a single method"); + m.paramDesc (0, "methodName", "The method we want information about"); + m.returnDesc(0, "desc", "Description of what the method does"); + m.returnDesc(1, "params", "Method parameter types"); + m.returnDesc(2, "return", "Method return values"); + m.returnDesc(3, "paramNames", "Method parameter names"); + m.returnDesc(4, "paramDesc", "Method parameter descriptions"); + m.returnDesc(5, "returnNames", "Method return value names"); + m.returnDesc(6, "returnDesc", "Method return value descriptions"); + parent.addMethod(m); + //--------------------------------------------------------------------- + } + + public void ping(Request req) { + // no code needed :) + } + + public void getMethodList(Request req) { + int cnt = parent.methodMap().size(); + String[] ret0_names = new String[cnt]; + String[] ret1_params = new String[cnt]; + String[] ret2_return = new String[cnt]; + + int i = 0; + Iterator<Method> itr = parent.methodMap().values().iterator(); + while (itr.hasNext()) { + Method m = itr.next(); + ret0_names[i] = m.name(); + ret1_params[i] = m.paramTypes(); + ret2_return[i] = m.returnTypes(); + i++; + } + req.returnValues().add(new StringArray(ret0_names)); + req.returnValues().add(new StringArray(ret1_params)); + req.returnValues().add(new StringArray(ret2_return)); + } + + public void getMethodInfo(Request req) { + Method method = parent.methodMap().get(req.parameters().get(0).asString()); + if (method == null) { + req.setError(ErrorCode.METHOD_FAILED, "No Such Method"); + return; + } + req.returnValues().add(new StringValue(method.methodDesc())); + req.returnValues().add(new StringValue(method.paramTypes())); + req.returnValues().add(new StringValue(method.returnTypes())); + + int paramCnt = method.paramTypes().length(); + int returnCnt = method.returnTypes().length(); + String[] ret3_paramName = new String[paramCnt]; + String[] ret4_paramDesc = new String[paramCnt]; + String[] ret5_returnName = new String[returnCnt]; + String[] ret6_returnDesc = new String[returnCnt]; + for (int i = 0; i < paramCnt; i++) { + ret3_paramName[i] = method.paramName(i); + ret4_paramDesc[i] = method.paramDesc(i); + } + for (int i = 0; i < returnCnt; i++) { + ret5_returnName[i] = method.returnName(i); + ret6_returnDesc[i] = method.returnDesc(i); + } + req.returnValues().add(new StringArray(ret3_paramName)); + req.returnValues().add(new StringArray(ret4_paramDesc)); + req.returnValues().add(new StringArray(ret5_returnName)); + req.returnValues().add(new StringArray(ret6_returnDesc)); + } +} diff --git a/jrt/src/com/yahoo/jrt/Method.java b/jrt/src/com/yahoo/jrt/Method.java new file mode 100644 index 00000000000..d752e33b0d8 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Method.java @@ -0,0 +1,290 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * <p>A Method encapsulates the reflective information about a single RPC + * method.</p> + * + * <p>Method parameters and return values are declared with type + * strings. A <i>type string</i> denotes the concatenation of type + * identifiers where a string is used to represent a sequence of + * types. For example 'ii' is the type string for two 32-bit integers, + * while 'iss' is the type string for a single 32-bit integer followed + * by two strings. The complete list of type identifiers can be found + * in the {@link Value} class.</p> + * + * <p>The type strings associated with actual method parameters and + * return values may only contain valid type identifiers. However, + * when you specify the parameters accepted by- or returned from a RPC + * method via the Method constructor, '*' may be used as the last + * character in the type string. Ending a type string specification + * with '*' means that additional values are optional and may have any + * type. This feature can also be used with the {@link + * Request#checkReturnTypes Request.checkReturnTypes} method when + * verifying return types.</p> + * + * @see Supervisor#addMethod + **/ +public class Method { + + private MethodHandler handler; // option 1: interface + private Object object; // option 2: reflection + private java.lang.reflect.Method method; // option 2: reflection + + private String name; + private String paramTypes; + private String returnTypes; + + private String desc; + private String[] paramName; + private String[] paramDesc; + private String[] returnName; + private String[] returnDesc; + + private static final String undocumented = "???"; + private static final Class<?>[] handlerParams = { Request.class }; + + + private void init(String name, String paramTypes, String returnTypes) { + this.name = name; + this.paramTypes = paramTypes; + this.returnTypes = returnTypes; + desc = undocumented; + paramName = new String[this.paramTypes.length()]; + paramDesc = new String[this.paramTypes.length()]; + for (int i = 0; i < this.paramTypes.length(); i++) { + paramName[i] = undocumented; + paramDesc[i] = undocumented; + } + returnName = new String[this.returnTypes.length()]; + returnDesc = new String[this.returnTypes.length()]; + for (int i = 0; i < this.returnTypes.length(); i++) { + returnName[i] = undocumented; + returnDesc[i] = undocumented; + } + } + + /** + * Create a new Method. The parameters define the name of the + * method, the parameter types, the return value types and also + * what method in which object should be invoked to perform the + * method. Please refer to the {@link Method} class description + * for an explanation of type strings. + * + * @param name method name + * @param paramTypes a type string defining the parameter types + * @param returnTypes a type string defining the return value types + * @param handler the object handling this RPC method + * @param handlerMethod the name of the method in the handler that + * should be used to handle invocations of this RPC method. This + * method must be public, return void and take a {@link Request} + * as its only parameter. + * + * @throws MethodCreateException if the handler method cannot be + * resolved. + **/ + public Method(String name, String paramTypes, String returnTypes, + Object handler, String handlerMethod) { + + this.handler = null; + this.object = handler; + try { + this.method = this.object.getClass().getMethod(handlerMethod, + handlerParams); + } catch (Exception e) { + throw new MethodCreateException("Method lookup failed", e); + } + init(name, paramTypes, returnTypes); + } + + /** + * Create a new Method. The parameters define the name of the + * method, the parameter types, the return value types and also + * the handler for the method. Please refer to the {@link Method} + * class description for an explanation of type strings. + * + * @param name method name + * @param paramTypes a type string defining the parameter types + * @param returnTypes a type string defining the return value types + * @param handler the handler for this RPC method + * + * @throws MethodCreateException if the handler is <i>null</i>. + **/ + public Method(String name, String paramTypes, String returnTypes, + MethodHandler handler) { + + this.handler = handler; + this.object = null; + this.method = null; + if (this.handler == null) { + throw new MethodCreateException("Handler is null"); + } + init(name, paramTypes, returnTypes); + } + + /** + * Obtain the name of this method + * + * @return method name + **/ + String name() { + return name; + } + + /** + * Obtain the parameter types of this method + * + * @return parameter types + **/ + String paramTypes() { + return paramTypes; + } + + /** + * Obtain the return value types of this method + * + * @return return value types + **/ + String returnTypes() { + return returnTypes; + } + + /** + * Describe this method. This adds documentation that can be + * obtained through remote reflection. + * + * @return this Method, to allow chaining + **/ + public Method methodDesc(String desc) { + this.desc = desc; + return this; + } + + /** + * Obtain the method description + * + * @return method description + **/ + String methodDesc() { + return desc; + } + + /** + * Describe a parameter of this method. This adds documentation + * that can be obtained through remote reflection. + * + * @return this Method, to allow chaining + * @param index the parameter index + * @param name the parameter name + * @param desc the parameter description + **/ + public Method paramDesc(int index, String name, String desc) { + paramName[index] = name; + paramDesc[index] = desc; + return this; + } + + /** + * Obtain the name of a parameter + * + * @return parameter name + * @param index parameter index + **/ + String paramName(int index) { + return paramName[index]; + } + + /** + * Obtain the description of a parameter + * + * @return parameter description + * @param index parameter index + **/ + String paramDesc(int index) { + return paramDesc[index]; + } + + /** + * Describe a return value of this method. This adds documentation + * that can be obtained through remote reflection. + * + * @return this Method, to allow chaining + * @param index the return value index + * @param name the return value name + * @param desc the return value description + **/ + public Method returnDesc(int index, String name, String desc) { + returnName[index] = name; + returnDesc[index] = desc; + return this; + } + + /** + * Obtain the name of a return value + * + * @return return value name + * @param index return value index + **/ + String returnName(int index) { + return returnName[index]; + } + + /** + * Obtain the description of a return value + * + * @return return value description + * @param index return value index + **/ + String returnDesc(int index) { + return returnDesc[index]; + } + + /** + * Check whether the parameters of the given request satisfies the + * parameters of this method. + * + * @return true if the parameters of the given request satisfies + * the parameters of this method + * @param req a request + **/ + boolean checkParameters(Request req) { + return req.parameters().satisfies(paramTypes); + } + + /** + * Check whether the return values of the given request satisfies + * the return values of this method. + * + * @return true if the return values of the given request satisfies + * the return values of this method + * @param req a request + **/ + boolean checkReturnValues(Request req) { + return req.returnValues().satisfies(returnTypes); + } + + /** + * Invoke this method. The given request holds the parameters and + * will be given as parameter to the handler method. + * + * @param req the request causing this invocation + **/ + void invoke(Request req) { + try { + if (handler != null) { + handler.invoke(req); + } else { + Object[] args = { req }; + method.invoke(object, args); + } + } catch (Exception e) { + req.setError(ErrorCode.METHOD_FAILED, e.toString()); + } + } + + public @Override String toString() { + return "method " + name + "(" + paramTypes + ")" + ( returnTypes.length()>0 ? ": " + returnTypes : ""); + } + +} diff --git a/jrt/src/com/yahoo/jrt/MethodCreateException.java b/jrt/src/com/yahoo/jrt/MethodCreateException.java new file mode 100644 index 00000000000..e598f0f3a54 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/MethodCreateException.java @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Unchecked exception thrown when the {@link Method} constructor + * fails to resolve the method handler. The most usual reasons for + * this is that the method handler simply does not exist or that it + * has the wrong signature. + **/ +public class MethodCreateException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** + * Create a MethodCreateException with the given message. + * + * @param msg the exception message + **/ + MethodCreateException(String msg) { + super(msg); + } + + /** + * Create a MethodCreateException with the given message and + * cause. + * + * @param msg the exception message + * @param cause what caused this exception + **/ + MethodCreateException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/jrt/src/com/yahoo/jrt/MethodHandler.java b/jrt/src/com/yahoo/jrt/MethodHandler.java new file mode 100644 index 00000000000..5db43124280 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/MethodHandler.java @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * <p>Interface used to handle the invocation of a method.</p> + * + * <p>The {@link Method} class is used to register rpc methods. There + * are two ways rpc methods can be defined(bound); with this interface + * or with reflection. This choice is reflected by the two different + * constructors in the {@link Method} class.</p> + **/ +public interface MethodHandler { + + /** + * Method used to dispatch an rpc request. + * + * @param req the request + **/ + public void invoke(Request req); +} diff --git a/jrt/src/com/yahoo/jrt/Packet.java b/jrt/src/com/yahoo/jrt/Packet.java new file mode 100644 index 00000000000..3d50d5d0f97 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Packet.java @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +abstract class Packet +{ + public static final int PCODE_REQUEST = 100; + public static final int PCODE_REPLY = 101; + public static final int PCODE_ERROR = 102; + + public static final int FLAG_REVERSE = 0x1; // bit 0 + public static final int FLAG_NOREPLY = 0x2; // bit 1 + + public static final int headerLength = 12; + + public static boolean checkFlag(int flag, int flags) { + return (flags & flag) != 0; + } + + private int flags; + private int requestId; + + public Packet(int flags, int reqId) { + this.flags = flags; + this.requestId = reqId; + } + + public int requestId() { + return requestId; + } + + public boolean reverseByteOrder() { + return checkFlag(FLAG_REVERSE, flags); + } + + public boolean noReply() { + return checkFlag(FLAG_NOREPLY, flags); + } + + public abstract int bytes(); + public abstract int packetCode(); + public abstract void encode(ByteBuffer dst); + + public PacketInfo getPacketInfo() { + return new PacketInfo(bytes(), flags, packetCode(), requestId); + } +} diff --git a/jrt/src/com/yahoo/jrt/PacketInfo.java b/jrt/src/com/yahoo/jrt/PacketInfo.java new file mode 100644 index 00000000000..82502387402 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/PacketInfo.java @@ -0,0 +1,109 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + +class PacketInfo +{ + private int packetLength; + private int flags; + private int packetCode; + private int requestId; + + private PacketInfo(ByteBuffer src) { + packetLength = src.getInt() + 4; + flags = src.getShort(); + packetCode = src.getShort(); + requestId = src.getInt(); + } + + PacketInfo(int plen, int flags, int pcode, int reqId) { + this.packetLength = plen; + this.flags = flags; + this.packetCode = pcode; + this.requestId = reqId; + } + + public int packetLength() { + return packetLength; + } + + public int flags() { + return flags; + } + + public int packetCode() { + return packetCode; + } + + public int requestId() { + return requestId; + } + + public boolean reverseByteOrder() { + return Packet.checkFlag(Packet.FLAG_REVERSE, flags); + } + + public boolean noReply() { + return Packet.checkFlag(Packet.FLAG_NOREPLY, flags); + } + + public static PacketInfo getPacketInfo(ByteBuffer src) { + if (src.remaining() < Packet.headerLength) { + return null; + } + return new PacketInfo(src.slice()); + } + + public Packet decodePacket(ByteBuffer src) { + int pos = src.position(); + int end = pos + packetLength; + int limit = src.limit(); + try { + src.limit(end); + src.position(src.position() + Packet.headerLength); + if (reverseByteOrder()) { + src.order(ByteOrder.LITTLE_ENDIAN); + } + switch (packetCode) { + case Packet.PCODE_REQUEST: + return new RequestPacket(flags, requestId, src); + case Packet.PCODE_REPLY: + return new ReplyPacket(flags, requestId, src); + case Packet.PCODE_ERROR: + return new ErrorPacket(flags, requestId, src); + } + throw new IllegalArgumentException(); + } finally { + src.order(ByteOrder.BIG_ENDIAN); + src.position(end); + src.limit(limit); + } + } + + public void encodePacket(Packet packet, ByteBuffer dst) { + int pos = dst.position(); + int end = pos + packetLength; + int limit = dst.limit(); + try { + dst.limit(end); + dst.putInt(packetLength - 4); + dst.putShort((short)flags); + dst.putShort((short)packetCode); + dst.putInt(requestId); + if (reverseByteOrder()) { + dst.order(ByteOrder.LITTLE_ENDIAN); + } + packet.encode(dst); + } catch (RuntimeException e) { + dst.position(pos); + throw e; + } finally { + dst.order(ByteOrder.BIG_ENDIAN); + dst.limit(limit); + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Queue.java b/jrt/src/com/yahoo/jrt/Queue.java new file mode 100644 index 00000000000..5feb62c2af1 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Queue.java @@ -0,0 +1,132 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * A queue implementation that is not thread-safe. The implementation + * uses a growable circular array to hold the elements. + **/ +class Queue +{ + private Object[] buf; + private int used; + private int readPos; + private int writePos; + + /** + * Ensure the queue has room for the specified number of + * additional elements. + * + * @param need space needed on queue + **/ + private void ensureFree(int need) { + if (buf.length < used + need) { + int newSize = Math.max(buf.length, 8); + while (newSize < used + need) { + newSize *= 2; + } + Object[] newBuf = new Object[newSize]; + for (int i = 0; i < used; i++) { + newBuf[i] = buf[readPos++]; + if (readPos == buf.length) { + readPos = 0; + } + } + buf = newBuf; + readPos = 0; + writePos = used; // this cannot wrap + } + } + + /** + * Create a queue. If more elements are put on the queue than can + * be held by the initial capacity, the underlying structures will + * be grown as needed. + * + * @param capacity initial queue capacity + **/ + public Queue(int capacity) { + buf = new Object[capacity]; + used = 0; + readPos = 0; + writePos = 0; + } + + /** + * Create a queue with an initial capacity of 64. + **/ + public Queue() { + this(64); + } + + /** + * Enqueue an object on this queue. + * + * @param obj the object to enqueue + **/ + public void enqueue(Object obj) { + ensureFree(1); + buf[writePos++] = obj; + if (writePos == buf.length) { + writePos = 0; + } + used++; + } + + /** + * Dequeue the next object from this queue. + * + * @return the next object from the queue or 'null' if the queue + * is empty + **/ + public Object dequeue() { + if (used == 0) { + return null; + } + Object obj = buf[readPos]; + buf[readPos++] = null; // enable GC of dequeued object + if (readPos == buf.length) { + readPos = 0; + } + used--; + return obj; + } + + /** + * @return whether this queue is empty + **/ + public boolean isEmpty() { + return (used == 0); + } + + /** + * @return the number of elements in this queue + **/ + public int size() { + return used; + } + + /** + * Flush all elements currently in this queue into another + * queue. Note that this will clear the queue. + * + * @return the number of elements flushed + **/ + public int flush(Queue dst) { + int cnt = used; + dst.ensureFree(cnt); + for (int i = 0; i < used; i++) { + dst.buf[dst.writePos++] = buf[readPos]; + buf[readPos++] = null; // enable GC of dequeued object + if (dst.writePos == dst.buf.length) { + dst.writePos = 0; + } + if (readPos == buf.length) { + readPos = 0; + } + } + dst.used += used; + used = 0; + return cnt; + } +} diff --git a/jrt/src/com/yahoo/jrt/ReplyHandler.java b/jrt/src/com/yahoo/jrt/ReplyHandler.java new file mode 100644 index 00000000000..9c7214fb97c --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ReplyHandler.java @@ -0,0 +1,9 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +interface ReplyHandler { + public Integer key(); + public void handleReply(Packet packet); + public void handleConnectionDown(); +} diff --git a/jrt/src/com/yahoo/jrt/ReplyPacket.java b/jrt/src/com/yahoo/jrt/ReplyPacket.java new file mode 100644 index 00000000000..158c901d1f2 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ReplyPacket.java @@ -0,0 +1,42 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +class ReplyPacket extends Packet +{ + private Values returnValues; + + public ReplyPacket(int flags, int reqId, + Values returnValues) + { + super(flags, reqId); + this.returnValues = returnValues; + } + + public ReplyPacket(int flags, int reqId, + ByteBuffer src) + { + super(flags, reqId); + returnValues = new Values(src); + } + + public int bytes() { + return (headerLength + + returnValues.bytes()); + } + + public int packetCode() { + return PCODE_REPLY; + } + + public void encode(ByteBuffer dst) { + returnValues.encode(dst); + } + + public Values returnValues() { + return returnValues; + } +} diff --git a/jrt/src/com/yahoo/jrt/Request.java b/jrt/src/com/yahoo/jrt/Request.java new file mode 100644 index 00000000000..99d7df8657e --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Request.java @@ -0,0 +1,281 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * A Request bundles information about a single RPC invocation. A + * Request contains the name of the method, the method parameters, the + * method return values and also error information if something went + * wrong. Request objects are used by both RPC clients and RPC + * servers. An RPC client is the one requesting the invocation. An RPC + * server is the one performing the invocation. The RPC client uses a + * {@link Target} to invoke the request. The RPC server registers a + * {@link Method} with the {@link Supervisor}. Note that RPC + * client/server roles are independent of connection client/server + * roles, since invocations can be performed both ways across a {@link + * Target}. + **/ +public class Request +{ + private String methodName; + private Values parameters; + private Values returnValues = new Values(); + private int errorCode = 0; + private String errorMessage = null; + private boolean detached = false; + private Object context = null; + + private InvocationServer serverHandler; + private InvocationClient clientHandler; + + /** + * Create a Request from a method name and a set of + * parameters. This method is used internally on the server side + * to create Requests for incoming invocations. + * + * @param methodName method name + * @param parameters method parameters + **/ + Request(String methodName, Values parameters) { + this.methodName = methodName; + this.parameters = parameters; + } + + /** + * Set the client invocation helper object for this request + * + * @param handler helper object + **/ + void clientHandler(InvocationClient handler) { + clientHandler = handler; + } + + /** + * Set the server invocation helper object for this request + * + * @param handler helper object + **/ + void serverHandler(InvocationServer handler) { + serverHandler = handler; + } + + /** + * Create a new Request with the given method name. + * + * @param methodName name of the method you want to invoke + **/ + public Request(String methodName) { + this(methodName, new Values()); + } + + /** + * Set the application context associated with this request. + * + * @param context application context + **/ + public void setContext(Object context) { + this.context = context; + } + + /** + * Obtain the application context associated with this request. + * + * @return application context + **/ + public Object getContext() { + return context; + } + + /** + * Obtain the method name + * + * @return method name + **/ + public String methodName() { + return methodName; + } + + /** + * Obtain the parameters + * + * @return request parameters + **/ + public Values parameters() { + return parameters; + } + + /** + * Set the return values for this request. Used internally on the + * client side to incorporate the server response into the + * request. + * + * @param returnValues return values + **/ + void returnValues(Values returnValues) { + this.returnValues = returnValues; + } + + /** + * Obtain the return values + * + * @return request return values + **/ + public Values returnValues() { + return returnValues; + } + + /** + * Create a new empty set of parameters for this request. The old + * set of parameters will still be valid, but will no longer be + * part of this request. This method may be used to allow earlier + * garbage collection of large parameters that are no longer + * needed. While the obvious use of this method is to get rid of + * parameters when being an RPC server it can also be used after + * starting an RPC request on a client. + **/ + public void discardParameters() { + parameters = new Values(); + } + + /** + * Obtain the Target representing our end of the connection over + * which this request was invoked. This method may only be invoked + * during method handling (RPC server aspect). + * + * @return Target representing our end of the connection over + * which this request was invoked + * @throws IllegalStateException if invoked inappropriately + **/ + public Target target() { + if (serverHandler == null) { + throw new IllegalStateException("No server handler registered"); + } + return serverHandler.getTarget(); + } + + /** + * Abort a request. This method may only be called by the RPC + * client after an asynchronous method invocation was requested. + * + * @throws IllegalStateException if invoked inappropriately + **/ + public void abort() { + if (clientHandler == null) { + throw new IllegalStateException("No client handler registered"); + } + clientHandler.handleAbort(); + } + + /** + * Detach a method invocation. This method may only be invoked + * during method handling (RPC server aspect). If this method is + * invoked, the method is not returned when the method handler + * returns. Instead, the application must invoke the {@link + * #returnRequest returnRequest} method when it wants the request + * to be returned. + * + * @throws IllegalStateException if invoked inappropriately + **/ + public void detach() { + if (serverHandler == null) { + throw new IllegalStateException("No server handler registered"); + } + detached = true; + } + + /** + * Check whether this method was detached. + * + * @return true if this method was detached + **/ + boolean isDetached() { + return detached; + } + + /** + * Return this request. This method may only be invoked after the + * {@link #detach detach} method has been invoked, and only once + * per request. Note that if you detach a method without invoking + * this method, it will never be returned, causing a resource leak + * (NB: not good). + * + * @throws IllegalStateException if invoked inappropriately + **/ + public void returnRequest() { + if (!detached) { + throw new IllegalStateException("Request not detached"); + } + serverHandler.returnRequest(); + } + + /** + * Register the fact that an error has occurred. + * + * @param errorCode the error code (see {@link ErrorCode}) + * @param errorMessage the error message + **/ + public void setError(int errorCode, String errorMessage) { + this.errorCode = errorCode; + this.errorMessage = errorMessage; + } + + /** + * Check if this Request contains return types compatible with the + * given type string. If this Request contains an error it is + * considered incompatible with all possible type strings. If the + * return values are not compatible with the given type string and + * an error condition is not set, the {@link + * ErrorCode#WRONG_RETURN} error will be set. This method is + * intended to be used by the RPC client after a method has been + * invoked to verify the return value types. Please refer to the + * {@link Method} class description for an explanation of type + * strings. + * + * @return true if all is ok and the return types are compatible + * with 'returnTypes' + * @param returnTypes type string + **/ + public boolean checkReturnTypes(String returnTypes) { + if (errorCode != ErrorCode.NONE) { + return false; + } + if (returnValues.satisfies(returnTypes)) { + return true; + } + setError(ErrorCode.WRONG_RETURN, + "checkReturnValues: Wrong return values"); + return false; + } + + /** + * Check if an error has occurred with this Request + * + * @return true if an error has occurred + **/ + public boolean isError() { + return (errorCode != ErrorCode.NONE); + } + + /** + * Obtain the error code associated with this Request + * + * @return error code + **/ + public int errorCode() { + return errorCode; + } + + /** + * Obtain the error message associated with this Request, if any + * + * @return error message + **/ + public String errorMessage() { + return errorMessage; + } + + public @Override String toString() { + return "request " + methodName + "(" + parameters + ")" + ( returnValues.size()>0 ? ": " + returnValues : ""); + } + +} diff --git a/jrt/src/com/yahoo/jrt/RequestPacket.java b/jrt/src/com/yahoo/jrt/RequestPacket.java new file mode 100644 index 00000000000..bffcbd05f81 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/RequestPacket.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +class RequestPacket extends Packet +{ + private StringValue methodName; + private Values parameters; + + public RequestPacket(int flags, int reqId, + String methodName, + Values parameters) + { + super(flags, reqId); + this.methodName = new StringValue(methodName); + this.parameters = parameters; + } + + public RequestPacket(int flags, int reqId, + ByteBuffer src) + { + super(flags, reqId); + methodName = new StringValue(src); + parameters = new Values(src); + } + + public int bytes() { + return (headerLength + + methodName.bytes() + + parameters.bytes()); + } + + public int packetCode() { + return PCODE_REQUEST; + } + + public void encode(ByteBuffer dst) { + methodName.encode(dst); + parameters.encode(dst); + } + + public String methodName() { + return methodName.asString(); + } + + public Values parameters() { + return parameters; + } +} diff --git a/jrt/src/com/yahoo/jrt/RequestWaiter.java b/jrt/src/com/yahoo/jrt/RequestWaiter.java new file mode 100644 index 00000000000..523f2b25223 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/RequestWaiter.java @@ -0,0 +1,18 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Interface used to wait for the completion of a {@link + * Request}. This interface is used with the {@link Target#invokeAsync + * Target.invokeAsync} method. + **/ +public interface RequestWaiter { + + /** + * Invoked when a request has completed. + * + * @param req the completed request + **/ + public void handleRequestDone(Request req); +} diff --git a/jrt/src/com/yahoo/jrt/Scheduler.java b/jrt/src/com/yahoo/jrt/Scheduler.java new file mode 100644 index 00000000000..403cb4cae99 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Scheduler.java @@ -0,0 +1,136 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Scheduler { + private static final int TICK = 100; + private static final int SLOTS = 512; + private static final int MASK = 511; + private static final int SHIFT = 9; + + private Task[] slots = new Task[SLOTS + 1]; + private int[] counts = new int[SLOTS + 1]; + private Queue queue = new Queue(TICK); + private int currIter = 0; + private int currSlot = 0; + private long nextTick; + + private static boolean isActive(Task task) { + return (task.next() != null); + } + + private void linkIn(Task task) { + Task head = slots[task.slot()]; + if (head == null) { + task.next(task); + task.prev(task); + slots[task.slot()] = task; + } else { + task.next(head); + task.prev(head.prev()); + head.prev().next(task); + head.prev(task); + } + ++counts[task.slot()]; + } + + private void linkOut(Task task) { + Task head = slots[task.slot()]; + if (task.next() == task) { + slots[task.slot()] = null; + } else { + task.prev().next(task.next()); + task.next().prev(task.prev()); + if (head == task) { + slots[task.slot()] = task.next(); + } + } + task.next(null); + task.prev(null); + --counts[task.slot()]; + } + + public Scheduler(long now) { + nextTick = now + TICK; + } + + public synchronized void schedule(Task task, double seconds) { + if (task.isKilled()) { + return; + } + if (seconds < 0.0) { + throw new IllegalArgumentException("cannot schedule a Task in the past"); + } + int ticks = 1 + (int) (seconds * 10.0 + 0.5); + if (isActive(task)) { + linkOut(task); + } + task.slot((ticks + currSlot) & MASK); + task.iter(currIter + ((ticks + currSlot) >> SHIFT)); + linkIn(task); + } + + public synchronized void scheduleNow(Task task) { + if (task.isKilled()) { + return; + } + if (isActive(task)) { + linkOut(task); + } + task.slot(SLOTS); + task.iter(0); + linkIn(task); + } + + public synchronized boolean unschedule(Task task) { + if (isActive(task)) { + linkOut(task); + return true; + } + return false; + } + + public synchronized boolean kill(Task task) { + task.setKilled(); + if (isActive(task)) { + linkOut(task); + return true; + } + return false; + } + + private void queueTasks(int slot, int iter) { + int cnt = counts[slot]; + Task task = slots[slot]; + for (int i = 0; i < cnt; i++) { + Task next = task.next(); + if (task.iter() == iter) { + linkOut(task); + queue.enqueue(task); + } + task = next; + } + } + + public void checkTasks(long now) { + if (slots[SLOTS] == null && now < nextTick) { + return; + } + synchronized (this) { + queueTasks(SLOTS, 0); + for (int i = 0; now >= nextTick; i++, nextTick += TICK) { + if (i < 3) { + if (++currSlot >= SLOTS) { + currSlot = 0; + currIter++; + } + queueTasks(currSlot, currIter); + } + } + } + while (!queue.isEmpty()) { + Task task = (Task) queue.dequeue(); + task.perform(); + } + } +} diff --git a/jrt/src/com/yahoo/jrt/SessionHandler.java b/jrt/src/com/yahoo/jrt/SessionHandler.java new file mode 100644 index 00000000000..0f9411b7173 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/SessionHandler.java @@ -0,0 +1,66 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Interface used to handle the lifetime of a {@link Target}. The word + * session is used to denote all the RPC activity across a single + * {@link Target} during its lifetime. This interface gives the + * application information about when different {@link Target} objects + * enter different stages in their lifetime. Combined with the ability + * to bind application specific data to a {@link Target} with the + * {@link Target#setContext} method, this enables method invocations + * in the same session to share state information. Usage of this + * interface is optional. It is typically useful for server + * applications needing state to be shared between RPC method + * invocation on a session. Each {@link Supervisor} can only have a + * single session handler. Use the {@link Supervisor#setSessionHandler + * Supervisor.setSessionHandler} method to set the session + * handler. The different callbacks may be called from several + * different threads, but for a single target there will be no + * overlapping of session callbacks, and the order will always be the + * same; init, live (not always called), down, fini. + **/ +public interface SessionHandler { + + /** + * Invoked when a new {@link Target} is created. This is a nice + * place to initialize and attach application context to the + * {@link Target}. + * + * @param target the target + **/ + public void handleSessionInit(Target target); + + /** + * Invoked when a connection is established with the peer. Note + * that if a connection could not be established with the peer, + * this method is never invoked. + * + * @param target the target + **/ + public void handleSessionLive(Target target); + + /** + * Invoked when the target becomes invalid. This is typically + * caused by the network connection with the peer going down. Note + * that this method is invoked also when a connection with the + * peer could not be established at all. + * + * @param target the target + **/ + public void handleSessionDown(Target target); + + /** + * Invoked when the target is invalid and no more RPC invocations + * are active on our side of this target (invoked from the other + * side; we being the server). If you need to perform cleanup + * related to the application data associated with the target, you + * should wait until this method is invoked, to avoid cleaning up + * the {@link Target} application context under the feet of active + * invocations. + * + * @param target the target + **/ + public void handleSessionFini(Target target); +} diff --git a/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java b/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java new file mode 100644 index 00000000000..64f354aaf89 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/SingleRequestWaiter.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class SingleRequestWaiter implements RequestWaiter { + + private boolean done = false; + + public synchronized void handleRequestDone(Request req) { + done = true; + notify(); + } + + public synchronized void waitDone() { + while (!done) { + try { wait(); } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Spec.java b/jrt/src/com/yahoo/jrt/Spec.java new file mode 100644 index 00000000000..7ed0aa69920 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Spec.java @@ -0,0 +1,133 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.net.SocketAddress; +import java.net.InetSocketAddress; + + +/** + * A Spec is a network address used for either listening or + * connecting. + **/ +public class Spec +{ + private SocketAddress address; + private String host; + private int port; + private boolean malformed; + + /** + * Create a Spec from a string. The form of the input string is + * 'tcp/host:port' or 'tcp/port' where 'host' is the host name and + * 'port' is the port number. + * + * @param spec input string to be parsed + * @see #malformed + **/ + public Spec(String spec) { + if (spec.startsWith("tcp/")) { + int sep = spec.indexOf(':'); + String portStr = null; + if (sep == -1) { + portStr = spec.substring(4); + } else { + host = spec.substring(4, sep); + portStr = spec.substring(sep + 1); + } + try { + port = Integer.parseInt(portStr); + } catch (NumberFormatException e) { + host = null; + port = 0; + malformed = true; + } + } else { + malformed = true; + } + } + + /** + * Create a Spec from a host name and a port number. + * + * @param host host name + * @param port port number + **/ + public Spec(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Create a Spec from a port number. + * + * @param port port number + **/ + public Spec(int port) { + this.port = port; + } + + /** + * Obtain the host name of this address + * + * @return host name + **/ + public String host() { + return host; + } + + /** + * Obtain the port number if this address + * + * @return port number + **/ + public int port() { + return port; + } + + /** + * If this Spec was created from a string, this method will tell + * you whether that string was malformed. + * + * @return true if this address is malformed + **/ + public boolean malformed() { + return malformed; + } + + /** + * Resolve the socket address for this Spec. If this Spec is + * malformed, this method will return null. + * + * @return socket address + **/ + SocketAddress address() { + if (malformed) { + return null; + } + if (address == null) { + if (host == null) { + address = new InetSocketAddress(port); + } else { + address = new InetSocketAddress(host, port); + } + } + return address; + } + + /** + * Obtain a string representation of this address. The return + * value from this method may be used to create a new Spec. + * + * @return string representation of this address + **/ + public String toString() { + if (malformed) { + return "MALFORMED"; + } + if (host == null) { + return "tcp/" + port; + } + return "tcp/" + host + ":" + port; + } +} diff --git a/jrt/src/com/yahoo/jrt/StringArray.java b/jrt/src/com/yahoo/jrt/StringArray.java new file mode 100644 index 00000000000..8195a586f9d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/StringArray.java @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +/** + * String array. The internal string representation is UTF-8 encoded + * bytes. This means that creating an object of this class as well as + * extracting the value contained with the {@link #asStringArray + * asStringArray} method will incur a string conversion overhead. + **/ +public class StringArray extends Value +{ + private byte[][] value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public StringArray(String[] value) { + this.value = new byte[value.length][]; + for (int i = 0; i < value.length; i++) { + try { + this.value[i] = value[i].getBytes("UTF-8"); + } catch(java.io.UnsupportedEncodingException e) {} + } + } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + StringArray(ByteBuffer src) { + int size = src.getInt(); + value = new byte[size][]; + for (int i = 0; i < size; i++) { + value[i] = new byte[src.getInt()]; + src.get(value[i]); + } + } + + /** + * @return STRING_ARRAY + **/ + public byte type() { return STRING_ARRAY; } + public int count() { return value.length; } + + int bytes() { + int bytes = 4; + for (int i = 0; i < value.length; i++) { + bytes += 4 + value[i].length; + } + return bytes; + } + void encode(ByteBuffer dst) { + dst.putInt(value.length); + for (int i = 0; i < value.length; i++) { + dst.putInt(value[i].length); + dst.put(value[i]); + } + } + + public String[] asStringArray() { + String[] ret = new String[value.length]; + for (int i = 0; i < value.length; i++) { + try { + ret[i] = new String(value[i], "UTF-8"); + } catch(java.io.UnsupportedEncodingException e) {} + } + return ret; + } + + public @Override String toString() { + return Arrays.toString(asStringArray()); + } + +} diff --git a/jrt/src/com/yahoo/jrt/StringValue.java b/jrt/src/com/yahoo/jrt/StringValue.java new file mode 100644 index 00000000000..175884c435b --- /dev/null +++ b/jrt/src/com/yahoo/jrt/StringValue.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import com.yahoo.text.Utf8Array; +import com.yahoo.text.Utf8String; + +import java.nio.ByteBuffer; + + +/** + * String value. The internal string representation is UTF-8 encoded + * bytes. This means that creating an object of this class as well as + * extracting the value contained with the {@link #asString asString} + * method will incur a string conversion overhead. + **/ +public class StringValue extends Value +{ + private Utf8Array value; + + /** + * Create from a Java-type value + * + * @param value the value + **/ + public StringValue(String value) { + this.value = new Utf8String(value); + } + public StringValue(Utf8String value) { + this.value = value; + } + public StringValue(Utf8Array value) { + this.value = value; + } + + /** + * Create by decoding the value from the given buffer + * + * @param src buffer where the value is stored + **/ + StringValue(ByteBuffer src) { + int size = src.getInt(); + value = new Utf8String(new Utf8Array(src, size)); + } + + /** + * @return STRING + **/ + public byte type() { return STRING; } + public int count() { return 1; } + + int bytes() { return 4 + value.getByteLength(); } + void encode(ByteBuffer dst) { + dst.putInt(value.getByteLength()); + value.writeTo(dst); + } + + public String asString() { + return value.toString(); + } + + public @Override Utf8Array asUtf8Array() { return value; } + + public @Override String toString() { + return asString(); + } + +} diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java new file mode 100644 index 00000000000..89f677c04f0 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Supervisor.java @@ -0,0 +1,320 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.HashMap; + + +/** + * A Supervisor keeps a method repository and handles dispatching of + * incoming invocation requests. Each end-point of a connection is + * represented by a {@link Target} object and each {@link Target} is + * associated with a single Supervisor that handles the invocation + * requests obtained from that {@link Target}. Note that RPC + * invocations can be performed both ways across a connection, so even + * the client side of a connection has RPC server capabilities. + **/ +public class Supervisor { + + private class AddMethod implements Runnable { + private Method method; + AddMethod(Method method) { + this.method = method; + } + public void run() { + methodMap.put(method.name(), method); + } + } + + private class RemoveMethod implements Runnable { + private String methodName; + private Method method = null; + RemoveMethod(String methodName) { + this.methodName = methodName; + } + RemoveMethod(Method method) { + this.methodName = method.name(); + this.method = method; + } + public void run() { + Method m = methodMap.remove(methodName); + if (method != null && m != method) { + methodMap.put(method.name(), method); + } + } + } + + private Transport transport; + private SessionHandler sessionHandler = null; + private HashMap<String, Method> methodMap = new HashMap<>(); + private int maxInputBufferSize = 0; + private int maxOutputBufferSize = 0; + + /** + * Create a new Supervisor based on the given {@link Transport} + * + * @param transport object performing low-level operations for + * this Supervisor + **/ + public Supervisor(Transport transport) { + this.transport = transport; + new MandatoryMethods(this); + } + + /** + * Set maximum input buffer size. This value will only affect + * connections that use a common input buffer when decoding + * incoming packets. Note that this value is not an absolute + * max. The buffer will still grow larger than this value if + * needed to decode big packets. However, when the buffer becomes + * larger than this value, it will be shrunk back when possible. + * + * @param bytes buffer size in bytes. 0 means unlimited. + **/ + public void setMaxInputBufferSize(int bytes) { + maxInputBufferSize = bytes; + } + + /** + * Set maximum output buffer size. This value will only affect + * connections that use a common output buffer when encoding + * outgoing packets. Note that this value is not an absolute + * max. The buffer will still grow larger than this value if needed + * to encode big packets. However, when the buffer becomes larger + * than this value, it will be shrunk back when possible. + * + * @param bytes buffer size in bytes. 0 means unlimited. + **/ + public void setMaxOutputBufferSize(int bytes) { + maxOutputBufferSize = bytes; + } + + /** + * Obtain the method map for this Supervisor + * + * @return the method map + **/ + HashMap<String, Method> methodMap() { + return methodMap; + } + + /** + * Obtain the underlying Transport object. + * + * @return underlying Transport object + **/ + public Transport transport() { + return transport; + } + + /** + * Set the session handler for this Supervisor + * + * @param handler the session handler + **/ + public void setSessionHandler(SessionHandler handler) { + sessionHandler = handler; + } + + /** + * Add a method to the set of methods held by this Supervisor + * + * @param method the method to add + **/ + public void addMethod(Method method) { + transport.perform(new AddMethod(method)); + } + + /** + * Remove a method from the set of methods held by this Supervisor + * + * @param methodName name of the method to remove + **/ + public void removeMethod(String methodName) { + transport.perform(new RemoveMethod(methodName)); + } + + /** + * Remove a method from the set of methods held by this + * Supervisor. Use this if you know exactly which method to remove + * and not only the name. + * + * @param method the method to remove + **/ + public void removeMethod(Method method) { + transport.perform(new RemoveMethod(method)); + } + + /** + * Connect to the given address. The new {@link Target} will be + * associated with this Supervisor. + * + * @return Target representing our end of the connection + * @param spec where to connect + * @see #connect(com.yahoo.jrt.Spec, java.lang.Object) + **/ + public Target connect(Spec spec) { + return transport.connect(this, spec, null, false); + } + + /** + * Connect to the given address. The new {@link Target} will be + * associated with this Supervisor. This method will perform a + * synchronous connect in the calling thread. + * + * @return Target representing our end of the connection + * @param spec where to connect + * @see #connectSync(com.yahoo.jrt.Spec, java.lang.Object) + **/ + public Target connectSync(Spec spec) { + return transport.connect(this, spec, null, true); + } + + /** + * Connect to the given address. The new {@link Target} will be + * associated with this Supervisor and will have 'context' as + * application context. + * + * @return Target representing our end of the connection + * @param spec where to connect + * @param context application context for the Target + * @see Target#getContext + **/ + public Target connect(Spec spec, Object context) { + return transport.connect(this, spec, context, false); + } + + /** + * Connect to the given address. The new {@link Target} will be + * associated with this Supervisor and will have 'context' as + * application context. This method will perform a synchronous + * connect in the calling thread. + * + * @return Target representing our end of the connection + * @param spec where to connect + * @param context application context for the Target + * @see Target#getContext + **/ + public Target connectSync(Spec spec, Object context) { + return transport.connect(this, spec, context, true); + } + + /** + * Listen to the given address. + * + * @return active object accepting new connections that will be + * associated with this Supervisor + * @param spec the address to listen to + **/ + public Acceptor listen(Spec spec) throws ListenFailedException { + return transport.listen(this, spec); + } + + /** + * Convenience method for connecting to a peer, invoking a method + * and disconnecting. + * + * @param spec the address to connect to + * @param req the invocation request + * @param timeout request timeout in seconds + **/ + public void invokeBatch(Spec spec, Request req, double timeout) { + Target target = connectSync(spec); + try { + target.invokeSync(req, timeout); + } finally { + target.close(); + } + } + + /** + * This method is invoked when a new target is created + * + * @param target the target + **/ + void sessionInit(Target target) { + if (target instanceof Connection) { + Connection conn = (Connection) target; + conn.setMaxInputSize(maxInputBufferSize); + conn.setMaxOutputSize(maxOutputBufferSize); + } + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionInit(target); + } + } + + /** + * This method is invoked when a target establishes a connection + * with its peer + * + * @param target the target + **/ + void sessionLive(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionLive(target); + } + } + + /** + * This method is invoked when a target becomes invalid + * + * @param target the target + **/ + void sessionDown(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionDown(target); + } + } + + /** + * This method is invoked when a target is invalid and no more + * invocations are active + * + * @param target the target + **/ + void sessionFini(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionFini(target); + } + } + + /** + * This method is invoked each time we write a packet. This method + * is empty and only used for testing through sub-classing. + * + * @param info information about the written packet + **/ + void writePacket(PacketInfo info) {} + + /** + * This method is invoked each time we read a packet. This method + * is empty and only used for testing through sub-classing. + * + * @param info information about the read packet + **/ + void readPacket(PacketInfo info) {} + + /** + * Handle a packet received on one of the connections associated + * with this Supervisor. This method is invoked for all packets + * not handled by a {@link ReplyHandler} + * + * @param conn where the packet came from + * @param packet the packet + **/ + void handlePacket(Connection conn, Packet packet) { + if (packet.packetCode() != Packet.PCODE_REQUEST) { + return; + } + RequestPacket rp = (RequestPacket) packet; + Request req = new Request(rp.methodName(), rp.parameters()); + Method method = methodMap.get(req.methodName()); + new InvocationServer(conn, req, method, + packet.requestId(), + packet.noReply()).invoke(); + } +} diff --git a/jrt/src/com/yahoo/jrt/Target.java b/jrt/src/com/yahoo/jrt/Target.java new file mode 100644 index 00000000000..eb7e2d0e38d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Target.java @@ -0,0 +1,147 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * A Target represents a connection endpoint with RPC + * capabilities. Each such connection has a client and a server + * side. The client side is the one initiating the connection. RPC + * requests may be invoked across the connection from both the client + * and the server side. + **/ +public abstract class Target { + + private Object context; + + /** + * Create a Target with the given application context. + * + * @param context application context + **/ + Target(Object context) { + this.context = context; + } + + /** + * Create a Target without any application context. + **/ + Target() { + this(null); + } + + /** + * Set the application context associated with this target. + * + * @param context application context + **/ + public void setContext(Object context) { + this.context = context; + } + + /** + * Obtain the application context associated with this target. + * + * @return application context + **/ + public Object getContext() { + return context; + } + + /** + * Check if this target is still valid for invocations. + * + * @return true if this target is still valid + **/ + public abstract boolean isValid(); + + /** + * Obtain the low-level reason behind losing the connection for + * which this target is an endpoint. If the target is still valid + * or if the target became invalid because it was closed, this + * method will return null. In other cases this method may or may + * not return an exception indicating why the connection was + * lost. Also, if an exception is returned, its nature may vary + * based on implementation details across platforms. + * + * @return exception causing connection loss or null + **/ + public Exception getConnectionLostReason() { return null; } + + /** + * Check if this target represents the client side of a + * connection. + * + * @return true if this is a client-side target + **/ + public abstract boolean isClient(); + + /** + * Check if this target represents the server side of a + * connection. + * + * @return true if this is a server-side target + **/ + public abstract boolean isServer(); + + /** + * Invoke a request on this target and wait for it to return. + * + * @param req the request + * @param timeout timeout in seconds + **/ + public abstract void invokeSync(Request req, double timeout); + + /** + * Invoke a request on this target and let the completion be + * signalled with a callback. + * + * @param req the request + * @param timeout timeout in seconds + * @param waiter callback handler + **/ + public abstract void invokeAsync(Request req, double timeout, + RequestWaiter waiter); + + /** + * Invoke a request on this target, but ignore the return + * value(s). The success or failure of the invocation is also + * ignored. However, the return value gives a little hint by + * indicating whether the invocation has been attempted at all. + * + * @return false if the invocation was not attempted due to the + * target being invalid + * @param req the request + **/ + public abstract boolean invokeVoid(Request req); + + /** + * Add a watcher to this target. A watcher is notified if the + * target becomes invalid. If the target is already invalid when + * this method is invoked, no operation is performed and false is + * returned. Multiple adds of the same watcher has no additional + * effect. + * + * @return true if the add operation was performed + * @param watcher the watcher to be added + **/ + public abstract boolean addWatcher(TargetWatcher watcher); + + /** + * Remove a watcher from this target. If the target is already + * invalid when this method is invoked, no operation is performed + * and false is returned. Multiple removes of the same watcher has + * no additional effect. + * + * @return true if the remove operation was performed + * @param watcher the watcher to be removed + * @see #addWatcher + **/ + public abstract boolean removeWatcher(TargetWatcher watcher); + + /** + * Close this target. Note that the close operation is + * asynchronous. If you need to wait for the target to become + * invalid, use the {@link Transport#sync Transport.sync} method. + **/ + public abstract void close(); +} diff --git a/jrt/src/com/yahoo/jrt/TargetWatcher.java b/jrt/src/com/yahoo/jrt/TargetWatcher.java new file mode 100644 index 00000000000..9b75bcdede4 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/TargetWatcher.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Interface used to notify when a {@link Target} becomes + * invalid. Listening is controlled with the {@link Target#addWatcher + * Target.addWatcher} and {@link Target#removeWatcher + * Target.removeWatcher} methods. + **/ +public interface TargetWatcher { + + /** + * Invoked when a target becomes invalid. + * + * @param target the target that has become invalid. + **/ + public void notifyTargetInvalid(Target target); +} diff --git a/jrt/src/com/yahoo/jrt/Task.java b/jrt/src/com/yahoo/jrt/Task.java new file mode 100644 index 00000000000..36b96d6fda7 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Task.java @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * A Task enables a Runnable to be scheduled for execution in the + * transport thread some time in the future. Tasks are used internally + * to handle RPC timeouts. Use the {@link Transport#createTask + * Transport.createTask} method to create a task associated with a + * {@link Transport} object. Note that Task execution is designed to + * be low-cost, so do not expect extreme accuracy. Also note that any + * tasks that are pending execution when the owning {@link Transport} + * object is shut down will never be run. + **/ +public class Task { + private Scheduler owner; + private Runnable doit; + private int slot; + private int iter; + private Task next; + private Task prev; + private boolean killed; + + // methods used by the scheduler + int slot() { return slot; } + void slot(int val) { slot = val; } + int iter() { return iter; } + void iter(int val) { iter = val; } + Task next() { return next; } + void next(Task val) { next = val; } + Task prev() { return prev; } + void prev(Task val) { prev = val; } + boolean isKilled() { return killed; } + void setKilled() { killed = true; } + void perform() { doit.run(); } + + /** + * Create a Task owned by the given scheduler + * + * @param owner the scheduler owning this task + * @param doit what to run when the task is executed + **/ + Task(Scheduler owner, Runnable doit) { + this.owner = owner; + this.doit = doit; + } + + /** + * Schedule this task for execution. A task may be scheduled + * multiple times, but may only have a single pending execution + * time. Re-scheduling a task that is not yet run will move the + * execution time. If the task has already been executed, + * scheduling works just like if the task was never run. + * + * @param seconds the number of seconds until the task should be + * executed + * @see #kill + **/ + public void schedule(double seconds) { + owner.schedule(this, seconds); + } + + /** + * Schedule this task for execution as soon as possible. This will + * result in the task being executed the next time the reactor + * loop inside the owning {@link Transport} object checks for + * tasks to run. If you have something that is even more urgent, + * or something you need to be executed even if the {@link + * Transport} is shut down, use the {@link Transport#perform} + * method instead. + * @see #kill + **/ + public void scheduleNow() { + owner.scheduleNow(this); + } + + /** + * Cancel the execution of this task. + * + * @return true if the task was scheduled and we managed to avoid + * execution + **/ + public boolean unschedule() { + return owner.unschedule(this); + } + + /** + * Cancel the execution of this task and make sure it can never be + * scheduled for execution again. After this method is invoked, + * invoking the {@link #schedule schedule} and {@link #scheduleNow + * scheduleNow} methods will have no effect. + * + * @return true if the task was scheduled and we managed to avoid + * execution + **/ + public boolean kill() { + return owner.kill(this); + } +} diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java new file mode 100644 index 00000000000..e5a2a28b5d9 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * A thread-safe queue wrapper used to pass objects between threads. + **/ +class ThreadQueue +{ + private Queue queue = new Queue(); + private boolean closed = false; + + public ThreadQueue() {} + + /** + * Enqueue an object on this queue. If the queue has been closed, + * the object will not be queued, and this method will return + * false. + * + * @return true if the object was enqueued, false if this queue + * was closed + * @param obj the object to enqueue + **/ + public synchronized boolean enqueue(Object obj) { + if (closed) { + return false; + } + queue.enqueue(obj); + if (queue.size() == 1) { + notify(); // assume only one reader + } + return true; + } + + /** + * Close this queue. After this method is invoked, no more objects + * may be enqueued on this queue. Also, trying to dequeue an + * object form a queue that is both empty and closed will cause a + * {@link EndOfQueueException}. + **/ + public synchronized void close() { + closed = true; + notify(); + } + + /** + * Dequeue the next object from this queue. This method will block + * until at least one object is available in the queue. + * + * @return the next object from the queue + * @throws EndOfQueueException if the queue is both empty and + * closed + **/ + public synchronized Object dequeue() throws EndOfQueueException { + while (queue.isEmpty() && !closed) { + try { wait(); } catch (InterruptedException x) {} + } + if (queue.isEmpty()) { + throw new EndOfQueueException(); + } + return queue.dequeue(); + } +} diff --git a/jrt/src/com/yahoo/jrt/TieBreaker.java b/jrt/src/com/yahoo/jrt/TieBreaker.java new file mode 100644 index 00000000000..b327328fae6 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/TieBreaker.java @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * When multiple actors race to come to a certain point and only one + * should be allowed to continue, an object of this class may be used + * to settle who should be allowed to continue. Note that external + * synchronization is needed if this class is used with multiple + * threads. + **/ +class TieBreaker { + private boolean first = true; + + /** + * Are we the first to come here? + * + * @return true if you are first, false otherwise. + **/ + public boolean first() { + boolean ret = first; + first = false; + return ret; + } +} diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java new file mode 100644 index 00000000000..85bfed79732 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -0,0 +1,401 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.logging.Level; +import java.util.logging.Logger; + + +/** + * The Transport class is the core needed to make your {@link + * Supervisor} tick. It implements the reactor pattern to perform + * multiplexed network IO, handles scheduled tasks and keeps track of + * some additional helper threads. A single Transport object can back + * multiple {@link Supervisor} objects. + **/ +public class Transport { + + private static final int OPEN = 1; + private static final int CLOSING = 2; + private static final int CLOSED = 3; + + private class Run implements Runnable { + public void run() { + try { + Transport.this.run(); + } catch (Throwable problem) { + handleFailure(problem, Transport.this); + } + } + } + + private class AddConnectionCmd implements Runnable { + private Connection conn; + AddConnectionCmd(Connection conn) { this.conn = conn; } + public void run() { handleAddConnection(conn); } + } + + private class CloseConnectionCmd implements Runnable { + private Connection conn; + CloseConnectionCmd(Connection conn) { this.conn = conn; } + public void run() { handleCloseConnection(conn); } + } + + private class EnableWriteCmd implements Runnable { + private Connection conn; + EnableWriteCmd(Connection conn) { this.conn = conn; } + public void run() { handleEnableWrite(conn); } + } + + private class SyncCmd implements Runnable { + boolean done = false; + public synchronized void waitDone() { + while (!done) { + try { wait(); } catch (InterruptedException e) {} + } + } + public synchronized void run() { + done = true; + notify(); + } + } + + private static Logger log = Logger.getLogger(Transport.class.getName()); + + private FatalErrorHandler fatalHandler; // NB: this must be set first + private Thread thread; + private Queue queue; + private Queue myQueue; + private Connector connector; + private Closer closer; + private Scheduler scheduler; + private int state; + private Selector selector; + + private void handleAddConnection(Connection conn) { + if (conn.isClosed()) { + if (conn.hasSocket()) { + closer.closeLater(conn); + } + return; + } + if (!conn.init(selector)) { + handleCloseConnection(conn); + } + } + + private void handleCloseConnection(Connection conn) { + if (conn.isClosed()) { + return; + } + conn.fini(); + if (conn.hasSocket()) { + closer.closeLater(conn); + } + } + + private void handleEnableWrite(Connection conn) { + if (conn.isClosed()) { + return; + } + conn.enableWrite(); + } + + private boolean postCommand(Runnable cmd) { + boolean wakeup; + synchronized (this) { + if (state == CLOSED) { + return false; + } + wakeup = queue.isEmpty(); + queue.enqueue(cmd); + } + if (wakeup) { + selector.wakeup(); + } + return true; + } + + private void handleEvents() { + synchronized (this) { + queue.flush(myQueue); + } + while (!myQueue.isEmpty()) { + ((Runnable)myQueue.dequeue()).run(); + } + } + + private boolean handleIOEvents(Connection conn, + SelectionKey key) { + if (conn.isClosed()) { + return true; + } + if (key.isReadable()) { + try { + conn.read(); + } catch (IOException e) { + conn.setLostReason(e); + return false; + } + } + if (key.isWritable()) { + try { + conn.write(); + } catch (IOException e) { + conn.setLostReason(e); + return false; + } + } + return true; + } + + /** + * Create a new Transport object with the given fatal error handler. + * + * @param fatalHandler fatal error handler + **/ + public Transport(FatalErrorHandler fatalHandler) { + synchronized (this) { + this.fatalHandler = fatalHandler; // NB: this must be set first + } + thread = new Thread(new Run(), "<transport>"); + queue = new Queue(); + myQueue = new Queue(); + connector = new Connector(this); + closer = new Closer(this); + scheduler = new Scheduler(System.currentTimeMillis()); + state = OPEN; + try { + selector = Selector.open(); + } catch (Exception e) { + throw new Error("Could not open transport selector", e); + } + thread.setDaemon(true); + thread.start(); + } + + /** + * Create a Transport object with no fatal error handler. If a + * fatal error occurs when no fatal error handler is registered, + * the default action is to log the error and exit with exit code + * 1. + **/ + public Transport() { + this(null); + } + + /** + * Proxy method used to dispatch fatal errors to the fatal error + * handler. If no handler is registered, the default action is to + * log the error and halt the Java VM. + * + * @param problem the throwable causing the failure + * @param context the object owning the crashing thread + **/ + void handleFailure(Throwable problem, Object context) { + if (fatalHandler != null) { + fatalHandler.handleFailure(problem, context); + return; + } + try { + log.log(Level.SEVERE, "fatal error in " + context, problem); + } catch (Throwable ignore) {} + Runtime.getRuntime().halt(1); + } + + /** + * Listen to the given address. This method is called by a {@link + * Supervisor} object. + * + * @return active object accepting new connections + * @param owner the one calling this method + * @param spec the address to listen to + **/ + Acceptor listen(Supervisor owner, Spec spec) throws ListenFailedException { + return new Acceptor(this, owner, spec); + } + + /** + * Connect to the given address. This method is called by a {@link + * Supervisor} object. + * + * @return the new connection + * @param owner the one calling this method + * @param spec the address to connect to + * @param context application context for the new connection + * @param sync perform a synchronous connect in the calling thread + * if this flag is set + **/ + Connection connect(Supervisor owner, Spec spec, + Object context, boolean sync) { + Connection conn = new Connection(this, owner, spec, context); + if (sync) { + addConnection(conn.connect()); + } else { + connector.connectLater(conn); + } + return conn; + } + + /** + * Add a connection to the set of connections handled by this + * Transport. Invoked by the {@link Connector} class. + * + * @param conn the connection to add + **/ + void addConnection(Connection conn) { + if (!postCommand(new AddConnectionCmd(conn))) { + perform(new CloseConnectionCmd(conn)); + } + } + + /** + * Request an asynchronous close of a connection. + * + * @param conn the connection to close + **/ + void closeConnection(Connection conn) { + postCommand(new CloseConnectionCmd(conn)); + } + + /** + * Request an asynchronous enabling of write events for a + * connection. + * + * @param conn the connection to enable write events for + **/ + void enableWrite(Connection conn) { + if (Thread.currentThread() == thread) { + handleEnableWrite(conn); + } else { + postCommand(new EnableWriteCmd(conn)); + } + } + + /** + * Create a {@link Task} that can be scheduled for execution in + * the transport thread. + * + * @return the newly created Task + * @param cmd what to run when the task is executed + **/ + public Task createTask(Runnable cmd) { + return new Task(scheduler, cmd); + } + + /** + * Perform the given command in such a way that it does not run + * concurrently with the transport thread or other commands + * performed by invoking this method. This method will continue to + * work even after the transport thread has been shut down. + * + * @param cmd the command to perform + **/ + public void perform(Runnable cmd) { + if (Thread.currentThread() == thread) { + cmd.run(); + return; + } + if (!postCommand(cmd)) { + join(); + synchronized (thread) { + cmd.run(); + } + } + } + + /** + * Synchronize with the transport thread. This method will block + * until all commands issued before this method was invoked has + * completed. If the transport thread has been shut down (or is in + * the progress of being shut down) this method will instead wait + * for the transport thread to complete, since no more commands + * will be performed, and waiting would be forever. Invoking this + * method from the transport thread is not a good idea. + * + * @return this object, to enable chaining + **/ + public Transport sync() { + SyncCmd cmd = new SyncCmd(); + if (postCommand(cmd)) { + cmd.waitDone(); + } else { + join(); + } + return this; + } + + private void run() { + while (state == OPEN) { + + // perform I/O selection + try { + selector.select(100); + } catch (IOException e) { + log.log(Level.WARNING, "error during select", e); + } + + // handle internal events + handleEvents(); + + // handle I/O events + Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + Connection conn = (Connection) key.attachment(); + keys.remove(); + if (!handleIOEvents(conn, key)) { + handleCloseConnection(conn); + } + } + + // check scheduled tasks + scheduler.checkTasks(System.currentTimeMillis()); + } + connector.shutdown().waitDone(); + synchronized (this) { + state = CLOSED; + } + handleEvents(); + Iterator<SelectionKey> keys = selector.keys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + Connection conn = (Connection) key.attachment(); + handleCloseConnection(conn); + } + try { selector.close(); } catch (Exception e) {} + closer.shutdown().join(); + connector.exit().join(); + } + + /** + * Initiate controlled shutdown of the transport thread. + * + * @return this object, to enable chaining with join + **/ + public Transport shutdown() { + synchronized (this) { + if (state == OPEN) { + state = CLOSING; + selector.wakeup(); + } + } + return this; + } + + /** + * Wait for the transport thread to finish. + **/ + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Value.java b/jrt/src/com/yahoo/jrt/Value.java new file mode 100644 index 00000000000..7effb7f6e2a --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Value.java @@ -0,0 +1,290 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import com.yahoo.text.Utf8Array; + +import java.nio.ByteBuffer; + + +/** + * <p>A single value that may be either a parameter or a return value + * associated with a {@link Request}. Multiple values are bundled + * together with the {@link Values} class. The value type identifiers + * are defined by the RPC protocol. Each identifier matches the value + * of an ASCII character (listed after the Java class for the type).</p> + * + * <p>Most Value subclasses that are constructed from a Java array + * will not copy the array. This enables the same data to back + * multiple Value objects, but it also means that the application + * should be careful not to change the backing data under the feet of + * a Value object.</p> + **/ +public abstract class Value +{ + /** type identifier for {@link Int8Value} (b) **/ + public static final byte INT8 = 'b'; + + /** type identifier for {@link Int8Array} (B) **/ + public static final byte INT8_ARRAY = 'B'; + + /** type identifier for {@link Int16Value} (h) **/ + public static final byte INT16 = 'h'; + + /** type identifier for {@link Int16Array} (H) **/ + public static final byte INT16_ARRAY = 'H'; + + /** type identifier for {@link Int32Value} (i) **/ + public static final byte INT32 = 'i'; + + /** type identifier for {@link Int32Array} (I) **/ + public static final byte INT32_ARRAY = 'I'; + + /** type identifier for {@link Int64Value} (l) **/ + public static final byte INT64 = 'l'; + + /** type identifier for {@link Int64Array} (L) **/ + public static final byte INT64_ARRAY = 'L'; + + /** type identifier for {@link FloatValue} (f) **/ + public static final byte FLOAT = 'f'; + + /** type identifier for {@link FloatArray} (F) **/ + public static final byte FLOAT_ARRAY = 'F'; + + /** type identifier for {@link DoubleValue} (d) **/ + public static final byte DOUBLE = 'd'; + + /** type identifier for {@link DoubleArray} (D) **/ + public static final byte DOUBLE_ARRAY = 'D'; + + /** type identifier for {@link StringValue} (s) **/ + public static final byte STRING = 's'; + + /** type identifier for {@link StringArray} (S) **/ + public static final byte STRING_ARRAY = 'S'; + + /** type identifier for {@link DataValue} (x) **/ + public static final byte DATA = 'x'; + + /** type identifier for {@link DataArray} (X) **/ + public static final byte DATA_ARRAY = 'X'; + + /** + * Obtain the type identifier for this value + * + * @return type identifier + **/ + public abstract byte type(); + + /** + * Obtain the number of entries stored in this value. This is 1 + * for basic data types and the size of the array for array types. + * + * @return the number of entries stored in this value + **/ + public abstract int count(); + + /** + * Determine the number of bytes needed to store this value when + * encoded into a buffer + * + * @return number of bytes needed for encoding this value + **/ + abstract int bytes(); + + /** + * Encode this value into the given buffer + * + * @param dst where to encode this value + **/ + abstract void encode(ByteBuffer dst); + + /** + * Decode a value from the given buffer. This method also acts as + * a factory for value objects + * + * @return the decoded value + * @param type value type identifier + * @param src where the value is stored + * @throws IllegalArgumentException if the given type identifier is illegal + **/ + static Value decode(byte type, ByteBuffer src) { + switch (type) { + case INT8: return new Int8Value(src); + case INT8_ARRAY: return new Int8Array(src); + case INT16: return new Int16Value(src); + case INT16_ARRAY: return new Int16Array(src); + case INT32: return new Int32Value(src); + case INT32_ARRAY: return new Int32Array(src); + case INT64: return new Int64Value(src); + case INT64_ARRAY: return new Int64Array(src); + case FLOAT: return new FloatValue(src); + case FLOAT_ARRAY: return new FloatArray(src); + case DOUBLE: return new DoubleValue(src); + case DOUBLE_ARRAY: return new DoubleArray(src); + case STRING: return new StringValue(src); + case STRING_ARRAY: return new StringArray(src); + case DATA: return new DataValue(src); + case DATA_ARRAY: return new DataArray(src); + } + throw new IllegalArgumentException(); + } + + /** + * Interpret this value as a {@link Int8Value} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int8Value} + **/ + public byte asInt8() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int8Array} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int8Array} + **/ + public byte[] asInt8Array() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int16Value} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int16Value} + **/ + public short asInt16() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int16Array} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int16Array} + **/ + public short[] asInt16Array() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int32Value} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int32Value} + **/ + public int asInt32() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int32Array} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int32Array} + **/ + public int[] asInt32Array() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int64Value} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int64Value} + **/ + public long asInt64() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link Int64Array} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Int64Array} + **/ + public long[] asInt64Array() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link FloatValue} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link FloatValue} + **/ + public float asFloat() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link FloatArray} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link FloatArray} + **/ + public float[] asFloatArray() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link DoubleValue} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link DoubleValue} + **/ + public double asDouble() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link DoubleArray} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link DoubleArray} + **/ + public double[] asDoubleArray() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link StringValue} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link StringValue} + **/ + public String asString() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link StringValue} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link Utf8Array} + **/ + public Utf8Array asUtf8Array() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link StringArray} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link StringArray} + **/ + public String[] asStringArray() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link DataValue} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link DataValue} + **/ + public byte[] asData() { throw new ClassCastException(); } + + /** + * Interpret this value as a {@link DataArray} and return the + * contents as an appropriate Java type + * + * @return the value contained in this object as a Java type + * @throws ClassCastException if this is not a {@link DataArray} + **/ + public byte[][] asDataArray() { throw new ClassCastException(); } + + /** Force a proper toString */ + public abstract @Override String toString(); + +} diff --git a/jrt/src/com/yahoo/jrt/Values.java b/jrt/src/com/yahoo/jrt/Values.java new file mode 100644 index 00000000000..05a444e4c6d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Values.java @@ -0,0 +1,140 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.ArrayList; +import java.util.List; +import java.nio.ByteBuffer; + + +/** + * A sequence of values used to represent parameters and return values + * associated with a {@link Request}. The individual values are + * represented by {@link Value} objects. + **/ +public class Values +{ + private List<Value> values = new ArrayList<Value>(16); + + /** + * Check whether the values stored in this object satisfies the + * given type string. + * + * @return true if this value sequence satisfies 'types' + * @param types type string + **/ + public boolean satisfies(String types) { + int off = 0; + int len = Math.min(types.length(), size()); + while (off < len && types.charAt(off) == get(off).type()) { + off++; + } + return ((off == types.length() && off == size()) || + (off + 1 == types.length() && types.charAt(off) == '*')); + } + + /** + * Create an empty sequence of values + **/ + public Values() { + } + + /** + * Create a sequence of values by decoding them from the given + * buffer + * + * @param src buffer containing a contained value sequence + **/ + Values(ByteBuffer src) { + decode(src); + } + + /** + * Add a value to the end of the sequence + * + * @return this, to enable chaining + * @param value the value to add + **/ + public Values add(Value value) { + values.add(value); + return this; + } + + /** + * Obtain a specific value from this sequence + * + * @return a value from this sequence + * @param idx the index of the value to obtain + **/ + public Value get(int idx) { + return values.get(idx); + } + + /** + * Obtain the number of values in this sequence + * + * @return the number of values in this sequence + **/ + public int size() { + return values.size(); + } + + /** + * Determine the number of bytes needed to store this value + * sequence when encoded into a buffer + * + * @return number of bytes needed for encoding this value sequence + **/ + int bytes() { + int bytes = 4 + values.size(); + for (int i = 0; i < values.size(); i++) { + bytes += get(i).bytes(); + } + return bytes; + } + + /** + * Encode this value sequence into the given buffer + * + * @param dst where to encode this value sequence + **/ + void encode(ByteBuffer dst) { + byte[] types = new byte[values.size()]; + for (int i = 0; i < types.length; i++) { + types[i] = get(i).type(); + } + dst.putInt(types.length); + dst.put(types); + for (int i = 0; i < types.length; i++) { + get(i).encode(dst); + } + } + + /** + * Decode a value sequence from the given buffer into this object + * + * @param src where the value sequence is stored + **/ + void decode(ByteBuffer src) { + values.clear(); + int cnt = src.getInt(); + byte[] types = new byte[cnt]; + src.get(types); + for (int i = 0; i < cnt; i++) { + values.add(Value.decode(types[i], src)); + } + } + + public @Override String toString() { + if (values.size()==0) return ""; + if (values.size()==1) return values.get(0).toString(); + StringBuffer buffer=new StringBuffer(); + for (int i=0; i<values.size(); i++) { + buffer.append(values.get(i).toString()); + if (i<values.size()-1) + buffer.append(","); + } + return buffer.toString(); + } + +} diff --git a/jrt/src/com/yahoo/jrt/package-info.java b/jrt/src/com/yahoo/jrt/package-info.java new file mode 100644 index 00000000000..2af54efd92f --- /dev/null +++ b/jrt/src/com/yahoo/jrt/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.jrt; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java b/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java new file mode 100644 index 00000000000..ead299a7a8d --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/BackOff.java @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + + +class BackOff implements BackOffPolicy +{ + private double time = 0.50; + + public void reset() { + time = 0.50; + } + + public double get() { + double ret = time; + if (time < 5.0) { + time += 0.5; + } else if (time < 10.0) { + time += 1.0; + } else if (time < 30.0) { + time += 5; + } else { + // max retry time is 30 seconds + } + return ret; + } + + public boolean shouldWarn(double t) { + return ((t > 8.1 && t < 9.9) || (t > 29.9)); + } +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java b/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java new file mode 100644 index 00000000000..4fcbe157c44 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/BackOffPolicy.java @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +/** + * Interface used to control how fast the mirror and register classes + * will retry in case of errors. The reset method is used to indicate + * that all is ok. When things start failing, the get method will be + * invoked for each new attempt in order to get the appropriate delay + * between retries (typically increasing for each invocation). The + * shouldWarn method is used to ask if a certain delay returned from + * get should result in a warning being logged. When things get back + * to normal operation, the reset method is invoked to indicate that + * we are no longer in a failure state. + **/ +public interface BackOffPolicy +{ + /** + * Reset backoff logic. + **/ + public void reset(); + + /** + * Obtain the number of seconds to wait before the next + * attempt. + * + * @return delay in seconds + **/ + public double get(); + + /** + * Check if a certain delay should result in a warning being + * logged. + * + * @return true if we should log + * @param t current delay value + **/ + public boolean shouldWarn(double t); +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java new file mode 100644 index 00000000000..3662e6ad5b9 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/IMirror.java @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +/** + * Defines an interface for the name server lookup. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + **/ +public interface IMirror { + + /** + * Obtain all the services matching a given pattern. + * + * The pattern is matched against all service names in the local mirror repository. A service name may contain '/' + * as a separator token. A pattern may contain '*' to match anything up to the next '/' (or the end of the + * name). This means that the pattern 'foo/<!-- slash-star -->*<!-- star-slash -->/baz' would match the service + * names 'foo/bar/baz' and 'foo/xyz/baz'. The pattern 'foo/b*' would match 'foo/bar', but neither 'foo/xyz' nor + * 'foo/bar/baz'. The pattern 'a*b' will never match anything. + * As a special case, a pattern can end in '**' to match the rest of a name including '/' separators. + * + * @return a list of all matching services, with corresponding connect specs + * @param pattern The pattern used for matching + **/ + public Mirror.Entry[] lookup(String pattern); + + /** + * Obtain the number of updates seen by this mirror. The value may wrap, but will never become 0 again. This can be + * used for name lookup optimization, because the results returned by lookup() will never change unless this number + * also changes. + * + * @return number of slobrok updates seen + **/ + public int updates(); +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java new file mode 100644 index 00000000000..5e62cb61b76 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java @@ -0,0 +1,337 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + + +import com.yahoo.jrt.*; + +import java.util.Arrays; +import java.util.Random; +import java.util.ArrayList; +import java.util.logging.Logger; +import java.util.logging.Level; + + +/** + * A Mirror object is used to keep track of the services registered + * with a slobrok cluster. + * + * Updates to the service repository are fetched in the + * background. Lookups against this object is done using an internal + * mirror of the service repository. + **/ +public class Mirror implements IMirror { + + private static Logger log = Logger.getLogger(Mirror.class.getName()); + + /** + * An Entry contains the name and connection spec for a single + * service. + **/ + public static final class Entry implements Comparable<Entry> { + private final String name; + private final String spec; + private final char [] nameArray; + + public Entry(String name, String spec) { + this.name = name; + this.spec = spec; + this.nameArray = name.toCharArray(); + } + + public boolean equals(Object rhs) { + if (rhs == null || !(rhs instanceof Entry)) { + return false; + } + Entry e = (Entry) rhs; + return (name.equals(e.name) && spec.equals(e.spec)); + } + + public int hashCode() { + return (name.hashCode() + spec.hashCode()); + } + + public int compareTo(Entry b) { + int diff = name.compareTo(b.name); + return diff != 0 + ? diff + : spec.compareTo(b.spec); + } + char [] getNameArray() { return nameArray; } + public String getName() { return name; } + public String getSpec() { return spec; } + } + + private Supervisor orb; + private SlobrokList slobroks; + private String currSlobrok; + private BackOffPolicy backOff; + private volatile int updates = 0; + private boolean reqDone = false; + private volatile Entry[] specs = new Entry[0]; + private int specsGen = 0; + private Task updateTask = null; + private RequestWaiter reqWait = null; + private Target target = null; + private Request req = null; + + /** + * Create a new MirrorAPI using the given Supervisor and slobrok + * connect specs. + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + * @param bop custom backoff policy, mostly useful for testing + **/ + public Mirror(Supervisor orb, SlobrokList slobroks, BackOffPolicy bop) { + this.orb = orb; + this.slobroks = slobroks; + this.backOff = bop; + updateTask = orb.transport().createTask(new Runnable() { + public void run() { handleUpdate(); } + }); + reqWait = new RequestWaiter() { + public void handleRequestDone(Request req) { + reqDone = true; + updateTask.scheduleNow(); + } + }; + updateTask.scheduleNow(); + } + + /** + * Create a new MirrorAPI using the given Supervisor and slobrok + * connect specs. + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + **/ + public Mirror(Supervisor orb, SlobrokList slobroks) { + this(orb, slobroks, new BackOff()); + } + + /** + * Shut down the Mirror. This will close any open connections and + * stop the regular mirror updates. + **/ + public void shutdown() { + updateTask.kill(); + orb.transport().perform(new Runnable() { + public void run() { handleShutdown(); } + }); + } + + @Override + public Entry[] lookup(String pattern) { + ArrayList<Entry> found = new ArrayList<Entry>(); + Entry [] e = specs; + char [] p = pattern.toCharArray(); + for (int i = 0; i < e.length; i++) { + if (match(e[i].getNameArray(), p)) { + found.add(e[i]); + } + } + return found.toArray(new Entry[found.size()]); + } + + @Override + public int updates() { + return updates; + } + + /** + * Ask if the MirrorAPI has got any useful information from the Slobrok. + * + * On application startup it is often useful to run the event loop for some time until this functions returns true + * (or if it never does, time out and tell the user there was no answer from any Service Location Broker). + * + * @return true if the MirrorAPI object has asked for updates from a Slobrok and got any answer back + **/ + public boolean ready() { + return (updates != 0); + } + + /** + * Returns whether this mirror is connected to the slobrok server at this + * time or not. + */ + public boolean connected() { + return (target != null); + } + + /** + * Match a single name against a pattern. + * A pattern can contain '*' to match until the next '/' separator, + * and end with '**' to match the rest of the name. + * Note that this isn't quite globbing, as there is no backtracking. + * + * @return true if the name matches the pattern + * @param name the name + * @param pattern the pattern + **/ + static boolean match(char [] name, char [] pattern) { + int ni = 0; + int pi = 0; + + while (ni < name.length && pi < pattern.length) { + if (name[ni] == pattern[pi]) { + ni++; + pi++; + } else if (pattern[pi] == '*') { + pi++; + while (ni < name.length && name[ni] != '/') { + ni++; + } + if (pi < pattern.length && pattern[pi] == '*') { + pi++; + ni = name.length; + } + } else { + return false; + } + } + while (pi < pattern.length && pattern[pi] == '*') { + pi++; + } + return ((ni == name.length) && (pi == pattern.length)); + } + + /** + * Invoked by the update task. + **/ + private void handleUpdate() { + if (reqDone) { + reqDone = false; + + if (req.errorCode() == ErrorCode.NONE && + req.returnValues().satisfies("SSi") && + req.returnValues().get(0).count() == req.returnValues().get(1).count()) + { + Values answer = req.returnValues(); + + if (specsGen != answer.get(2).asInt32()) { + + int numNames = answer.get(0).count(); + String[] n = answer.get(0).asStringArray(); + String[] s = answer.get(1).asStringArray(); + Entry[] newSpecs = new Entry[numNames]; + + for (int idx = 0; idx < numNames; idx++) { + newSpecs[idx] = new Entry(n[idx], s[idx]); + } + + specs = newSpecs; + + specsGen = answer.get(2).asInt32(); + int u = (updates + 1); + if (u == 0) { + u++; + } + updates = u; + } + backOff.reset(); + updateTask.schedule(0.1); // be nice + return; + } + if (!req.checkReturnTypes("iSSSi") + || (req.returnValues().get(2).count() != + req.returnValues().get(3).count())) + { + target.close(); + target = null; + updateTask.scheduleNow(); // try next slobrok + return; + } + + + Values answer = req.returnValues(); + + int diffFrom = answer.get(0).asInt32(); + int diffTo = answer.get(4).asInt32(); + + if (specsGen != diffTo) { + + int nRemoves = answer.get(1).count(); + String[] r = answer.get(1).asStringArray(); + + int numNames = answer.get(2).count(); + String[] n = answer.get(2).asStringArray(); + String[] s = answer.get(3).asStringArray(); + + + Entry[] newSpecs; + if (diffFrom == 0) { + newSpecs = new Entry[numNames]; + + for (int idx = 0; idx < numNames; idx++) { + newSpecs[idx] = new Entry(n[idx], s[idx]); + } + } else { + java.util.HashMap<String, Entry> map = new java.util.HashMap<String, Entry>(); + for (Entry e : specs) { + map.put(e.getName(), e); + } + for (String rem : r) { + map.remove(rem); + } + for (int idx = 0; idx < numNames; idx++) { + map.put(n[idx], new Entry(n[idx], s[idx])); + } + newSpecs = new Entry[map.size()]; + int idx = 0; + for (Entry e : map.values()) { + newSpecs[idx++] = e; + } + } + + specs = newSpecs; + + specsGen = diffTo; + int u = (updates + 1); + if (u == 0) { + u++; + } + updates = u; + } + backOff.reset(); + updateTask.schedule(0.1); // be nice + return; + } + if (target != null && ! slobroks.contains(currSlobrok)) { + target.close(); + target = null; + } + if (target == null) { + currSlobrok = slobroks.nextSlobrokSpec(); + if (currSlobrok == null) { + double delay = backOff.get(); + updateTask.schedule(delay); + if (backOff.shouldWarn(delay)) { + log.log(Level.INFO, "no location brokers available " + + "(retry in " + delay + " seconds) for: " + slobroks); + } + return; + } + target = orb.connect(new Spec(currSlobrok)); + specsGen = 0; + } + req = new Request("slobrok.incremental.fetch"); + req.parameters().add(new Int32Value(specsGen)); // gencnt + req.parameters().add(new Int32Value(5000)); // mstimeout + target.invokeAsync(req, 40.0, reqWait); + } + + /** + * Invoked from the transport thread, requested by the shutdown + * method. + **/ + private void handleShutdown() { + if (req != null) { + req.abort(); + req = null; + } + if (target != null) { + target.close(); + target = null; + } + } +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Register.java b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java new file mode 100644 index 00000000000..84720501ff8 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/Register.java @@ -0,0 +1,269 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + + +import com.yahoo.jrt.*; +import java.util.ArrayList; +import java.util.Random; +import java.util.logging.Logger; +import java.util.logging.Level; + + +/** + * A Register object is used to register and unregister services with + * a slobrok cluster. + * + * The register/unregister operations performed against this object + * are stored in a todo list that will be performed asynchronously + * against the slobrok cluster as soon as possible. + **/ +public class Register { + + private static Logger log = Logger.getLogger(Register.class.getName()); + + private Supervisor orb; + private SlobrokList slobroks; + private String currSlobrok; + private String mySpec; + private BackOffPolicy backOff; + private boolean reqDone = false; + private ArrayList<String> names = new ArrayList<String>(); + private ArrayList<String> pending = new ArrayList<String>(); + private ArrayList<String> unreg = new ArrayList<String>(); + private Task updateTask = null; + private RequestWaiter reqWait = null; + private Target target = null; + private Request req = null; + private Method m_list = null; + private Method m_unreg = null; + + /** + * Remove all instances of name from list. + **/ + private void discard(ArrayList<String> list, String name) { + ArrayList<String> tmp = new ArrayList<String>(); + tmp.add(name); + list.removeAll(tmp); + } + + /** + * Create a new Register using the given Supervisor, slobrok + * connect specs, hostname and port + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + * @param spec the Spec representing hostname and port for this host + * @param bop custom backoff policy, mostly useful for testing + **/ + public Register(Supervisor orb, SlobrokList slobroks, Spec spec, BackOffPolicy bop) { + this.orb = orb; + this.slobroks = slobroks; + this.backOff = bop; + mySpec = spec.toString(); + updateTask = orb.transport().createTask(new Runnable() { + public void run() { handleUpdate(); } + }); + reqWait = new RequestWaiter() { + public void handleRequestDone(Request req) { + reqDone = true; + updateTask.scheduleNow(); + } + }; + m_list = new Method("slobrok.callback.listNamesServed", + "", "S", new MethodHandler() { + public void invoke(Request req) { + handleRpcList(req); + } + }) + .methodDesc("List rpcserver names") + .returnDesc(0, "names", + "The rpcserver names this server wants to serve"); + orb.addMethod(m_list); + m_unreg = new Method("slobrok.callback.notifyUnregistered", + "s", "", new MethodHandler() { + public void invoke(Request req) { + handleRpcUnreg(req); + } + }) + .methodDesc("Notify a server about removed registration") + .paramDesc(0, "name", "RpcServer name"); + orb.addMethod(m_unreg); + updateTask.scheduleNow(); + } + + /** + * Create a new Register using the given Supervisor, slobrok + * connect specs, hostname and port + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + * @param spec the Spec representing hostname and port for this host + **/ + public Register(Supervisor orb, SlobrokList slobroks, Spec spec) { + this(orb, slobroks, spec, new BackOff()); + } + + /** + * Create a new Register using the given Supervisor, slobrok + * connect specs, hostname and port + * + * @param orb the Supervisor to use + * @param slobroks slobrok connect spec list + * @param myHost the hostname of this host + * @param myPort the port number we are listening to + **/ + public Register(Supervisor orb, SlobrokList slobroks, + String myHost, int myPort) { + this(orb, slobroks, new Spec(myHost, myPort)); + } + + + /** + * Shut down the Register. This will close any open connections + * and stop the regular re-registration. + **/ + public void shutdown() { + updateTask.kill(); + orb.transport().perform(new Runnable() { + public void run() { handleShutdown(); } + }); + } + + /** + * Register a service with the slobrok cluster. + * + * @param name service name + **/ + public synchronized void registerName(String name) { + if (names.indexOf(name) >= 0) { + return; + } + names.add(name); + pending.add(name); + discard(unreg, name); + updateTask.scheduleNow(); + } + + /** + * Unregister a service with the slobrok cluster + * + * @param name service name + **/ + public synchronized void unregisterName(String name) { + discard(names, name); + discard(pending, name); + unreg.add(name); + updateTask.scheduleNow(); + } + + /** + * Invoked by the update task. + **/ + private void handleUpdate() { + if (reqDone) { + reqDone = false; + if (req.isError()) { + if (req.errorCode() != ErrorCode.METHOD_FAILED) { + log.log(Level.FINE, "register failed: " + + req.errorMessage() + + " (code " + req.errorCode() + ")"); + target.close(); + target = null; + } else { + log.log(Level.WARNING, "register failed: " + + req.errorMessage() + + " (code " + req.errorCode() + ")"); + } + } else { + backOff.reset(); + } + req = null; + } + if (req != null) { + log.log(Level.FINEST, "req in progress"); + return; // current request still in progress + } + if (target != null && ! slobroks.contains(currSlobrok)) { + target.close(); + target = null; + } + if (target == null) { + currSlobrok = slobroks.nextSlobrokSpec(); + if (currSlobrok == null) { + double delay = backOff.get(); + updateTask.schedule(delay); + if (backOff.shouldWarn(delay)) { + log.log(Level.WARNING, "slobrok connection problems " + + "(retry in " + delay + " seconds) to: " + slobroks); + } else { + log.log(Level.FINE, "slobrok retry in " + delay + + " seconds"); + } + return; + } + target = orb.connect(new Spec(currSlobrok)); + synchronized (this) { + pending.clear(); + pending.addAll(names); + } + } + boolean rem = false; + boolean reg = false; + String name; + synchronized (this) { + if (unreg.size() > 0) { + name = unreg.remove(unreg.size() - 1); + rem = true; + } else if (pending.size() > 0) { + name = pending.remove(pending.size() - 1); + reg = true; + } else { + pending.addAll(names); + log.log(Level.FINE, "done, reschedule in 30s"); + updateTask.schedule(30.0); + return; + } + } + + if (rem) { + req = new Request("slobrok.unregisterRpcServer"); + req.parameters().add(new StringValue(name)); + log.log(Level.FINE, "unregister [" + name + "]"); + req.parameters().add(new StringValue(mySpec)); + target.invokeAsync(req, 35.0, reqWait); + } else if (reg) { + req = new Request("slobrok.registerRpcServer"); + req.parameters().add(new StringValue(name)); + log.log(Level.FINE, "register [" + name + "]"); + req.parameters().add(new StringValue(mySpec)); + target.invokeAsync(req, 35.0, reqWait); + } + } + + private synchronized void handleRpcList(Request req) { + Values dst = req.returnValues(); + dst.add(new StringArray(names.toArray(new String[names.size()]))); + } + + private void handleRpcUnreg(Request req) { + log.log(Level.WARNING, "unregistered name " + + req.parameters().get(0).asString()); + } + + /** + * Invoked from the transport thread, requested by the shutdown + * method. + **/ + private void handleShutdown() { + orb.removeMethod(m_list); + orb.removeMethod(m_unreg); + if (req != null) { + req.abort(); + req = null; + } + if (target != null) { + target.close(); + target = null; + } + } +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java b/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java new file mode 100644 index 00000000000..43c1c41faa5 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/SlobrokList.java @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +import java.util.Arrays; +import java.util.Random; + +public class SlobrokList { + + private final Internal internal; + private String[] slobroks; + private int idx = 0; + + public SlobrokList() { + this.internal = new Internal(); + } + + public SlobrokList(SlobrokList sibling) { + this.internal = sibling.internal; + } + + private void checkUpdate() { + synchronized (internal) { + if (slobroks != internal.slobroks) { + slobroks = internal.slobroks; + idx = 0; + } + } + } + + + public String nextSlobrokSpec() { + checkUpdate(); + if (idx < slobroks.length) { + return slobroks[idx++]; + } + idx = 0; + return null; + } + + public void setup(String[] slobroks) { + internal.setup(slobroks); + } + + public int length() { + return internal.length(); + } + + public boolean contains(String slobrok) { + checkUpdate(); + for (String s : slobroks) { + if (s == slobrok) return true; + } + return false; + } + + @Override + public String toString() { + return internal.toString(); + } + + private static class Internal { + + String[] slobroks = new String[0]; + + void setup(String[] slobroks) { + String[] next = new String[slobroks.length]; + for (int i = 0; i < slobroks.length; i++) { + next[i] = slobroks[i]; + } + Random rnd = new Random(); + for (int i = 0; i + 1 < next.length; i++) { + int lim = next.length - i; + int x = rnd.nextInt(lim); + if (x != 0) { + String tmp = next[i]; + next[i] = next[i+x]; + next[i+x] = tmp; + } + } + synchronized (this) { + this.slobroks = next; + } + } + + synchronized int length() { + return slobroks.length; + } + + @Override + public synchronized String toString() { + return Arrays.toString(slobroks); + } + } +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java new file mode 100644 index 00000000000..0c9bc4d7407 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/api/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.jrt.slobrok.api; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jrt/src/com/yahoo/jrt/slobrok/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/package-info.java new file mode 100644 index 00000000000..a1d921df174 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.jrt.slobrok; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java new file mode 100644 index 00000000000..085489897b5 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java @@ -0,0 +1,270 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.server; + +import com.yahoo.jrt.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class Slobrok { + + private class RegisterCallback implements RequestWaiter { + + Request registerReq; + String name; + String spec; + Target target; + + public RegisterCallback(Request req, String name, String spec) { + req.detach(); + registerReq = req; + this.name = name; + this.spec = spec; + target = orb.connect(new Spec(spec)); + Request cbReq = new Request("slobrok.callback.listNamesServed"); + target.invokeAsync(cbReq, 5.0, this); + } + + public void handleRequestDone(Request req) { + if (!req.checkReturnTypes("S")) { + registerReq.setError(ErrorCode.METHOD_FAILED, "error during register callback: " + + req.errorMessage()); + registerReq.returnRequest(); + target.close(); + return; + } + String[] names = req.returnValues().get(0).asStringArray(); + boolean found = false; + for (String n : names) { + if (n.equals(name)) { + found = true; + } + } + if (!found) { + registerReq.setError(ErrorCode.METHOD_FAILED, "register failed: " + + "served names does not contain name"); + registerReq.returnRequest(); + target.close(); + return; + } + handleRegisterCallbackDone(registerReq, name, spec, target); + } + } + + private class FetchMirror implements Runnable { + public final Request req; + public final Task task; + + public FetchMirror(Request req, int timeout) { + req.detach(); + this.req = req; + task = orb.transport().createTask(this); + task.schedule(((double)timeout)/1000.0); + } + public void run() { // timeout + handleFetchMirrorTimeout(this); + } + } + + private class TargetMonitor implements TargetWatcher { + public void notifyTargetInvalid(Target target) { + handleTargetDown(target); + } + } + + Supervisor orb; + Acceptor listener; + HashMap<String,String> services = new HashMap<String,String>(); + ArrayList<FetchMirror> pendingFetch = new ArrayList<FetchMirror>(); + HashMap<String,Target> targets = new HashMap<String,Target>(); + TargetMonitor monitor = new TargetMonitor(); + int gencnt = 1; + + public String lookup(String name) { + return services.get(name); + } + + + public Slobrok(int port) throws ListenFailedException { + orb = new Supervisor(new Transport()); + registerMethods(); + try { + listener = orb.listen(new Spec(port)); + } catch (ListenFailedException e) { + orb.transport().shutdown().join(); + throw e; + } + } + + public Slobrok() throws ListenFailedException { + this(0); + } + + public int port() { + return listener.port(); + } + + public void stop() { + orb.transport().shutdown().join(); + listener.shutdown().join(); + } + + public String configId() { + return "raw:slobrok[1]\n" + + "slobrok[0].connectionspec \"" + new Spec("localhost", listener.port()).toString() + "\"\n"; + } + + private void updated() { + gencnt++; + if (gencnt == 0) { + gencnt++; + } + handleFetchMirrorFlush(); + } + + private void handleRegisterCallbackDone(Request req, + String name, String spec, + Target target) + { + String stored = services.get(name); + if (stored != null) { // too late + if (!stored.equals(spec)) { + req.setError(ErrorCode.METHOD_FAILED, + "service '" + name + "' registered with another spec"); + } + req.returnRequest(); + target.close(); + return; + } + target.setContext(name); + target.addWatcher(monitor); + services.put(name, spec); + targets.put(name, target); + req.returnRequest(); + updated(); + } + + private void handleTargetDown(Target target) { + String name = (String) target.getContext(); + targets.remove(name); + services.remove(name); + updated(); + } + + private void dumpServices(Request req) { + ArrayList<String> names = new ArrayList<String>(); + ArrayList<String> specs = new ArrayList<String>(); + for (Map.Entry<String,String> entry : services.entrySet()) { + names.add(entry.getKey()); + specs.add(entry.getValue()); + } + req.returnValues().add(new StringArray(names.toArray(new String[names.size()]))); + req.returnValues().add(new StringArray(specs.toArray(new String[specs.size()]))); + req.returnValues().add(new Int32Value(gencnt)); + } + + private void handleFetchMirrorTimeout(FetchMirror fetch) { + pendingFetch.remove(fetch); + fetch.req.returnValues().add(new StringArray(new String[0])); + fetch.req.returnValues().add(new StringArray(new String[0])); + fetch.req.returnValues().add(new Int32Value(gencnt)); + fetch.req.returnRequest(); + } + + private void handleFetchMirrorFlush() { + for (FetchMirror fetch : pendingFetch) { + fetch.task.kill(); + dumpServices(fetch.req); + fetch.req.returnRequest(); + } + pendingFetch.clear(); + } + + private void registerMethods() { + orb.addMethod(new Method("slobrok.registerRpcServer", "ss", "", + new MethodHandler() { + public void invoke(Request req) { + rpc_register(req); + } + }) + .methodDesc("Register a rpcserver") + .paramDesc(0, "name", "RpcServer name") + .paramDesc(1, "spec", "The connection specification")); + orb.addMethod(new Method("slobrok.unregisterRpcServer", "ss", "", + new MethodHandler() { + public void invoke(Request req) { + rpc_unregister(req); + } + }) + .methodDesc("Unregister a rpcserver") + .paramDesc(0, "name", "RpcServer name") + .paramDesc(1, "spec", "The connection specification")); + + orb.addMethod(new Method("slobrok.incremental.fetch", "ii", "iSSSi", + new MethodHandler() { + public void invoke(Request req) { + rpc_fetchIncremental(req); + } + }) + .methodDesc("Fetch or update mirror of name to spec map") + .paramDesc(0, "gencnt", "generation already known by client") + .paramDesc(1, "timeout", "How many milliseconds to wait for changes" + + "before returning if nothing has changed (max=10000)") + .returnDesc(0, "oldgen", "diff from generation already known by client") + .returnDesc(1, "removed", "Array of RpcServer names to remove") + .returnDesc(2, "names", "Array of RpcServer names with new values") + .returnDesc(3, "specs", "Array of connection specifications (same order)") + .returnDesc(4, "newgen", "Generation count for new version of the map")); + } + + private void rpc_register(Request req) { + String name = req.parameters().get(0).asString(); + String spec = req.parameters().get(1).asString(); + String stored = services.get(name); + if (stored == null) { + new RegisterCallback(req, name, spec); + } else { + if (stored.equals(spec)) { + // ok, already stored + } else { + req.setError(ErrorCode.METHOD_FAILED, + "service '" + name + "' registered with another spec"); + } + } + } + + private void rpc_unregister(Request req) { + String name = req.parameters().get(0).asString(); + String spec = req.parameters().get(1).asString(); + String stored = services.get(name); + if (stored != null) { + if (stored.equals(spec)) { + Target target = targets.remove(name); + target.removeWatcher(monitor); + services.remove(name); + target.close(); + updated(); + } else { + req.setError(ErrorCode.METHOD_FAILED, + "service '" + name + "' registered with another spec"); + } + } + } + + private void rpc_fetchIncremental(Request req) { + int gencnt = req.parameters().get(0).asInt32(); + int timeout = req.parameters().get(1).asInt32(); + + // for now, always make "full diff" from generation 0 + req.returnValues().add(new Int32Value(0)); + req.returnValues().add(new StringArray(new String[0])); + + if (gencnt == this.gencnt) { + pendingFetch.add(new FetchMirror(req, timeout)); + } else { + dumpServices(req); + } + } + +} diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java b/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java new file mode 100644 index 00000000000..2e77e20852b --- /dev/null +++ b/jrt/src/com/yahoo/jrt/slobrok/server/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.jrt.slobrok.server; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java new file mode 100644 index 00000000000..1742b365697 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.tool; + +import com.yahoo.jrt.*; + +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; + +/** + * A generic rpc invoker for use by command line tools + * + * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class RpcInvoker { + + private Value getArgument(Request request,String parameter) { + if (parameter.length()<=1 || parameter.charAt(1)!=':') + return new StringValue(parameter); + + String value=parameter.substring(2); + switch (parameter.charAt(0)) { + case 'b': + return new Int8Value(Byte.parseByte(value)); + case 'h': + return new Int16Value(Short.parseShort(value)); + case 'i': + return new Int32Value(Integer.parseInt(value)); + case 'l': + return new Int64Value(Long.parseLong(value)); + case 'f': + return new FloatValue(Float.parseFloat(value)); + case 'd': + return new DoubleValue(Double.parseDouble(value)); + case 's': + return new StringValue(value); + } + + throw new IllegalArgumentException("The first letter in '" + parameter + "' must be a type argument. " + + "There is no jrt type identified by '" + parameter.charAt(0) + "'"); + } + + protected Request createRequest(String method,List<String> arguments) { + Request request=new Request(method); + if (arguments!=null) { + for (String argument : arguments) + request.parameters().add(getArgument(request,argument)); + } + return request; + } + + /** + * Invokes a rpc method without throwing an exception + * + * @param connectspec the rpc server connection spec + * @param method the name of the method to invoke + * @param arguments the argument to the method, or null or an empty list if there are no arguments + */ + public void invoke(String connectspec,String method, List<String> arguments) { + Supervisor supervisor=null; + Target target=null; + + try { + if (connectspec.indexOf('/')<0) + connectspec="tcp/" + connectspec; + + supervisor=new Supervisor(new Transport()); + target = supervisor.connect(new Spec(connectspec)); + Request request=createRequest(method,arguments); + target.invokeSync(request,10.0); + if (request.isError()) { + System.err.println("error(" + request.errorCode() + "): " + request.errorMessage()); + return; + } + Values returned=request.returnValues(); + for (int i=0; i<returned.size(); i++) { + System.out.println(returned.get(i)); + } + } + finally { + if (target!=null) + target.close(); + if (supervisor!=null) + supervisor.transport().shutdown().join(); + } + } + + public static void main(String[] args) { + if (args.length<1) { + System.err.println("usage: invoke [-h <connectspec>] <method> [arguments]"); + System.err.println(" Connectspec: This is on the form hostname:port, or tcp/hostname:port"); + System.err.println(" if omitted, localhost:8086 is used"); + System.err.println(" Arguments: each argument must be a string or on the form <type>:<value>"); + System.err.println(" supported types: {'b','h','i','l','f','d','s'}"); + System.exit(0); + } + List<String> arguments=new ArrayList<String>(Arrays.asList(args)); + String connectSpec="localhost:8086"; + if ("-h".equals(arguments.get(0)) && arguments.size()>=3) { + arguments.remove(0); // Consume -h + connectSpec=arguments.remove(0); + } + String method=arguments.remove(0); + new RpcInvoker().invoke(connectSpec,method,arguments); + } + +} diff --git a/jrt/src/com/yahoo/jrt/tool/package-info.java b/jrt/src/com/yahoo/jrt/tool/package-info.java new file mode 100644 index 00000000000..ccb5473362c --- /dev/null +++ b/jrt/src/com/yahoo/jrt/tool/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.jrt.tool; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jrt/tests/com/yahoo/jrt/AbortTest.java b/jrt/tests/com/yahoo/jrt/AbortTest.java new file mode 100644 index 00000000000..356203be4be --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/AbortTest.java @@ -0,0 +1,76 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class AbortTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + Test.Barrier barrier; + + public AbortTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("test", "i", "i", this, "rpc_test")); + barrier = new Test.Barrier(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_test(Request req) { + barrier.waitFor(); + int value = req.parameters().get(0).asInt32(); + req.returnValues().add(new Int32Value(value)); + } + + public void testAbort() { + Test.Waiter w = new Test.Waiter(); + Request req = new Request("test"); + req.parameters().add(new Int32Value(20)); + target.invokeAsync(req, 5.0, w); + req.abort(); + barrier.breakIt(); + w.waitDone(); + assertTrue(req.isError()); + assertEquals(ErrorCode.ABORT, req.errorCode()); + assertEquals(0, req.returnValues().size()); + + Request req2 = new Request("test"); + req2.parameters().add(new Int32Value(30)); + target.invokeSync(req2, 5.0); + assertTrue(!req2.isError()); + assertEquals(1, req2.returnValues().size()); + assertEquals(30, req2.returnValues().get(0).asInt32()); + + req2.abort(); + assertTrue(!req2.isError()); + assertEquals(1, req2.returnValues().size()); + assertEquals(30, req2.returnValues().get(0).asInt32()); + + assertTrue(req.isError()); + assertEquals(ErrorCode.ABORT, req.errorCode()); + assertEquals(0, req.returnValues().size()); + } + + public void testBogusAbort() { + Request req = new Request("test"); + req.parameters().add(new Int32Value(40)); + try { + req.abort(); + assertTrue(false); + } catch (IllegalStateException e) {} + } +} diff --git a/jrt/tests/com/yahoo/jrt/BackTargetTest.java b/jrt/tests/com/yahoo/jrt/BackTargetTest.java new file mode 100644 index 00000000000..056f9ed60f0 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/BackTargetTest.java @@ -0,0 +1,110 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class BackTargetTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + int serverValue; + int clientValue; + Target serverBackTarget; + Target clientBackTarget; + + public BackTargetTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + + server.addMethod(new Method("inc", "", "", this, "server_inc")); + server.addMethod(new Method("sample_target", "", "", this, + "server_sample_target")); + server.addMethod(new Method("back_inc", "", "", this, "back_inc")); + + client.addMethod(new Method("inc", "", "", this, "client_inc")); + client.addMethod(new Method("sample_target", "", "", this, + "client_sample_target")); + client.addMethod(new Method("back_inc", "", "", this, "back_inc")); + + serverValue = 0; + clientValue = 0; + serverBackTarget = null; + clientBackTarget = null; + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void server_inc(Request req) { + serverValue++; + } + + public void server_sample_target(Request req) { + serverBackTarget = req.target(); + } + + public void client_inc(Request req) { + clientValue++; + } + + public void client_sample_target(Request req) { + clientBackTarget = req.target(); + } + + public void back_inc(Request req) { + Target t = req.target(); + t.invokeVoid(new Request("inc")); + } + + private void checkValues(int server, int client) { + assertEquals(server, serverValue); + assertEquals(client, clientValue); + } + + private void checkTargets(boolean server, boolean client) { + assertTrue(server == (serverBackTarget != null)); + assertTrue(client == (clientBackTarget != null)); + } + + public void testBackTarget() { + checkTargets(false, false); + target.invokeSync(new Request("sample_target"), 5.0); + checkTargets(true, false); + serverBackTarget.invokeSync(new Request("sample_target"), 5.0); + checkTargets(true, true); + + checkValues(0, 0); + target.invokeSync(new Request("inc"), 5.0); + checkValues(1, 0); + serverBackTarget.invokeSync(new Request("inc"), 5.0); + checkValues(1, 1); + clientBackTarget.invokeSync(new Request("inc"), 5.0); + checkValues(2, 1); + + target.invokeSync(new Request("back_inc"), 5.0); + checkValues(2, 2); + serverBackTarget.invokeSync(new Request("back_inc"), 5.0); + checkValues(3, 2); + clientBackTarget.invokeSync(new Request("back_inc"), 5.0); + checkValues(3, 3); + } + + public void testBogusBackTarget() { + Request req = new Request("inc"); + try { + req.target(); + assertTrue(false); + } catch (IllegalStateException e) {} + } +} diff --git a/jrt/tests/com/yahoo/jrt/BufferTest.java b/jrt/tests/com/yahoo/jrt/BufferTest.java new file mode 100644 index 00000000000..a4a594b69e5 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/BufferTest.java @@ -0,0 +1,265 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + + +public class BufferTest extends junit.framework.TestCase { + + public BufferTest(String name) { + super(name); + } + + + public void testBuffer() { + + int size = Buffer.MAX_IO + (Buffer.MAX_IO / 10); + Buffer buf = new Buffer(1024); + ByteBuffer b = null; + + byte[] x = new byte[size]; + byte[] y = new byte[size]; + + Arrays.fill(x, (byte) 10); + Arrays.fill(y, (byte) 55); + + assertEquals(buf.bytes(), 0); + assertFalse(Arrays.equals(x, y)); + + b = buf.getWritable(512); + assertEquals(buf.bytes(), 0); + assertTrue(b.remaining() >= 512); + b.put((byte)42); + assertEquals(buf.bytes(), 1); + + b = buf.getReadable(); + assertEquals(buf.bytes(), 1); + assertEquals(b.remaining(), 1); + assertEquals(b.get(), 42); + assertEquals(buf.bytes(), 0); + assertEquals(b.remaining(), 0); + + b = buf.getWritable(512); + assertTrue(b.remaining() >= 512); + assertEquals(buf.bytes(), 0); + b.put((byte)42); + assertEquals(buf.bytes(), 1); + + b = buf.getWritable(size); + assertTrue(b.remaining() >= size); + assertEquals(buf.bytes(), 1); + b.put(x); + assertEquals(buf.bytes(), size + 1); + + b = buf.getReadable(); + assertEquals(buf.bytes(), size + 1); + assertEquals(b.remaining(), size + 1); + assertEquals(b.get(), 42); + assertEquals(buf.bytes(), size); + assertEquals(b.remaining(), size); + b.get(y); + assertEquals(buf.bytes(), 0); + assertEquals(b.remaining(), 0); + assertTrue(Arrays.equals(x, y)); + } + + public void testBufferCompact() { + Buffer buf = new Buffer(10); + buf.getWritable(3).put((byte)10).put((byte)20).put((byte)30); + assertEquals(10, buf.getReadable().capacity()); + buf.getWritable(3).put((byte)11).put((byte)21).put((byte)31); + buf.getWritable(3).put((byte)12).put((byte)22).put((byte)32); + { + ByteBuffer bb = buf.getReadable(); + assertEquals(10, bb.get()); + assertEquals(20, bb.get()); + assertEquals(30, bb.get()); + } + { + ByteBuffer bb = buf.getReadable(); + assertEquals(11, bb.get()); + assertEquals(21, bb.get()); + assertEquals(31, bb.get()); + } + buf.getWritable(3).put((byte)13).put((byte)23).put((byte)33); + assertEquals(10, buf.getReadable().capacity()); + { + ByteBuffer bb = buf.getReadable(); + assertEquals(12, bb.get()); + assertEquals(22, bb.get()); + assertEquals(32, bb.get()); + } + { + ByteBuffer bb = buf.getReadable(); + assertEquals(13, bb.get()); + assertEquals(23, bb.get()); + assertEquals(33, bb.get()); + } + { + ByteBuffer bb = buf.getReadable(); + assertEquals(bb.position(), bb.limit()); + } + } + + public void testBufferMax() { + + int size = Buffer.MAX_IO + (Buffer.MAX_IO / 10); + Buffer buf = new Buffer(1024); + ByteBuffer b = null; + + byte[] x = new byte[size]; + byte[] y = new byte[size]; + + Arrays.fill(x, (byte) 10); + Arrays.fill(y, (byte) 55); + + assertEquals(buf.bytes(), 0); + assertFalse(Arrays.equals(x, y)); + + b = buf.getChannelWritable(size); + assertEquals(b.remaining(), Buffer.MAX_IO); + assertTrue(b.remaining() < size); + assertEquals(buf.bytes(), 0); + b.put(x, 0, Buffer.MAX_IO); + assertEquals(buf.bytes(), Buffer.MAX_IO); + assertEquals(b.remaining(), 0); + + b = buf.getChannelWritable(size - Buffer.MAX_IO); + assertTrue(b.remaining() >= size - Buffer.MAX_IO); + assertEquals(buf.bytes(), Buffer.MAX_IO); + b.put(x, Buffer.MAX_IO, x.length - Buffer.MAX_IO); + assertEquals(buf.bytes(), size); + + b = buf.getChannelReadable(); + assertEquals(buf.bytes(), size); + + b = buf.getChannelWritable(512); + assertEquals(buf.bytes(), size); + b.put((byte)42); + assertEquals(buf.bytes(), size + 1); + + b = buf.getChannelReadable(); + assertEquals(buf.bytes(), size + 1); + assertEquals(b.remaining(), Buffer.MAX_IO); + b.get(y, 0, Buffer.MAX_IO); + assertEquals(buf.bytes(), size - Buffer.MAX_IO + 1); + + b = buf.getChannelReadable(); + assertEquals(buf.bytes(), size - Buffer.MAX_IO + 1); + assertEquals(b.remaining(), size - Buffer.MAX_IO + 1); + b.get(y, Buffer.MAX_IO, y.length - Buffer.MAX_IO); + assertEquals(buf.bytes(), 1); + assertEquals(b.remaining(), 1); + assertEquals(b.get(), 42); + assertEquals(buf.bytes(), 0); + assertEquals(b.remaining(), 0); + + assertTrue(Arrays.equals(x, y)); + } + + public void testBufferShrink() { + Buffer buf = new Buffer(500); + ByteBuffer b = null; + { + b = buf.getWritable(10); + assertEquals(500, b.capacity()); + b.put((byte)10); + b.put((byte)20); + b.put((byte)30); + b.put((byte)40); + b.put((byte)50); + + assertTrue(buf.shrink(400)); + b = buf.getReadable(); + assertEquals(400, b.capacity()); + assertEquals(5, b.remaining()); + assertEquals(10, b.get()); + assertEquals(20, b.get()); + assertEquals(30, b.get()); + assertEquals(40, b.get()); + assertEquals(50, b.get()); + } + { + b = buf.getWritable(10); + assertEquals(400, b.capacity()); + b.put((byte)10); + b.put((byte)20); + b.put((byte)30); + b.put((byte)40); + b.put((byte)50); + + assertTrue(buf.shrink(300)); + b = buf.getReadable(); + assertEquals(300, b.capacity()); + assertEquals(5, b.remaining()); + assertEquals(10, b.get()); + assertEquals(20, b.get()); + assertEquals(30, b.get()); + assertEquals(40, b.get()); + assertEquals(50, b.get()); + } + { + b = buf.getWritable(10); + assertEquals(300, b.capacity()); + b.put((byte)10); + b.put((byte)20); + b.put((byte)30); + b.put((byte)40); + b.put((byte)50); + + b = buf.getReadable(); + assertTrue(buf.shrink(200)); + b = buf.getReadable(); + assertEquals(200, b.capacity()); + assertEquals(5, b.remaining()); + assertEquals(10, b.get()); + assertEquals(20, b.get()); + assertEquals(30, b.get()); + assertEquals(40, b.get()); + assertEquals(50, b.get()); + } + { + b = buf.getWritable(10); + assertEquals(200, b.capacity()); + b.put((byte)10); + b.put((byte)20); + b.put((byte)30); + b.put((byte)40); + b.put((byte)50); + + b = buf.getReadable(); + assertFalse(buf.shrink(500)); + b = buf.getReadable(); + assertEquals(200, b.capacity()); + assertEquals(5, b.remaining()); + assertEquals(10, b.get()); + assertEquals(20, b.get()); + assertEquals(30, b.get()); + assertEquals(40, b.get()); + assertEquals(50, b.get()); + } + { + b = buf.getWritable(10); + assertEquals(200, b.capacity()); + b.put((byte)10); + b.put((byte)20); + b.put((byte)30); + b.put((byte)40); + b.put((byte)50); + + b = buf.getReadable(); + assertTrue(buf.shrink(5)); + assertFalse(buf.shrink(4)); + b = buf.getReadable(); + assertEquals(5, b.capacity()); + assertEquals(5, b.remaining()); + assertEquals(10, b.get()); + assertEquals(20, b.get()); + assertEquals(30, b.get()); + assertEquals(40, b.get()); + assertEquals(50, b.get()); + } + } +} diff --git a/jrt/tests/com/yahoo/jrt/ConnectTest.java b/jrt/tests/com/yahoo/jrt/ConnectTest.java new file mode 100644 index 00000000000..9597927b06a --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/ConnectTest.java @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class ConnectTest extends junit.framework.TestCase { + + public ConnectTest(String name) { + super(name); + } + + public void testConnect() throws ListenFailedException { + Test.Orb server = new Test.Orb(new Transport()); + Test.Orb client = new Test.Orb(new Transport()); + Acceptor acceptor = server.listen(new Spec(Test.PORT)); + + assertTrue(server.checkLifeCounts(0, 0)); + assertTrue(client.checkLifeCounts(0, 0)); + + Target target = client.connect(new Spec("localhost", Test.PORT)); + + for (int i = 0; i < 100; i++) { + if (client.initCount == 1 && server.initCount == 1) { + break; + } + try { Thread.sleep(100); } catch (InterruptedException e) {} + } + + assertTrue(server.checkLifeCounts(1, 0)); + assertTrue(client.checkLifeCounts(1, 0)); + + target.close(); + + for (int i = 0; i < 100; i++) { + if (client.finiCount == 1 && server.finiCount == 1) { + break; + } + try { Thread.sleep(100); } catch (InterruptedException e) {} + } + + assertTrue(server.checkLifeCounts(1, 1)); + assertTrue(client.checkLifeCounts(1, 1)); + + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } +} diff --git a/jrt/tests/com/yahoo/jrt/DetachTest.java b/jrt/tests/com/yahoo/jrt/DetachTest.java new file mode 100644 index 00000000000..10f2485ae6d --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/DetachTest.java @@ -0,0 +1,136 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class DetachTest extends junit.framework.TestCase { + + Test.Orb server; + Acceptor acceptor; + Test.Orb client; + Target target; + Test.Receptor receptor; + Test.Barrier barrier; + + public DetachTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Test.Orb(new Transport()); + client = new Test.Orb(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + + server.addMethod(new Method("d_inc", "i", "i", this, + "rpc_detach_inc")); + server.addMethod(new Method("d_inc_r", "i", "i", this, + "rpc_detach_inc_return")); + server.addMethod(new Method("inc_b", "i", "i", this, + "rpc_inc_barrier")); + receptor = new Test.Receptor(); + barrier = new Test.Barrier(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + Request detached = null; + + public void rpc_detach_inc(Request req) { + req.detach(); + int value = req.parameters().get(0).asInt32(); + req.returnValues().add(new Int32Value(value + 1)); + detached = req; + } + + public void rpc_detach_inc_return(Request req) { + req.detach(); + int value = req.parameters().get(0).asInt32(); + req.returnValues().add(new Int32Value(value + 1)); + req.returnRequest(); + } + + public void rpc_inc_barrier(Request req) { + int value = req.parameters().get(0).asInt32(); + req.returnValues().add(new Int32Value(value + 1)); + receptor.put(req); + barrier.waitFor(); + } + + public void testDetach() { + Test.Waiter w1 = new Test.Waiter(); + Request req1 = new Request("d_inc"); + req1.parameters().add(new Int32Value(50)); + target.invokeAsync(req1, 5.0, w1); + + Request req2 = new Request("d_inc_r"); + req2.parameters().add(new Int32Value(60)); + target.invokeSync(req2, 5.0); + + assertTrue(!req2.isError()); + assertEquals(1, req2.returnValues().size()); + assertEquals(61, req2.returnValues().get(0).asInt32()); + + assertTrue(detached != null); + assertTrue(!w1.isDone()); + assertTrue(server.checkReadCounts(2, 0, 0)); + assertTrue(server.checkWriteCounts(0, 1, 0)); + assertTrue(client.checkReadCounts(0, 1, 0)); + assertTrue(client.checkWriteCounts(2, 0, 0)); + assertTrue(server.readBytes == client.writeBytes); + assertTrue(client.readBytes == server.writeBytes); + + detached.returnRequest(); + try { + detached.returnRequest(); + assertTrue(false); + } catch (IllegalStateException e) {} + detached = null; + w1.waitDone(); + + assertTrue(!req1.isError()); + assertEquals(1, req1.returnValues().size()); + assertEquals(51, req1.returnValues().get(0).asInt32()); + assertTrue(server.checkReadCounts(2, 0, 0)); + assertTrue(server.checkWriteCounts(0, 2, 0)); + assertTrue(client.checkReadCounts(0, 2, 0)); + assertTrue(client.checkWriteCounts(2, 0, 0)); + assertTrue(server.readBytes == client.writeBytes); + assertTrue(client.readBytes == server.writeBytes); + } + + public void testBogusDetach() { + Request req1 = new Request("inc_b"); + req1.parameters().add(new Int32Value(200)); + try { + req1.detach(); + assertTrue(false); + } catch (IllegalStateException e) {} + + Request req2 = new Request("inc_b"); + req2.parameters().add(new Int32Value(200)); + try { + req2.returnRequest(); + assertTrue(false); + } catch (IllegalStateException e) {} + + Test.Waiter w = new Test.Waiter(); + Request req3 = new Request("inc_b"); + req3.parameters().add(new Int32Value(100)); + target.invokeAsync(req3, 5.0, w); + Request blocked = (Request) receptor.get(); + try { + blocked.returnRequest(); + assertTrue(false); + } catch (IllegalStateException e) {} + barrier.breakIt(); + w.waitDone(); + assertTrue(!req3.isError()); + assertEquals(1, req3.returnValues().size()); + assertEquals(101, req3.returnValues().get(0).asInt32()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/EchoTest.java b/jrt/tests/com/yahoo/jrt/EchoTest.java new file mode 100644 index 00000000000..a858ea76925 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/EchoTest.java @@ -0,0 +1,86 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class EchoTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + Values refValues; + + public EchoTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("echo", "*", "*", this, "rpc_echo")); + refValues = new Values(); + byte[] dataValue = { 1, 2, 3, 4 }; + byte[] int8Array = { 1, 2, 3, 4 }; + short[] int16Array = { 2, 4, 6, 8 }; + int[] int32Array = { 4, 8, 12, 16 }; + long[] int64Array = { 8, 16, 24, 32 }; + float[] floatArray = { 1.5f, 2.0f, 2.5f, 3.0f }; + double[] doubleArray = { 1.25, 1.50, 1.75, 2.00 }; + byte[][] dataArray = {{ 1, 0, 1, 0 }, + { 0, 2, 0, 2 }, + { 3, 0, 3, 0 }, + { 0, 4, 0, 4 }}; + String[] stringArray = { "one", "two", "three", "four" }; + refValues.add(new Int8Value((byte)1)); + refValues.add(new Int8Array(int8Array)); + refValues.add(new Int16Value((short)2)); + refValues.add(new Int16Array(int16Array)); + refValues.add(new Int32Value(4)); + refValues.add(new Int32Array(int32Array)); + refValues.add(new Int64Value(8)); + refValues.add(new Int64Array(int64Array)); + refValues.add(new FloatValue(2.5f)); + refValues.add(new FloatArray(floatArray)); + refValues.add(new DoubleValue(3.75)); + refValues.add(new DoubleArray(doubleArray)); + refValues.add(new DataValue(dataValue)); + refValues.add(new DataArray(dataArray)); + refValues.add(new StringValue("test")); + refValues.add(new StringArray(stringArray)); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_echo(Request req) { + if (!Test.equals(req.parameters(), refValues)) { + System.err.println("Parameters does not match reference values"); + req.setError(ErrorCode.METHOD_FAILED, "parameter mismatch"); + return; + } + Values p = req.parameters(); + Values r = req.returnValues(); + for (int i = 0; i < p.size(); i++) { + r.add(p.get(i)); + } + } + + public void testEcho() { + Request req = new Request("echo"); + Values p = req.parameters(); + for (int i = 0; i < refValues.size(); i++) { + p.add(refValues.get(i)); + } + target.invokeSync(req, 60.0); + assertTrue(req.checkReturnTypes("bBhHiIlLfFdDxXsS")); + assertTrue(Test.equals(req.returnValues(), req.parameters())); + assertTrue(Test.equals(req.returnValues(), refValues)); + assertTrue(Test.equals(req.parameters(), refValues)); + } +} diff --git a/jrt/tests/com/yahoo/jrt/InvokeAsyncTest.java b/jrt/tests/com/yahoo/jrt/InvokeAsyncTest.java new file mode 100644 index 00000000000..bc1f39c4a47 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/InvokeAsyncTest.java @@ -0,0 +1,62 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class InvokeAsyncTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + Test.Barrier barrier; + + public InvokeAsyncTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("concat", "ss", "s", this, "rpc_concat") + .methodDesc("Concatenate 2 strings") + .paramDesc(0, "str1", "a string") + .paramDesc(1, "str2", "another string") + .returnDesc(0, "ret", "str1 followed by str2")); + barrier = new Test.Barrier(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_concat(Request req) { + barrier.waitFor(); + req.returnValues().add(new StringValue(req.parameters() + .get(0).asString() + + req.parameters() + .get(1).asString())); + } + + public void testAsync() { + + Request req = new Request("concat"); + req.parameters().add(new StringValue("abc")); + req.parameters().add(new StringValue("def")); + + Test.Waiter w = new Test.Waiter(); + target.invokeAsync(req, 5.0, w); + assertFalse(w.isDone()); + barrier.breakIt(); + w.waitDone(); + assertTrue(w.isDone()); + + assertTrue(!req.isError()); + assertEquals(1, req.returnValues().size()); + assertEquals("abcdef", req.returnValues().get(0).asString()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/InvokeErrorTest.java b/jrt/tests/com/yahoo/jrt/InvokeErrorTest.java new file mode 100644 index 00000000000..8116abd5a15 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/InvokeErrorTest.java @@ -0,0 +1,149 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class InvokeErrorTest extends junit.framework.TestCase { + final double timeout=60.0; + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + Test.Barrier barrier; + + public InvokeErrorTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("test", "iib", "i", this, "rpc_test")); + server.addMethod(new Method("test_barrier", "iib", "i", this, + "rpc_test_barrier")); + barrier = new Test.Barrier(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_test(Request req) { + int value = req.parameters().get(0).asInt32(); + int error = req.parameters().get(1).asInt32(); + int extra = req.parameters().get(2).asInt8(); + + req.returnValues().add(new Int32Value(value)); + if (extra != 0) { + req.returnValues().add(new Int32Value(value)); + } + if (error != 0) { + req.setError(error, "Custom error"); + } + } + + public void rpc_test_barrier(Request req) { + rpc_test(req); + barrier.waitFor(); + } + + public void testNoError() { + Request req1 = new Request("test"); + req1.parameters().add(new Int32Value(42)); + req1.parameters().add(new Int32Value(0)); + req1.parameters().add(new Int8Value((byte)0)); + target.invokeSync(req1, timeout); + assertTrue(!req1.isError()); + assertEquals(1, req1.returnValues().size()); + assertEquals(42, req1.returnValues().get(0).asInt32()); + } + + public void testNoSuchMethod() { + Request req1 = new Request("bogus"); + target.invokeSync(req1, timeout); + assertTrue(req1.isError()); + assertEquals(0, req1.returnValues().size()); + assertEquals(ErrorCode.NO_SUCH_METHOD, req1.errorCode()); + } + + public void testWrongParameters() { + Request req1 = new Request("test"); + req1.parameters().add(new Int32Value(42)); + req1.parameters().add(new Int32Value(0)); + req1.parameters().add(new Int32Value(0)); + target.invokeSync(req1, timeout); + assertTrue(req1.isError()); + assertEquals(0, req1.returnValues().size()); + assertEquals(ErrorCode.WRONG_PARAMS, req1.errorCode()); + + Request req2 = new Request("test"); + req2.parameters().add(new Int32Value(42)); + req2.parameters().add(new Int32Value(0)); + target.invokeSync(req2, timeout); + assertTrue(req2.isError()); + assertEquals(0, req2.returnValues().size()); + assertEquals(ErrorCode.WRONG_PARAMS, req2.errorCode()); + + Request req3 = new Request("test"); + req3.parameters().add(new Int32Value(42)); + req3.parameters().add(new Int32Value(0)); + req3.parameters().add(new Int8Value((byte)0)); + req3.parameters().add(new Int8Value((byte)0)); + target.invokeSync(req3, timeout); + assertTrue(req3.isError()); + assertEquals(0, req3.returnValues().size()); + assertEquals(ErrorCode.WRONG_PARAMS, req3.errorCode()); + } + + public void testWrongReturnValues() { + Request req1 = new Request("test"); + req1.parameters().add(new Int32Value(42)); + req1.parameters().add(new Int32Value(0)); + req1.parameters().add(new Int8Value((byte)1)); + target.invokeSync(req1, timeout); + assertTrue(req1.isError()); + assertEquals(0, req1.returnValues().size()); + assertEquals(ErrorCode.WRONG_RETURN, req1.errorCode()); + } + + public void testMethodFailed() { + Request req1 = new Request("test"); + req1.parameters().add(new Int32Value(42)); + req1.parameters().add(new Int32Value(75000)); + req1.parameters().add(new Int8Value((byte)0)); + target.invokeSync(req1, timeout); + assertTrue(req1.isError()); + assertEquals(0, req1.returnValues().size()); + assertEquals(75000, req1.errorCode()); + + Request req2 = new Request("test"); + req2.parameters().add(new Int32Value(42)); + req2.parameters().add(new Int32Value(75000)); + req2.parameters().add(new Int8Value((byte)1)); + target.invokeSync(req2, timeout); + assertTrue(req2.isError()); + assertEquals(0, req2.returnValues().size()); + assertEquals(75000, req2.errorCode()); + } + + public void testConnectionError() { + Test.Waiter w = new Test.Waiter(); + Request req1 = new Request("test_barrier"); + req1.parameters().add(new Int32Value(42)); + req1.parameters().add(new Int32Value(0)); + req1.parameters().add(new Int8Value((byte)0)); + target.invokeAsync(req1, timeout, w); + target.close(); + client.transport().sync(); + barrier.breakIt(); + w.waitDone(); + assertTrue(!target.isValid()); + assertTrue(req1.isError()); + assertEquals(0, req1.returnValues().size()); + assertEquals(ErrorCode.CONNECTION, req1.errorCode()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/InvokeSyncTest.java b/jrt/tests/com/yahoo/jrt/InvokeSyncTest.java new file mode 100644 index 00000000000..d8af986f588 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/InvokeSyncTest.java @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +import java.io.ByteArrayOutputStream; +import java.io.FileDescriptor; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import com.yahoo.jrt.tool.RpcInvoker; + + +public class InvokeSyncTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + + public InvokeSyncTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("concat", "ss", "s", this, "rpc_concat") + .methodDesc("Concatenate 2 strings") + .paramDesc(0, "str1", "a string") + .paramDesc(1, "str2", "another string") + .returnDesc(0, "ret", "str1 followed by str2")); + server.addMethod(new Method("alltypes", "bhilfds", "s", this, "rpc_alltypes") + .methodDesc("Method taking all types of params")); + } + + public void tearDown() { + System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out))); + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_concat(Request req) { + req.returnValues().add(new StringValue(req.parameters() + .get(0).asString() + + req.parameters() + .get(1).asString())); + } + + public void rpc_alltypes(Request req) { + req.returnValues().add(new StringValue("This was alltypes. The string param was: "+req.parameters().get(6).asString())); + } + + public void testSync() { + Request req = new Request("concat"); + req.parameters().add(new StringValue("abc")); + req.parameters().add(new StringValue("def")); + + target.invokeSync(req, 5.0); + + assertTrue(!req.isError()); + assertEquals(1, req.returnValues().size()); + assertEquals("abcdef", req.returnValues().get(0).asString()); + } + + public void testRpcInvoker() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + RpcInvoker.main(new String[] {"-h", "localhost:"+Test.PORT, "concat", "s:foo", "s:bar"}); + baos.flush(); + assertEquals(baos.toString(), "foobar\n"); + baos.reset(); + System.setOut(new PrintStream(baos)); + RpcInvoker.main(new String[] {"-h", "localhost:"+Test.PORT, "alltypes", "b:1", "h:2", "i:3", "l:4", "f:5.0", "d:6.0", "s:baz"}); + baos.flush(); + assertEquals(baos.toString(), "This was alltypes. The string param was: baz\n"); + } + +} diff --git a/jrt/tests/com/yahoo/jrt/InvokeVoidTest.java b/jrt/tests/com/yahoo/jrt/InvokeVoidTest.java new file mode 100644 index 00000000000..4860b421146 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/InvokeVoidTest.java @@ -0,0 +1,74 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class InvokeVoidTest extends junit.framework.TestCase { + + Test.Orb server; + Acceptor acceptor; + Test.Orb client; + Target target; + + public InvokeVoidTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Test.Orb(new Transport()); + client = new Test.Orb(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + + server.addMethod(new Method("set", "i", "", this, "rpc_set") + .methodDesc("Set the stored value") + .paramDesc(0, "value", "the new value")); + server.addMethod(new Method("inc", "", "", this, "rpc_inc") + .methodDesc("Increase the stored value")); + server.addMethod(new Method("get", "", "i", this, "rpc_get") + .methodDesc("Get the stored value") + .returnDesc(0, "value", "the stored value")); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + private int value = 0; + + public void rpc_set(Request req) { + value = req.parameters().get(0).asInt32(); + } + public void rpc_inc(Request req) { + value++; + } + public void rpc_get(Request req) { + req.returnValues().add(new Int32Value(value)); + } + + public void testInvokeVoid() { + + Request req = new Request("set"); + req.parameters().add(new Int32Value(40)); + target.invokeSync(req, 5.0); + assertTrue(!req.isError()); + assertEquals(0, req.returnValues().size()); + + target.invokeVoid(new Request("inc")); + target.invokeVoid(new Request("inc")); + + req = new Request("get"); + target.invokeSync(req, 5.0); + assertTrue(!req.isError()); + assertEquals(42, req.returnValues().get(0).asInt32()); + + assertTrue(server.checkReadCounts(4, 0, 0)); + assertTrue(server.checkWriteCounts(0, 2, 0)); + assertTrue(client.checkReadCounts(0, 2, 0)); + assertTrue(client.checkWriteCounts(4, 0, 0)); + assertTrue(server.readBytes == client.writeBytes); + assertTrue(client.readBytes == server.writeBytes); + } +} diff --git a/jrt/tests/com/yahoo/jrt/ListenTest.java b/jrt/tests/com/yahoo/jrt/ListenTest.java new file mode 100644 index 00000000000..d42c7dece9f --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/ListenTest.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class ListenTest extends junit.framework.TestCase { + + Supervisor server; + + public ListenTest(String name) { + super(name); + } + + public void setUp() { + server = new Supervisor(new Transport()); + } + + public void tearDown() { + server.transport().shutdown().join(); + } + + public void testListen() { + try { + Acceptor a = server.listen(new Spec(Test.PORT)); + assertEquals(Test.PORT, a.port()); + a.shutdown().join(); + assertEquals(-1, a.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + try { + Acceptor a = server.listen(new Spec(null, Test.PORT)); + assertEquals(Test.PORT, a.port()); + a.shutdown().join(); + assertEquals(-1, a.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + try { + Acceptor a = server.listen(new Spec("tcp/" + Test.PORT)); + assertEquals(Test.PORT, a.port()); + a.shutdown().join(); + assertEquals(-1, a.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + try { + Acceptor a = server.listen(new Spec(Test.PORT_0)); + Acceptor b = server.listen(new Spec(Test.PORT_1)); + Acceptor c = server.listen(new Spec(Test.PORT_2)); + assertEquals(Test.PORT_0, a.port()); + assertEquals(Test.PORT_1, b.port()); + assertEquals(Test.PORT_2, c.port()); + a.shutdown().join(); + assertEquals(-1, a.port()); + b.shutdown().join(); + assertEquals(-1, b.port()); + c.shutdown().join(); + assertEquals(-1, c.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + } + + public void testBogusListen() { + try { + Acceptor a = server.listen(new Spec("bogus")); + assertTrue(false); + } catch (ListenFailedException e) {} + + try { + Acceptor a = server.listen(new Spec(Test.PORT)); + assertEquals(Test.PORT, a.port()); + // try { + // Acceptor b = server.listen(new Spec(Test.PORT)); + // assertTrue(false); + // } catch (ListenFailedException e) {} + a.shutdown().join(); + assertEquals(-1, a.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + } + + public void testListenAnyPort() { + try { + Acceptor a = server.listen(new Spec("tcp/0")); + assertTrue(a.port() > 0); + // try { + // Acceptor b = server.listen(new Spec(a.port())); + // assertTrue(false); + // } catch (ListenFailedException e) {} + a.shutdown().join(); + assertEquals(-1, a.port()); + } catch (ListenFailedException e) { + assertTrue(false); + } + } +} diff --git a/jrt/tests/com/yahoo/jrt/MandatoryMethodsTest.java b/jrt/tests/com/yahoo/jrt/MandatoryMethodsTest.java new file mode 100644 index 00000000000..7f2e073e4d8 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/MandatoryMethodsTest.java @@ -0,0 +1,97 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.HashSet; + + +public class MandatoryMethodsTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + + public MandatoryMethodsTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void testPing() { + + Request req = new Request("frt.rpc.ping"); + target.invokeSync(req, 5.0); + + assertFalse(req.isError()); + assertEquals(0, req.returnValues().size()); + } + + public void testGetMethodList() { + + Request req = new Request("frt.rpc.getMethodList"); + target.invokeSync(req, 5.0); + + assertFalse(req.isError()); + assertTrue(req.checkReturnTypes("SSS")); + String[] names = req.returnValues().get(0).asStringArray(); + String[] param = req.returnValues().get(1).asStringArray(); + String[] ret = req.returnValues().get(2).asStringArray(); + assertEquals(3, names.length); + assertTrue(names.length == param.length); + assertTrue(names.length == ret.length); + HashSet<String> foundSet = new HashSet<String>(); + for (int i = 0; i < names.length; i++) { + if (names[i].equals("frt.rpc.ping")) { + assertEquals("", param[i]); + assertEquals("", ret[i]); + } else if (names[i].equals("frt.rpc.getMethodList")) { + assertEquals("", param[i]); + assertEquals("SSS", ret[i]); + } else if (names[i].equals("frt.rpc.getMethodInfo")) { + assertEquals("s", param[i]); + assertEquals("sssSSSS", ret[i]); + } + foundSet.add(names[i]); + } + assertEquals(3, foundSet.size()); + assertTrue(foundSet.contains("frt.rpc.ping")); + assertTrue(foundSet.contains("frt.rpc.getMethodList")); + assertTrue(foundSet.contains("frt.rpc.getMethodInfo")); + } + + public void testGetMethodInfo() { + Request req = new Request("frt.rpc.getMethodInfo"); + req.parameters().add(new StringValue("frt.rpc.getMethodInfo")); + target.invokeSync(req, 5.0); + + assertFalse(req.isError()); + assertTrue(req.checkReturnTypes("sssSSSS")); + + String desc = req.returnValues().get(0).asString(); + String param = req.returnValues().get(1).asString(); + String ret = req.returnValues().get(2).asString(); + String[] paramName = req.returnValues().get(3).asStringArray(); + String[] paramDesc = req.returnValues().get(4).asStringArray(); + String[] retName = req.returnValues().get(5).asStringArray(); + String[] retDesc = req.returnValues().get(6).asStringArray(); + assertEquals("s", param); + assertEquals("sssSSSS", ret); + assertEquals(1, paramName.length); + assertTrue(paramName.length == paramDesc.length); + assertEquals(7, retName.length); + assertTrue(retName.length == retDesc.length); + } +} diff --git a/jrt/tests/com/yahoo/jrt/PacketTest.java b/jrt/tests/com/yahoo/jrt/PacketTest.java new file mode 100644 index 00000000000..86c05525763 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/PacketTest.java @@ -0,0 +1,129 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.ByteBuffer; + + +public class PacketTest extends junit.framework.TestCase { + + public PacketTest(String name) { + super(name); + } + + + public void testRequestPacket() { + + Values params = new Values(); + params.add(new Int32Value(123)); + + Packet packet = new RequestPacket(Packet.FLAG_NOREPLY, 42, + "foobar", params); + PacketInfo info = packet.getPacketInfo(); + + ByteBuffer buf = ByteBuffer.allocate(info.packetLength()); + info.encodePacket(packet, buf); + buf.flip(); + + int bytes = 12 + 4 + 6 + params.bytes(); + ByteBuffer ref = ByteBuffer.allocate(bytes); + ref.putInt(bytes - 4); // plen + ref.putShort((short)2); // flags (no reply) + ref.putShort((short)100); // pcode (request) + ref.putInt(42); // reqId + ref.putInt(6); // method name length + ref.put((byte)'f').put((byte)'o').put((byte)'o') + .put((byte)'b').put((byte)'a').put((byte)'r'); + params.encode(ref); + assertEquals(0, ref.remaining()); + ref.flip(); + assertTrue(buf.equals(ref)); + + PacketInfo info2 = PacketInfo.getPacketInfo(buf); + assertTrue(info2 != null); + assertEquals(info2.packetLength(), buf.remaining()); + Packet packet2 = info2.decodePacket(buf); + assertEquals(0, buf.remaining()); + + assertEquals(packet2.requestId(), 42); + assertEquals(((RequestPacket)packet2).methodName(), "foobar"); + Values params2 = ((RequestPacket)packet2).parameters(); + assertEquals(params2.size(), 1); + assertEquals(params2.get(0).type(), Value.INT32); + assertEquals(params2.get(0).asInt32(), 123); + } + + + public void testReplyPacket() { + + Values ret = new Values(); + ret.add(new Int32Value(123)); + + Packet packet = new ReplyPacket(0, 42, ret); + PacketInfo info = packet.getPacketInfo(); + + ByteBuffer buf = ByteBuffer.allocate(info.packetLength()); + info.encodePacket(packet, buf); + buf.flip(); + + int bytes = 12 + ret.bytes(); + ByteBuffer ref = ByteBuffer.allocate(bytes); + ref.putInt(bytes - 4); // plen + ref.putShort((short)0); // flags + ref.putShort((short)101); // pcode (reply) + ref.putInt(42); // reqId + ret.encode(ref); + assertEquals(0, ref.remaining()); + ref.flip(); + assertTrue(buf.equals(ref)); + + PacketInfo info2 = PacketInfo.getPacketInfo(buf); + assertTrue(info2 != null); + assertEquals(info2.packetLength(), buf.remaining()); + Packet packet2 = info2.decodePacket(buf); + assertEquals(0, buf.remaining()); + + assertEquals(packet2.requestId(), 42); + Values ret2 = ((ReplyPacket)packet2).returnValues(); + assertEquals(ret2.size(), 1); + assertEquals(ret2.get(0).type(), Value.INT32); + assertEquals(ret2.get(0).asInt32(), 123); + } + + + public void testErrorPacket() { + + String errStr = "NSM"; + Packet packet = + new ErrorPacket(0, 42, ErrorCode.NO_SUCH_METHOD, errStr); + PacketInfo info = packet.getPacketInfo(); + + ByteBuffer buf = ByteBuffer.allocate(info.packetLength()); + info.encodePacket(packet, buf); + buf.flip(); + + int bytes = 12 + 4 + 4 + 3; + ByteBuffer ref = ByteBuffer.allocate(bytes); + ref.putInt(bytes - 4); // plen + ref.putShort((short)0); // flags + ref.putShort((short)102); // pcode (error) + ref.putInt(42); // reqId + ref.putInt(ErrorCode.NO_SUCH_METHOD); + ref.putInt(3); // length of errorMessage + ref.put((byte)'N').put((byte)'S').put((byte)'M'); + assertEquals(0, ref.remaining()); + ref.flip(); + assertTrue(buf.equals(ref)); + + PacketInfo info2 = PacketInfo.getPacketInfo(buf); + assertTrue(info2 != null); + assertEquals(info2.packetLength(), buf.remaining()); + Packet packet2 = info2.decodePacket(buf); + assertEquals(0, buf.remaining()); + + assertEquals(packet2.requestId(), 42); + assertEquals(ErrorCode.NO_SUCH_METHOD, + ((ErrorPacket)packet2).errorCode()); + assertEquals(errStr, ((ErrorPacket)packet2).errorMessage()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/QueueTest.java b/jrt/tests/com/yahoo/jrt/QueueTest.java new file mode 100644 index 00000000000..f89d31923a5 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/QueueTest.java @@ -0,0 +1,97 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +public class QueueTest extends junit.framework.TestCase { + + public QueueTest(String name) { + super(name); + } + + public void testEmpty() { + Queue queue = new Queue(); + + assertTrue(queue.isEmpty()); + assertTrue(queue.size() == 0); + assertTrue(queue.dequeue() == null); + queue.enqueue(new Object()); + assertFalse(queue.isEmpty()); + assertFalse(queue.size() == 0); + assertFalse(queue.dequeue() == null); + } + + public void testEnqueueDequeue() { + Queue queue = new Queue(); + Integer int1 = new Integer(1); + Integer int2 = new Integer(2); + Integer int3 = new Integer(3); + Integer int4 = new Integer(4); + Integer int5 = new Integer(5); + + assertEquals(queue.size(), 0); + queue.enqueue(int1); + assertEquals(queue.size(), 1); + assertTrue(queue.dequeue() == int1); + assertEquals(queue.size(), 0); + + queue.enqueue(int1); + assertEquals(queue.size(), 1); + queue.enqueue(int2); + assertEquals(queue.size(), 2); + queue.enqueue(int3); + assertEquals(queue.size(), 3); + assertTrue(queue.dequeue() == int1); + assertEquals(queue.size(), 2); + assertTrue(queue.dequeue() == int2); + assertEquals(queue.size(), 1); + assertTrue(queue.dequeue() == int3); + assertEquals(queue.size(), 0); + + queue.enqueue(int1); + assertEquals(queue.size(), 1); + queue.enqueue(int2); + assertEquals(queue.size(), 2); + queue.enqueue(int3); + assertEquals(queue.size(), 3); + assertTrue(queue.dequeue() == int1); + assertEquals(queue.size(), 2); + assertTrue(queue.dequeue() == int2); + assertEquals(queue.size(), 1); + queue.enqueue(int4); + assertEquals(queue.size(), 2); + queue.enqueue(int5); + assertEquals(queue.size(), 3); + + assertTrue(queue.dequeue() == int3); + assertEquals(queue.size(), 2); + assertTrue(queue.dequeue() == int4); + assertEquals(queue.size(), 1); + assertTrue(queue.dequeue() == int5); + assertEquals(queue.size(), 0); + } + + public void testFlush() { + Queue src = new Queue(); + Queue dst = new Queue(); + Integer int1 = new Integer(1); + Integer int2 = new Integer(2); + Integer int3 = new Integer(3); + + assertTrue(src.flush(dst) == 0); + assertEquals(src.size(), 0); + assertEquals(dst.size(), 0); + + src.enqueue(int1); + src.enqueue(int2); + src.enqueue(int3); + + assertEquals(src.size(), 3); + assertEquals(dst.size(), 0); + assertTrue(src.flush(dst) == 3); + assertEquals(src.size(), 0); + assertEquals(dst.size(), 3); + + assertTrue(dst.dequeue() == int1); + assertTrue(dst.dequeue() == int2); + assertTrue(dst.dequeue() == int3); + } +} diff --git a/jrt/tests/com/yahoo/jrt/SchedulerTest.java b/jrt/tests/com/yahoo/jrt/SchedulerTest.java new file mode 100644 index 00000000000..785bc3ff719 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/SchedulerTest.java @@ -0,0 +1,238 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +import java.util.Random; + +public class SchedulerTest extends junit.framework.TestCase { + + long now; // fake time + Scheduler scheduler; + + private class MyTask implements Runnable { + private Task task; + private long target; + private long actual = 0; + private boolean done = false; + private boolean multiple = false; + + public MyTask(long target) { + task = new Task(scheduler, this); + this.target = target; + } + + public void schedule() { + task.schedule(target / 1000.0); + } + + public boolean unschedule() { + return task.unschedule(); + } + + public boolean kill() { + return task.kill(); + } + + public boolean done() { + return done; + } + + public boolean check() { + if (!done || multiple) { + return false; + } + if (actual < target) { + return false; + } + // 2 * Scheduler.TICK == 200 + return ((actual - target) <= 200); + } + + public void run() { + multiple = done; + done = true; + actual = now; + } + } + + private class RTTask implements Runnable { + private Task task; + private int cnt = 0; + + public RTTask() { + task = new Task(scheduler, this); + } + + public Task task() { + return task; + } + + public void schedule() { + task.scheduleNow(); + } + + public boolean unschedule() { + return task.unschedule(); + } + + public boolean kill() { + return task.kill(); + } + + public int cnt() { + return cnt; + } + + public void run() { + cnt++; + task.scheduleNow(); + } + } + + public SchedulerTest(String name) { + super(name); + } + + public void setUp() { + now = 0; + scheduler = new Scheduler(now); + } + + public void tearDown() { + scheduler = null; + } + + public void testTimeliness() { + Random rand = new Random(73201242); + + RTTask rt1 = new RTTask(); + RTTask rt2 = new RTTask(); + RTTask rt3 = new RTTask(); + rt1.schedule(); + rt2.schedule(); + rt3.schedule(); + + MyTask[] tasks = new MyTask[250000]; + for (int i = 0; i < tasks.length; i++) { + tasks[i] = new MyTask(rand.nextInt(131072)); + tasks[i].schedule(); + } + int iterations = 0; + while (now < 135000) { + now += 10; + scheduler.checkTasks(now); + iterations++; + } + assertEquals(iterations, rt1.cnt()); + assertEquals(iterations, rt2.cnt()); + assertEquals(iterations, rt3.cnt()); + for (int i = 0; i < tasks.length; i++) { + assertTrue(tasks[i].check()); + } + } + + public void testUnschedule() { + MyTask t1 = new MyTask(1000); + MyTask t2 = new MyTask(1000); + MyTask t3 = new MyTask(1000); + MyTask t4 = new MyTask(1000); + MyTask t5 = new MyTask(1000); + + RTTask rt1 = new RTTask(); + RTTask rt2 = new RTTask(); + RTTask rt3 = new RTTask(); + RTTask rt4 = new RTTask(); + RTTask rt5 = new RTTask(); + + assertFalse(t4.kill()); + + t1.schedule(); + t2.schedule(); + t3.schedule(); + t4.schedule(); + t5.schedule(); + + assertFalse(rt4.kill()); + + rt1.schedule(); + rt2.schedule(); + rt3.schedule(); + rt4.schedule(); + rt5.schedule(); + + assertTrue(t2.unschedule()); + assertTrue(t1.unschedule()); + assertTrue(t5.unschedule()); + + assertFalse(t2.unschedule()); + assertFalse(t1.unschedule()); + assertFalse(t5.unschedule()); + + t2.schedule(); + t1.schedule(); + assertTrue(t2.kill()); + t2.schedule(); + assertFalse(t2.kill()); + + int cnt = 0; + while (now < 5000) { + scheduler.checkTasks(now); + now += 10; + cnt++; + } + int old_cnt = cnt; + assertTrue(rt1.kill()); + assertTrue(rt3.unschedule()); + assertTrue(rt2.unschedule()); + rt1.schedule(); + rt2.schedule(); + while (now < 10000) { + scheduler.checkTasks(now); + now += 10; + cnt++; + } + + assertTrue(t1.check()); + assertFalse(t2.done()); + assertTrue(t3.check()); + assertFalse(t4.done()); + assertFalse(t5.done()); + + assertEquals(old_cnt, rt1.cnt()); + assertEquals(cnt, rt2.cnt()); + assertEquals(old_cnt, rt3.cnt()); + assertEquals(0, rt4.cnt()); + assertEquals(cnt, rt5.cnt()); + } + + public void testSlowEventLoop() { + scheduler.checkTasks(now); + now += 10000; + MyTask task1 = new MyTask(5000); + task1.schedule(); + int cnt1 = 0; + while (true) { + scheduler.checkTasks(now); + if (task1.done()) { + break; + } + cnt1++; + now += 10; + } + assertTrue(cnt1 > 400 && cnt1 < 500); + + scheduler.checkTasks(now); + now += 10000; + MyTask task2 = new MyTask(5000); + task2.schedule(); + int cnt2 = 0; + while (true) { + scheduler.checkTasks(now); + if (task2.done()) { + break; + } + cnt2++; + now += 10000; + } + assertTrue(cnt2 > 10 && cnt2 < 30); + } +} diff --git a/jrt/tests/com/yahoo/jrt/SessionTest.java b/jrt/tests/com/yahoo/jrt/SessionTest.java new file mode 100644 index 00000000000..800c28bc6ce --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/SessionTest.java @@ -0,0 +1,451 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class SessionTest extends junit.framework.TestCase + implements SessionHandler { + + private static class Session { + private static int cnt = 0; + private static boolean error = false; + + private int value = 0; + private boolean gotInit = false; + private boolean gotLive = false; + private boolean gotDown = false; + private boolean gotFini = false; + + private static synchronized void add() { + cnt++; + } + + private static synchronized void sub() { + cnt--; + } + + public Session() { + add(); + } + + public void init() { + if (gotInit || gotLive || gotDown || gotFini) { + setError(); + } + gotInit = true; + } + + public void live() { + if (!gotInit || gotLive || gotDown || gotFini) { + setError(); + } + gotLive = true; + } + + public void touch() { + if (!gotInit || gotFini) { + setError(); + } + } + + public int value() { + if (!gotInit || gotFini) { + setError(); + } + return value; + } + + public void value(int value) { + if (!gotInit || gotFini) { + setError(); + } + this.value = value; + } + + public void down() { + if (!gotInit || gotDown || gotFini) { + setError(); + } + gotDown = true; + } + + public void fini() { + if (!gotInit || !gotDown || gotFini) { + setError(); + } + gotFini = true; + sub(); + } + + public static int cnt() { + return cnt; + } + + public static void setError() { + error = true; + Throwable e = new RuntimeException("ERROR TRACE"); + e.printStackTrace(); + } + + public static boolean getError() { + return error; + } + + public static void reset() { + error = false; + cnt = 0; + } + } + + Test.Orb server; + Acceptor acceptor; + Test.Orb client; + Target target; + Test.Receptor receptor; + + public SessionTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + Session.reset(); + server = new Test.Orb(new Transport()); + server.setSessionHandler(this); + client = new Test.Orb(new Transport()); + client.setSessionHandler(this); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT), + new Session()); + + server.addMethod(new Method("set", "i", "", this, + "rpc_set")); + server.addMethod(new Method("get", "", "i", this, + "rpc_get")); + server.addMethod(new Method("call_detach", "", "", this, + "rpc_call_detach")); + client.addMethod(new Method("detach", "", "", this, + "rpc_detach")); + receptor = new Test.Receptor(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void handleSessionInit(Target t) { + Object ctx = t.getContext(); + if (t.isClient()) { + if (ctx == null) { + Session.setError(); + } + } + if (t.isServer()) { + if (ctx != null) { + Session.setError(); + } + t.setContext(new Session()); + } + Session s = (Session) t.getContext(); + if (s == null) { + Session.setError(); + } else { + s.init(); + } + } + + public void handleSessionLive(Target t) { + Session s = (Session) t.getContext(); + if (s == null) { + Session.setError(); + } else { + s.live(); + } + } + + public void handleSessionDown(Target t) { + Session s = (Session) t.getContext(); + if (s == null) { + Session.setError(); + } else { + s.down(); + } + } + + public void handleSessionFini(Target t) { + Session s = (Session) t.getContext(); + if (s == null) { + Session.setError(); + } else { + s.fini(); + } + } + + public void rpc_set(Request req) { + Session s = (Session) req.target().getContext(); + s.value(req.parameters().get(0).asInt32()); + } + + public void rpc_get(Request req) { + Session s = (Session) req.target().getContext(); + req.returnValues().add(new Int32Value(s.value())); + } + + public void rpc_call_detach(Request req) { + Session s = (Session) req.target().getContext(); + s.touch(); + req.target().invokeVoid(new Request("detach")); + } + + public void rpc_detach(Request req) { + Session s = (Session) req.target().getContext(); + if (s == null) { + Session.setError(); + } else { + s.touch(); + } + req.detach(); + receptor.put(req); + } + + public void waitState(int sessionCount, + int serverInitCount, + int serverLiveCount, + int serverDownCount, + int serverFiniCount, + int clientInitCount, + int clientLiveCount, + int clientDownCount, + int clientFiniCount) { + server.transport().sync().sync(); + client.transport().sync().sync(); + for (int i = 0; i < 100; i++) { + if ((sessionCount == Session.cnt() || sessionCount < 0) && + (serverInitCount == server.initCount || serverInitCount < 0) && + (serverLiveCount == server.liveCount || serverLiveCount < 0) && + (serverDownCount == server.downCount || serverDownCount < 0) && + (serverFiniCount == server.finiCount || serverFiniCount < 0) && + (clientInitCount == client.initCount || clientInitCount < 0) && + (clientLiveCount == client.liveCount || clientLiveCount < 0) && + (clientDownCount == client.downCount || clientDownCount < 0) && + (clientFiniCount == client.finiCount || clientFiniCount < 0)) { + break; + } + try { Thread.sleep(100); } catch (InterruptedException e) {} + } + server.transport().sync().sync(); + client.transport().sync().sync(); + } + + public void testConnDownLast() { + waitState(2, 1, 1, 0, 0, 1, 1, 0, 0); + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + Request req = new Request("get"); + target.invokeSync(req, 5.0); + assertEquals(0, req.returnValues().get(0).asInt32()); + + req = new Request("set"); + req.parameters().add(new Int32Value(42)); + target.invokeSync(req, 5.0); + assertTrue(!req.isError()); + + req = new Request("get"); + target.invokeSync(req, 5.0); + assertEquals(42, req.returnValues().get(0).asInt32()); + + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + target.close(); + waitState(0, 1, 1, 1, 1, 1, 1, 1, 1); + assertEquals(0, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(1, client.downCount); + assertEquals(1, client.finiCount); + assertFalse(Session.getError()); + } + + public void testReqDoneLast() { + waitState(2, 1, 1, 0, 0, 1, 1, 0, 0); + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + Request req = new Request("get"); + target.invokeSync(req, 5.0); + assertEquals(0, req.returnValues().get(0).asInt32()); + + req = new Request("set"); + req.parameters().add(new Int32Value(42)); + target.invokeSync(req, 5.0); + assertTrue(!req.isError()); + + req = new Request("get"); + target.invokeSync(req, 5.0); + assertEquals(42, req.returnValues().get(0).asInt32()); + + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + req = new Request("call_detach"); + target.invokeSync(req, 5.0); + assertTrue(!req.isError()); + Request detached = (Request) receptor.get(); + + target.close(); + waitState(1, 1, 1, 1, 1, 1, 1, 1, 0); + assertEquals(1, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(1, client.downCount); + assertEquals(0, client.finiCount); + + detached.returnRequest(); + waitState(0, 1, 1, 1, 1, 1, 1, 1, 1); + assertEquals(0, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(1, client.downCount); + assertEquals(1, client.finiCount); + assertFalse(Session.getError()); + } + + public void testNeverLive() { + waitState(2, 1, 1, 0, 0, 1, 1, 0, 0); + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + target.close(); + waitState(0, 1, 1, 1, 1, 1, 1, 1, 1); + assertEquals(0, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(1, client.downCount); + assertEquals(1, client.finiCount); + + Target bogus = client.connect(new Spec("bogus"), + new Session()); + waitState(0, 1, 1, 1, 1, 2, 1, 2, 2); + assertEquals(0, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(2, client.initCount); + assertEquals(1, client.liveCount); // <--- NB + assertEquals(2, client.downCount); + assertEquals(2, client.finiCount); + assertFalse(Session.getError()); + } + + public void testTransportDown() { + waitState(2, 1, 1, 0, 0, 1, 1, 0, 0); + assertEquals(2, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(0, server.downCount); + assertEquals(0, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(0, client.downCount); + assertEquals(0, client.finiCount); + + server.transport().shutdown().join(); + + waitState(0, 1, 1, 1, 1, 1, 1, 1, 1); + assertEquals(0, Session.cnt()); + assertEquals(1, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(1, server.downCount); + assertEquals(1, server.finiCount); + assertEquals(1, client.initCount); + assertEquals(1, client.liveCount); + assertEquals(1, client.downCount); + assertEquals(1, client.finiCount); + + target = client.connect(new Spec("localhost", Test.PORT), + new Session()); + + waitState(0, 2, 1, 2, 2, 2, -1, 2, 2); + assertEquals(0, Session.cnt()); + assertEquals(2, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(2, server.downCount); + assertEquals(2, server.finiCount); + assertEquals(2, client.initCount); + int oldClientLive = client.liveCount; + assertEquals(2, client.downCount); + assertEquals(2, client.finiCount); + + client.transport().shutdown().join(); + + target = client.connect(new Spec("localhost", Test.PORT), + new Session()); + + waitState(0, 2, 1, 2, 2, 3, oldClientLive, 3, 3); + assertEquals(0, Session.cnt()); + assertEquals(2, server.initCount); + assertEquals(1, server.liveCount); + assertEquals(2, server.downCount); + assertEquals(2, server.finiCount); + assertEquals(3, client.initCount); + assertEquals(oldClientLive, client.liveCount); + assertEquals(3, client.downCount); + assertEquals(3, client.finiCount); + assertFalse(Session.getError()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/SlobrokTest.java b/jrt/tests/com/yahoo/jrt/SlobrokTest.java new file mode 100644 index 00000000000..14c265705c4 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/SlobrokTest.java @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; + +import com.yahoo.jrt.slobrok.api.SlobrokList; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.api.Register; +import com.yahoo.jrt.slobrok.api.Mirror.Entry; +import com.yahoo.jrt.slobrok.server.Slobrok; + + +public class SlobrokTest extends junit.framework.TestCase { + + private static class SpecList extends ArrayList<Mirror.Entry> { + public SpecList add(String name, String spec) { + add(new Mirror.Entry(name, spec)); + return this; + } + } + + String[] slobroks; + boolean error = false; + Supervisor server = new Supervisor(new Transport()); + Supervisor client = new Supervisor(new Transport()); + Acceptor acceptor = null; + Mirror mirror = null; + Register register = null; + String mySpec = null; + Slobrok slobrok; + + public SlobrokTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + slobrok = new Slobrok(); + slobroks = new String[1]; + slobroks[0] = new Spec("localhost", slobrok.port()).toString(); + SlobrokList slobroklist = new SlobrokList(); + slobroklist.setup(slobroks); + acceptor = server.listen(new Spec(0)); + mirror = new Mirror(client, slobroklist); + register = new Register(server, slobroklist, + "localhost", acceptor.port()); + mySpec = new Spec("localhost", acceptor.port()).toString(); + } + + public void tearDown() { + register.shutdown(); + mirror.shutdown(); + acceptor.shutdown(); + client.transport().shutdown(); + server.transport().shutdown(); + slobrok.stop(); + } + + void check(String pattern, ArrayList<Entry> result) { + if (error) { + err("already failed, skipping test"); + return; + } + Comparator<Entry> cmp = new Comparator<Entry>() { + public int compare(Entry a, Entry b) { + return a.compareTo(b); + } + }; + Mirror.Entry[] expect = + result.toArray(new Mirror.Entry[result.size()]); + Arrays.sort(expect, cmp); + Mirror.Entry[] actual = new Mirror.Entry[0]; + for (int i = 0; i < 1000; i++) { + actual = mirror.lookup(pattern); + Arrays.sort(actual, cmp); + if (Arrays.equals(actual, expect)) { + // err("lookup successful for pattern: " + pattern); + return; + } + try { Thread.sleep(10); } catch (InterruptedException e) {} + } + error = true; + err("lookup failed for pattern: " + pattern); + err("actual values:"); + if (actual.length == 0) { + err(" { EMPTY }"); + } + for (int i = 0; i < actual.length; i++) { + err(" {" + actual[i].getName() + ", " + actual[i].getSpec() + "}"); + } + err("expected values:"); + if (expect.length == 0) { + err(" { EMPTY }"); + } + for (int i = 0; i < expect.length; i++) { + err(" {" + expect[i].getName() + ", " + expect[i].getSpec() + "}"); + } + } + + public void testSlobrok() { + String wantName = "A/x/w"; + register.registerName(wantName); + check(wantName, new SpecList().add(wantName, mySpec)); + check("*/*", new SpecList()); + check("*/*/*", new SpecList().add(wantName, mySpec)); + + assertTrue(mirror.ready()); + assertTrue(mirror.updates() > 0); + + Mirror.Entry[] oneArr = mirror.lookup("*/*/*"); + assertTrue(oneArr.length == 1); + Mirror.Entry one = oneArr[0]; + assertTrue(one.equals(new Mirror.Entry(wantName, mySpec))); + assertFalse(one.equals(new Mirror.Entry("B/x/w", mySpec))); + assertFalse(one.equals(new Mirror.Entry(wantName, "foo:99"))); + assertFalse(one.equals(null)); + assertFalse(one.equals(register)); + assertTrue(one.getName().equals(wantName)); + assertTrue(one.getSpec().equals(mySpec)); + int wantHC = mySpec.hashCode() + wantName.hashCode(); + assertTrue(one.hashCode() == wantHC); + + register.registerName("B/x"); + check("B/x", new SpecList().add("B/x", mySpec)); + check("*/*", new SpecList().add("B/x", mySpec)); + check("*/*/*", new SpecList().add("A/x/w", mySpec)); + + register.registerName("C/x/z"); + check("C/x/z", new SpecList().add("C/x/z", mySpec)); + check("*/*", new SpecList().add("B/x", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec)); + + register.registerName("D/y/z"); + check("D/y/z", new SpecList().add("D/y/z", mySpec)); + check("*/*", new SpecList().add("B/x", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec) + .add("D/y/z", mySpec)); + + register.registerName("E/y"); + check("E/y", new SpecList().add("E/y", mySpec)); + check("*/*", new SpecList() + .add("B/x", mySpec) + .add("E/y", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec) + .add("D/y/z", mySpec)); + + register.registerName("F/y/w"); + check("F/y/w", new SpecList().add("F/y/w", mySpec)); + check("*/*", new SpecList() + .add("B/x", mySpec) + .add("E/y", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec) + .add("D/y/z", mySpec) + .add("F/y/w", mySpec)); + + check("*", new SpecList()); + + check("B/*", new SpecList() + .add("B/x", mySpec)); + + check("*/y", new SpecList() + .add("E/y", mySpec)); + + check("*/x/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec)); + + check("*/*/z", new SpecList() + .add("C/x/z", mySpec) + .add("D/y/z", mySpec)); + + check("A/*/z", new SpecList()); + + check("A/*/w", new SpecList() + .add("A/x/w", mySpec)); + + register.unregisterName("E/y"); + register.unregisterName("C/x/z"); + register.unregisterName("F/y/w"); + check("*/*", new SpecList() + .add("B/x", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("D/y/z", mySpec)); + + register.registerName("E/y"); + register.registerName("C/x/z"); + register.registerName("F/y/w"); + check("*/*", new SpecList() + .add("B/x", mySpec) + .add("E/y", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec) + .add("D/y/z", mySpec) + .add("F/y/w", mySpec)); + + register.unregisterName("E/y"); + register.unregisterName("C/x/z"); + register.unregisterName("F/y/w"); + check("*/*", new SpecList() + .add("B/x", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("D/y/z", mySpec)); + + register.registerName("E/y"); + register.registerName("C/x/z"); + register.registerName("F/y/w"); + check("*/*", new SpecList() + .add("B/x", mySpec) + .add("E/y", mySpec)); + check("*/*/*", new SpecList() + .add("A/x/w", mySpec) + .add("C/x/z", mySpec) + .add("D/y/z", mySpec) + .add("F/y/w", mySpec)); + + assertFalse(error); + } + + public static void err(String msg) { + System.err.println(msg); + } +} diff --git a/jrt/tests/com/yahoo/jrt/SpecTest.java b/jrt/tests/com/yahoo/jrt/SpecTest.java new file mode 100644 index 00000000000..d15b8f95d30 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/SpecTest.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +import java.net.InetSocketAddress; + +public class SpecTest extends junit.framework.TestCase { + + public SpecTest(String name) { + super(name); + } + + public void testPort() { + Spec spec = new Spec(457); + InetSocketAddress addr = new InetSocketAddress(457); + + assertEquals("tcp/457", spec.toString()); + assertFalse(spec.malformed()); + assertEquals(457, spec.port()); + assertNull(spec.host()); + assertTrue(addr.equals(spec.address())); + } + + public void testHostPort() { + String host = "localhost"; + Spec spec = new Spec(host, 457); + InetSocketAddress addr = new InetSocketAddress(host, 457); + + assertEquals("tcp/localhost:457", spec.toString()); + assertFalse(spec.malformed()); + assertEquals(457, spec.port()); + assertEquals(host, spec.host()); + assertTrue(addr.equals(spec.address())); + } + + public void testBogusHostPort() { + String host = "bogus.host.name"; + Spec spec = new Spec(host, 457); + InetSocketAddress addr = new InetSocketAddress(host, 457); + + assertEquals("tcp/bogus.host.name:457", spec.toString()); + assertFalse(spec.malformed()); + assertEquals(457, spec.port()); + assertEquals(host, spec.host()); + assertTrue(addr.equals(spec.address())); + } + + public void testSpec1() { + Spec spec = new Spec("tcp/localhost:8080"); + InetSocketAddress addr = new InetSocketAddress("localhost", 8080); + + assertEquals("tcp/localhost:8080", spec.toString()); + assertFalse(spec.malformed()); + assertEquals(8080, spec.port()); + assertEquals("localhost", spec.host()); + assertTrue(addr.equals(spec.address())); + } + + public void testSpec2() { + Spec spec = new Spec("tcp/8080"); + InetSocketAddress addr = new InetSocketAddress(8080); + + assertEquals("tcp/8080", spec.toString()); + assertFalse(spec.malformed()); + assertEquals(8080, spec.port()); + assertNull(spec.host()); + assertTrue(addr.equals(spec.address())); + } + + public void testBogusSpec1() { + Spec spec = new Spec("localhost:8080"); + + assertEquals("MALFORMED", spec.toString()); + assertTrue(spec.malformed()); + assertEquals(0, spec.port()); + assertNull(spec.host()); + assertNull(spec.address()); + } + + public void testBogusSpec2() { + Spec spec = new Spec("tcp/localhost:xyz"); + + assertEquals("MALFORMED", spec.toString()); + assertTrue(spec.malformed()); + assertEquals(0, spec.port()); + assertNull(spec.host()); + assertNull(spec.address()); + } + + public void testBogusSpec3() { + Spec spec = new Spec("tcp/localhost:"); + + assertEquals("MALFORMED", spec.toString()); + assertTrue(spec.malformed()); + assertEquals(0, spec.port()); + assertNull(spec.host()); + assertNull(spec.address()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/Test.java b/jrt/tests/com/yahoo/jrt/Test.java new file mode 100644 index 00000000000..52e386ae06d --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/Test.java @@ -0,0 +1,236 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.util.Arrays; + + +public class Test extends junit.framework.TestCase { + + public Test(String name) { super(name); } + public void testNothing() {} + + // www.random.org [2000, 9999] + public static final int PORT = 9741; + public static final int PORT_0 = 5069; + public static final int PORT_1 = 4935; + public static final int PORT_2 = 8862; + public static final int PORT_3 = 4695; + public static final int PORT_4 = 6975; + public static final int PORT_5 = 7186; + public static final int PORT_6 = 7694; + public static final int PORT_7 = 3518; + public static final int PORT_8 = 3542; + public static final int PORT_9 = 4954; + + /** + * Supervisor extension with some extra statistics used for + * testing. + **/ + public static class Orb extends Supervisor { + public int initCount = 0; + public int liveCount = 0; + public int readRequestCount = 0; + public int readReplyCount = 0; + public int readErrorCount = 0; + public long readBytes = 0; + public int writeRequestCount = 0; + public int writeReplyCount = 0; + public int writeErrorCount = 0; + public long writeBytes = 0; + public int downCount = 0; + public int finiCount = 0; + + public Orb(Transport t) { + super(t); + } + + public boolean checkReadCounts(int request, int reply, int error) { + return (request == readRequestCount && + reply == readReplyCount && + error == readErrorCount); + } + + public boolean checkWriteCounts(int request, int reply, int error) { + return (request == writeRequestCount && + reply == writeReplyCount && + error == writeErrorCount); + } + + public boolean checkLifeCounts(int init, int fini) { + return (init == initCount && fini == finiCount); + } + + public void sessionInit(Target target) { + initCount++; + super.sessionInit(target); + } + + public void sessionLive(Target target) { + liveCount++; + super.sessionLive(target); + } + + public void sessionDown(Target target) { + downCount++; + super.sessionDown(target); + } + + public void sessionFini(Target target) { + finiCount++; + super.sessionFini(target); + } + + public void readPacket(PacketInfo info) { + if (info.packetCode() == Packet.PCODE_REQUEST) { + readRequestCount++; + } else if (info.packetCode() == Packet.PCODE_REPLY) { + readReplyCount++; + } else if (info.packetCode() == Packet.PCODE_ERROR) { + readErrorCount++; + } + readBytes += info.packetLength(); + super.readPacket(info); + } + + public void writePacket(PacketInfo info) { + if (info.packetCode() == Packet.PCODE_REQUEST) { + writeRequestCount++; + } else if (info.packetCode() == Packet.PCODE_REPLY) { + writeReplyCount++; + } else if (info.packetCode() == Packet.PCODE_ERROR) { + writeErrorCount++; + } + writeBytes += info.packetLength(); + super.writePacket(info); + } + } + + /** + * A simple object used to wait for the completion of an + * asynchronous request. + **/ + public static class Waiter implements RequestWaiter { + private boolean done = false; + public boolean isDone() { + return done; + } + public synchronized void handleRequestDone(Request req) { + done = true; + notify(); + } + public synchronized void waitDone() { + while (!isDone()) { + try { wait(); } catch (InterruptedException e) {} + } + } + } + + /** + * A simple object used to make one thread wait until another + * thread tells it to continue. + **/ + public static class Barrier { + private boolean broken = false; + public synchronized void reset() { + broken = false; + } + public synchronized void breakIt() { + broken = true; + notify(); + } + public synchronized void waitFor() { + while (!broken) { + try { wait(); } catch (InterruptedException e) {} + } + } + } + + /** + * A simple object used to pass a single object from one thread to + * another. + **/ + public static class Receptor { + private Object obj = null; + public synchronized void reset() { + obj = null; + } + public synchronized Object get() { + while (obj == null) { + try { wait(); } catch (InterruptedException e) {} + } + return obj; + } + public synchronized void put(Object obj) { + this.obj = obj; + notify(); + } + } + + + public static boolean equals(byte[][] a, byte[][] b) { + if (a == null || b == null) { + return false; + } + if (a.length != b.length) { + return false; + } + for (int i = 0; i < a.length; i++) { + if (!Arrays.equals(a[i], b[i])) { + return false; + } + } + return true; + } + + public static boolean equals(Value a, Value b) { + if (a == null || b == null) { + return false; + } + if (a.type() != b.type()) { + return false; + } + switch (a.type()) { + case Value.INT8: return (a.asInt8() == b.asInt8()); + case Value.INT8_ARRAY: return Arrays.equals(a.asInt8Array(), + b.asInt8Array()); + case Value.INT16: return (a.asInt16() == b.asInt16()); + case Value.INT16_ARRAY: return Arrays.equals(a.asInt16Array(), + b.asInt16Array()); + case Value.INT32: return (a.asInt32() == b.asInt32()); + case Value.INT32_ARRAY: return Arrays.equals(a.asInt32Array(), + b.asInt32Array()); + case Value.INT64: return (a.asInt64() == b.asInt64()); + case Value.INT64_ARRAY: return Arrays.equals(a.asInt64Array(), + b.asInt64Array()); + case Value.FLOAT: return (a.asFloat() == b.asFloat()); + case Value.FLOAT_ARRAY: return Arrays.equals(a.asFloatArray(), + b.asFloatArray()); + case Value.DOUBLE: return (a.asDouble() == b.asDouble()); + case Value.DOUBLE_ARRAY: return Arrays.equals(a.asDoubleArray(), + b.asDoubleArray()); + case Value.DATA: return Arrays.equals(a.asData(), b.asData()); + case Value.DATA_ARRAY: return equals(a.asDataArray(), + b.asDataArray()); + case Value.STRING: return a.asString().equals(b.asString()); + case Value.STRING_ARRAY: return Arrays.equals(a.asStringArray(), + b.asStringArray()); + default: return false; + } + } + + public static boolean equals(Values a, Values b) { + if (a == null || b == null) { + return false; + } + if (a.size() != b.size()) { + return false; + } + for (int i = 0; i < a.size(); i++) { + if (!equals(a.get(i), b.get(i))) { + return false; + } + } + return true; + } +} diff --git a/jrt/tests/com/yahoo/jrt/TimeoutTest.java b/jrt/tests/com/yahoo/jrt/TimeoutTest.java new file mode 100644 index 00000000000..4b448fa29a8 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/TimeoutTest.java @@ -0,0 +1,77 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class TimeoutTest extends junit.framework.TestCase { + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + Test.Barrier barrier; + + public TimeoutTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + server.addMethod(new Method("concat", "ss", "s", this, "rpc_concat") + .methodDesc("Concatenate 2 strings") + .paramDesc(0, "str1", "a string") + .paramDesc(1, "str2", "another string") + .returnDesc(0, "ret", "str1 followed by str2")); + barrier = new Test.Barrier(); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void rpc_concat(Request req) { + barrier.waitFor(); + req.returnValues().add(new StringValue(req.parameters() + .get(0).asString() + + req.parameters() + .get(1).asString())); + } + + public void testTimeout() { + Request req = new Request("concat"); + req.parameters().add(new StringValue("abc")); + req.parameters().add(new StringValue("def")); + + target.invokeSync(req, 0.1); + barrier.breakIt(); + + Request flush = new Request("frt.rpc.ping"); + target.invokeSync(flush, 5.0); + assertTrue(!flush.isError()); + + assertTrue(req.isError()); + assertEquals(ErrorCode.TIMEOUT, req.errorCode()); + assertEquals(0, req.returnValues().size()); + } + + public void testNotTimeout() { + Request req = new Request("concat"); + req.parameters().add(new StringValue("abc")); + req.parameters().add(new StringValue("def")); + + Test.Waiter w = new Test.Waiter(); + target.invokeAsync(req, 30.0, w); + try { Thread.sleep(2500); } catch (InterruptedException e) {} + barrier.breakIt(); + w.waitDone(); + + assertTrue(!req.isError()); + assertEquals(1, req.returnValues().size()); + assertEquals("abcdef", req.returnValues().get(0).asString()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/ValuesTest.java b/jrt/tests/com/yahoo/jrt/ValuesTest.java new file mode 100644 index 00000000000..36320b10993 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/ValuesTest.java @@ -0,0 +1,435 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class ValuesTest extends junit.framework.TestCase { + + public ValuesTest(String name) { + super(name); + } + + public void testEmpty() { + Values src = new Values(); + assertEquals(src.bytes(), 4); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), 4); + + Values dst = new Values(); + dst.decode(buf); + assertEquals(dst.bytes(), 4); + } + + void checkSingleValue(Values v, byte type, int bytes) { + assertEquals(v.size(), 1); + assertEquals(v.get(0).type(), type); + assertEquals(v.bytes(), bytes); + } + + public void testInt8() { + int byteSize = 4 + 1 + 1; + Values src = new Values(); + src.add(new Int8Value((byte)1)); + checkSingleValue(src, Value.INT8, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT8, byteSize); + assertEquals(dst.get(0).asInt8(), (byte)1); + } + + public void testInt8Array() { + int byteSize = 4 + 1 + 4 + 4; + Values src = new Values(); + byte[] val = { 1, 2, 3, 4 }; + src.add(new Int8Array(val)); + checkSingleValue(src, Value.INT8_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT8_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asInt8Array(), val)); + } + + public void testInt16() { + int byteSize = 4 + 1 + 2; + Values src = new Values(); + src.add(new Int16Value((short)2)); + checkSingleValue(src, Value.INT16, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT16, byteSize); + assertEquals(dst.get(0).asInt16(), (short)2); + } + + public void testInt16Array() { + int byteSize = 4 + 1 + 4 + 4 * 2; + Values src = new Values(); + short[] val = { 2, 4, 6, 8 }; + src.add(new Int16Array(val)); + checkSingleValue(src, Value.INT16_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT16_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asInt16Array(), val)); + } + + public void testInt32() { + int byteSize = 4 + 1 + 4; + Values src = new Values(); + src.add(new Int32Value(4)); + checkSingleValue(src, Value.INT32, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT32, byteSize); + assertEquals(dst.get(0).asInt32(), 4); + } + + public void testInt32Array() { + int byteSize = 4 + 1 + 4 + 4 * 4; + Values src = new Values(); + int[] val = { 4, 8, 12, 16 }; + src.add(new Int32Array(val)); + checkSingleValue(src, Value.INT32_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT32_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asInt32Array(), val)); + } + + public void testInt64() { + int byteSize = 4 + 1 + 8; + Values src = new Values(); + src.add(new Int64Value(8)); + checkSingleValue(src, Value.INT64, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT64, byteSize); + assertEquals(dst.get(0).asInt64(), 8); + } + + public void testInt64Array() { + int byteSize = 4 + 1 + 4 + 4 * 8; + Values src = new Values(); + long[] val = { 8, 16, 24, 32 }; + src.add(new Int64Array(val)); + checkSingleValue(src, Value.INT64_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.INT64_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asInt64Array(), val)); + } + + public void testFloat() { + int byteSize = 4 + 1 + 4; + Values src = new Values(); + src.add(new FloatValue((float)2.5)); + checkSingleValue(src, Value.FLOAT, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.FLOAT, byteSize); + assertTrue(dst.get(0).asFloat() == (float)2.5); + } + + public void testFloatArray() { + int byteSize = 4 + 1 + 4 + 4 * 4; + Values src = new Values(); + float[] val = { 1.5f, 2.0f, 2.5f, 3.0f }; + src.add(new FloatArray(val)); + checkSingleValue(src, Value.FLOAT_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.FLOAT_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asFloatArray(), val)); + } + + public void testDouble() { + int byteSize = 4 + 1 + 8; + Values src = new Values(); + src.add(new DoubleValue(3.75)); + checkSingleValue(src, Value.DOUBLE, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.DOUBLE, byteSize); + assertTrue(dst.get(0).asDouble() == 3.75); + } + + public void testDoubleArray() { + int byteSize = 4 + 1 + 4 + 4 * 8; + Values src = new Values(); + double[] val = { 1.25, 1.50, 1.75, 2.00 }; + src.add(new DoubleArray(val)); + checkSingleValue(src, Value.DOUBLE_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.DOUBLE_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asDoubleArray(), val)); + } + + public void testData() { + int byteSize = 4 + 1 + 4 + 4; + Values src = new Values(); + byte[] val = { 1, 2, 3, 4 }; + src.add(new DataValue(val)); + checkSingleValue(src, Value.DATA, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.DATA, byteSize); + assertTrue(Arrays.equals(dst.get(0).asData(), val)); + } + + public void testDataArray() { + int byteSize = 4 + 1 + 4 + 4 * (4 + 4); + Values src = new Values(); + byte[][] val = {{ 1, 0, 1, 0 }, + { 0, 2, 0, 2 }, + { 3, 0, 3, 0 }, + { 0, 4, 0, 4 }}; + src.add(new DataArray(val)); + checkSingleValue(src, Value.DATA_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.DATA_ARRAY, byteSize); + assertTrue(Arrays.equals(dst.get(0).asDataArray()[0], val[0])); + assertTrue(Arrays.equals(dst.get(0).asDataArray()[1], val[1])); + assertTrue(Arrays.equals(dst.get(0).asDataArray()[2], val[2])); + assertTrue(Arrays.equals(dst.get(0).asDataArray()[3], val[3])); + } + + public void testString1() { + int byteSize = 4 + 1 + 4 + 4; + Values src = new Values(); + String val = "test"; + src.add(new StringValue(val)); + checkSingleValue(src, Value.STRING, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.STRING, byteSize); + assertTrue(dst.get(0).asString().equals("test")); + } + + public void testString2() { + int byteSize = 4 + 1 + 4 + 7; + Values src = new Values(); + String val = "H" + ((char)229) + "vard"; + src.add(new StringValue(val)); + checkSingleValue(src, Value.STRING, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + byte right[] = new byte[] { 0, 0, 0, 1, 's', + 0, 0, 0, 7, 'H', + (byte)(0xC0 | (0xE5 >> 6)), + (byte)(0x80 | (0xE5 & 0x3F)), + 'v', 'a', 'r', 'd' + }; + for (int ii = 0; ii < buf.remaining(); ++ii) { + assertEquals(buf.get(ii), right[ii]); + } + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.STRING, byteSize); + assertTrue(dst.get(0).asString().equals("H\u00E5vard")); + } + + public void testStringArray() { + int byteSize = 4 + 1 + 4 + 4 * 4 + 3 + 3 + 5 + 4; + Values src = new Values(); + String[] val = { "one", "two", "three", "four" }; + src.add(new StringArray(val)); + checkSingleValue(src, Value.STRING_ARRAY, byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + checkSingleValue(src, Value.STRING_ARRAY, byteSize); + assertTrue(dst.get(0).asStringArray()[0].equals("one")); + assertTrue(dst.get(0).asStringArray()[1].equals("two")); + assertTrue(dst.get(0).asStringArray()[2].equals("three")); + assertTrue(dst.get(0).asStringArray()[3].equals("four")); + } + + public void testAllValues() { + int byteSize = + 4 + 16 // typestring + + 1 // int8 + + 4 + 4 // int8 array + + 2 // int16 + + 4 + 4 * 2 // int16 array + + 4 // int32 + + 4 + 4 * 4 // int32 array + + 8 // int64 + + 4 + 4 * 8 // int64 array + + 4 // float + + 4 + 4 * 4 // float array + + 8 // double + + 4 + 4 * 8 // double array + + 4 + 4 // data + + 4 + 4 * 4 + 4 + 4 + 4 + 4 // data array + + 4 + 4 // string + + 4 + 4 * 4 + 3 + 3 + 5 + 4; // string array + + byte[] dataValue = { 1, 2, 3, 4 }; + byte[] int8Array = { 1, 2, 3, 4 }; + short[] int16Array = { 2, 4, 6, 8 }; + int[] int32Array = { 4, 8, 12, 16 }; + long[] int64Array = { 8, 16, 24, 32 }; + float[] floatArray = { 1.5f, 2.0f, 2.5f, 3.0f }; + double[] doubleArray = { 1.25, 1.50, 1.75, 2.00 }; + byte[][] dataArray = {{ 1, 0, 1, 0 }, + { 0, 2, 0, 2 }, + { 3, 0, 3, 0 }, + { 0, 4, 0, 4 }}; + String[] stringArray = { "one", "two", "three", "four" }; + + Values src = new Values(); + src.add(new Int8Value((byte)1)); + src.add(new Int8Array(int8Array)); + src.add(new Int16Value((short)2)); + src.add(new Int16Array(int16Array)); + src.add(new Int32Value(4)); + src.add(new Int32Array(int32Array)); + src.add(new Int64Value(8)); + src.add(new Int64Array(int64Array)); + src.add(new FloatValue(2.5f)); + src.add(new FloatArray(floatArray)); + src.add(new DoubleValue(3.75)); + src.add(new DoubleArray(doubleArray)); + src.add(new DataValue(dataValue)); + src.add(new DataArray(dataArray)); + src.add(new StringValue("test")); + src.add(new StringArray(stringArray)); + assertEquals(src.size(), 16); + assertEquals(src.bytes(), byteSize); + + ByteBuffer buf = ByteBuffer.allocate(src.bytes()); + src.encode(buf); + buf.flip(); + assertEquals(buf.remaining(), byteSize); + + Values dst = new Values(); + dst.decode(buf); + assertEquals(dst.get(0).asInt8(), (byte)1); + assertTrue(Arrays.equals(dst.get(1).asInt8Array(), int8Array)); + assertEquals(dst.get(2).asInt16(), (short)2); + assertTrue(Arrays.equals(dst.get(3).asInt16Array(), int16Array)); + assertEquals(dst.get(4).asInt32(), 4); + assertTrue(Arrays.equals(dst.get(5).asInt32Array(), int32Array)); + assertEquals(dst.get(6).asInt64(), 8); + assertTrue(Arrays.equals(dst.get(7).asInt64Array(), int64Array)); + assertTrue(dst.get(8).asFloat() == (float)2.5); + assertTrue(Arrays.equals(dst.get(9).asFloatArray(), floatArray)); + assertTrue(dst.get(10).asDouble() == 3.75); + assertTrue(Arrays.equals(dst.get(11).asDoubleArray(), doubleArray)); + assertTrue(Arrays.equals(dst.get(12).asData(), dataValue)); + assertTrue(Arrays.equals(dst.get(13).asDataArray()[0], dataArray[0])); + assertTrue(Arrays.equals(dst.get(13).asDataArray()[1], dataArray[1])); + assertTrue(Arrays.equals(dst.get(13).asDataArray()[2], dataArray[2])); + assertTrue(Arrays.equals(dst.get(13).asDataArray()[3], dataArray[3])); + assertTrue(dst.get(14).asString().equals("test")); + assertTrue(dst.get(15).asStringArray()[0].equals("one")); + assertTrue(dst.get(15).asStringArray()[1].equals("two")); + assertTrue(dst.get(15).asStringArray()[2].equals("three")); + assertTrue(dst.get(15).asStringArray()[3].equals("four")); + } +} diff --git a/jrt/tests/com/yahoo/jrt/WatcherTest.java b/jrt/tests/com/yahoo/jrt/WatcherTest.java new file mode 100644 index 00000000000..5a3fae5e2bf --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/WatcherTest.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +public class WatcherTest extends junit.framework.TestCase { + + private static class Watcher implements TargetWatcher { + private int notifyCnt = 0; + public void notifyTargetInvalid(Target target) { + notifyCnt++; + } + public int cnt() { + return notifyCnt; + } + public boolean equals(Object rhs) { + return true; + } + public int hashCode() { + return 0; + } + } + + Supervisor server; + Acceptor acceptor; + Supervisor client; + Target target; + + public WatcherTest(String name) { + super(name); + } + + public void setUp() throws ListenFailedException { + server = new Supervisor(new Transport()); + client = new Supervisor(new Transport()); + acceptor = server.listen(new Spec(Test.PORT)); + target = client.connect(new Spec("localhost", Test.PORT)); + } + + public void tearDown() { + target.close(); + acceptor.shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); + } + + public void testNotify() { + Watcher w1 = new Watcher(); + Watcher w2 = new Watcher(); + Watcher w3 = new Watcher(); + Watcher w4 = new Watcher(); + Watcher w5 = new Watcher(); + + assertTrue(target.addWatcher(w1)); + assertTrue(target.addWatcher(w1)); + assertTrue(target.addWatcher(w1)); + + assertTrue(target.addWatcher(w2)); + assertTrue(target.addWatcher(w2)); + assertTrue(target.addWatcher(w2)); + assertTrue(target.removeWatcher(w2)); + assertTrue(target.removeWatcher(w2)); + assertTrue(target.addWatcher(w2)); + + assertTrue(target.addWatcher(w3)); + assertTrue(target.removeWatcher(w3)); + + assertTrue(target.removeWatcher(w4)); + + assertTrue(target.addWatcher(w5)); + assertTrue(target.addWatcher(w5)); + assertTrue(target.addWatcher(w5)); + assertTrue(target.removeWatcher(w5)); + + target.close(); + client.transport().sync(); + + assertEquals(1, w1.cnt()); + assertEquals(1, w2.cnt()); + assertEquals(0, w3.cnt()); + assertEquals(0, w4.cnt()); + assertEquals(0, w5.cnt()); + + assertFalse(target.removeWatcher(w1)); + assertFalse(target.removeWatcher(w2)); + assertFalse(target.addWatcher(w3)); + assertFalse(target.addWatcher(w4)); + assertFalse(target.addWatcher(w5)); + + target.close(); + client.transport().sync(); + + assertEquals(1, w1.cnt()); + assertEquals(1, w2.cnt()); + assertEquals(0, w3.cnt()); + assertEquals(0, w4.cnt()); + assertEquals(0, w5.cnt()); + } +} diff --git a/jrt/tests/com/yahoo/jrt/order.txt b/jrt/tests/com/yahoo/jrt/order.txt new file mode 100644 index 00000000000..4e90d2635ca --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/order.txt @@ -0,0 +1,25 @@ +Topological sorting of test dependencies (success assumptions) +=============================================================================== +Test.java (does no testing, but contains common stuff) +ValuesTest.java +SpecTest.java +QueueTest.java +PacketTest.java +SchedulerTest.java +ListenTest.java +ConnectTest.java +WatcherTest.java +InvokeSyncTest.java +InvokeAsyncTest.java +InvokeVoidTest.java +EchoTest.java +InvokeErrorTest.java +MandatoryMethodsTest.java +DetachTest.java +AbortTest.java +BackTargetTest.java +TimeoutTest.java +SessionTest.java +=============================================================================== +NOTE: 'ls -al | wc -l' should give the same result as 'wc -l order.txt' +=============================================================================== diff --git a/jrt/tests/com/yahoo/jrt/slobrok/api/BackOffTestCase.java b/jrt/tests/com/yahoo/jrt/slobrok/api/BackOffTestCase.java new file mode 100644 index 00000000000..6e3e7443e3d --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/slobrok/api/BackOffTestCase.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * @author arnej27959 + */ +public class BackOffTestCase { + + static final double[] expectWait = { + 0.5, 1.0, 1.5, 2.0, 2.5, + 3.0, 3.5, 4.0, 4.5, + 5.0, 6.0, 7.0, 8.0, 9.0, + 10, 15, 20, 25, 30, 30, 30 + }; + + @Test + public void requireThatWaitTimesAreExpected() { + double sum = 0; + BackOffPolicy two = new BackOff(); + for (int i = 0; i < expectWait.length; i++) { + double got = two.get(); + sum += got; + assertEquals(expectWait[i], got, 0.001); + boolean sw = two.shouldWarn(got); +/* + System.err.println("i = "+i); + System.err.println("got = "+got); + System.err.println("sum = "+sum); + System.err.println("sw = "+sw); +*/ + if (i == 13 || i > 17) { + assertTrue(two.shouldWarn(got)); + } else { + assertFalse(two.shouldWarn(got)); + } + } + two.reset(); + for (int i = 0; i < expectWait.length; i++) { + double got = two.get(); + assertEquals(expectWait[i], got, 0.001); + if (i == 13 || i > 17) { + assertTrue(two.shouldWarn(got)); + } else { + assertFalse(two.shouldWarn(got)); + } + } + + } +} diff --git a/jrt/tests/com/yahoo/jrt/slobrok/api/MirrorTest.java b/jrt/tests/com/yahoo/jrt/slobrok/api/MirrorTest.java new file mode 100644 index 00000000000..d0ee8abfca0 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/slobrok/api/MirrorTest.java @@ -0,0 +1,89 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class MirrorTest { + + static void mustMatch(String name, String pattern) { + assertTrue(Mirror.match(name.toCharArray(), pattern.toCharArray())); + } + + static void mustNotMatch(String name, String pattern) { + assertFalse(Mirror.match(name.toCharArray(), pattern.toCharArray())); + } + + @Test public void requireThatPatternMatchesSameString() { + String pattern = "foo/bar*zot/qux?foo**bar*/*nop*"; + mustMatch(pattern, pattern); + } + + @Test public void requireThatStarIsPrefixMatch() { + String pattern = "foo/bar.*/qux.*/bar*/nop*"; + String matches = "foo/bar.foo/qux.bar/bar123/nop000"; + mustMatch(matches, pattern); + + matches = "foo/bar.bar/qux.qux/bar.bar/nop.nop"; + mustMatch(matches, pattern); + + matches = "foo/bar.1/qux.3/bar.4/nop.5"; + mustMatch(matches, pattern); + } + + @Test public void requireThatStarMatchesEmptyString() { + String pattern = "foo/bar.*/qux.*/bar*/nop*"; + String matches = "foo/bar./qux./bar/nop"; + mustMatch(matches, pattern); + } + + @Test public void requireThatExtraBeforeSlashIsNotMatch() { + String pattern = "foo/*"; + String nomatch = "foo1/bar"; + mustNotMatch(nomatch, pattern); + } + + @Test public void requireThatStarDoesNotMatchMultipleLevels() { + String pattern = "foo/*/qux"; + String matches = "foo/bar/qux"; + String nomatch = "foo/bar/bar/qux"; + mustMatch(matches, pattern); + mustNotMatch(nomatch, pattern); + + pattern = "*"; + nomatch = "foo/bar.foo/qux.bar/bar123/nop000"; + mustNotMatch(nomatch, pattern); + } + + @Test public void requireThatDoubleStarMatchesMultipleLevels() { + String pattern = "**"; + String matches = "foo/bar.foo/qux.bar/bar123/nop000"; + mustMatch(matches, pattern); + + pattern = "foo/**"; + matches = "foo/bar.foo/qux.bar/bar123/nop000"; + mustMatch(matches, pattern); + + pattern = "foo**"; + matches = "foo/bar.foo/qux.bar/bar123/nop000"; + mustMatch(matches, pattern); + + pattern = "f**"; + matches = "foo/bar.foo/qux.bar/bar123/nop000"; + mustMatch(matches, pattern); + } + + @Test public void requireThatDoubleStarMatchesNothing() { + String pattern = "A**"; + String matches = "A"; + mustMatch(matches, pattern); + } + + @Test public void requireThatDoubleStarEatsRestOfName() { + String pattern = "foo/**/suffix"; + String nomatch = "foo/bar/baz/suffix"; + mustNotMatch(nomatch, pattern); + } + +} diff --git a/jrt/tests/com/yahoo/jrt/slobrok/api/SlobrokListTestCase.java b/jrt/tests/com/yahoo/jrt/slobrok/api/SlobrokListTestCase.java new file mode 100644 index 00000000000..36874f0b80f --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/slobrok/api/SlobrokListTestCase.java @@ -0,0 +1,130 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.slobrok.api; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SlobrokListTestCase { + + @Test + public void requireThatNextSlobrokSpecReturnsNullAtEndOfList() { + SlobrokList lst = new SlobrokList(); + lst.setup(new String[] { "foo", "bar" }); + if ("[foo, bar]".equals(lst.toString())) { + assertEquals("foo", lst.nextSlobrokSpec()); + assertEquals("bar", lst.nextSlobrokSpec()); + assertNull(lst.nextSlobrokSpec()); + assertEquals("foo", lst.nextSlobrokSpec()); + assertEquals("bar", lst.nextSlobrokSpec()); + assertNull(lst.nextSlobrokSpec()); + assertEquals("[foo, bar]", lst.toString()); + } else { + assertEquals("bar", lst.nextSlobrokSpec()); + assertEquals("foo", lst.nextSlobrokSpec()); + assertNull(lst.nextSlobrokSpec()); + assertEquals("bar", lst.nextSlobrokSpec()); + assertEquals("foo", lst.nextSlobrokSpec()); + assertNull(lst.nextSlobrokSpec()); + assertEquals("[bar, foo]", lst.toString()); + } + } + + @Test + public void requireThatSiblingsIterateIndependently() { + SlobrokList foo = new SlobrokList(); + SlobrokList bar = new SlobrokList(foo); + foo.setup(new String[] { "foo", "bar" }); + if ("[foo, bar]".equals(foo.toString())) { + assertEquals("foo", foo.nextSlobrokSpec()); + assertEquals("foo", bar.nextSlobrokSpec()); + assertEquals("bar", foo.nextSlobrokSpec()); + assertEquals("bar", bar.nextSlobrokSpec()); + assertNull(foo.nextSlobrokSpec()); + assertNull(bar.nextSlobrokSpec()); + } else { + assertEquals("bar", foo.nextSlobrokSpec()); + assertEquals("bar", bar.nextSlobrokSpec()); + assertEquals("foo", foo.nextSlobrokSpec()); + assertEquals("foo", bar.nextSlobrokSpec()); + assertNull(foo.nextSlobrokSpec()); + assertNull(bar.nextSlobrokSpec()); + } + } + + @Test + public void requireThatLengthIsUpdatedBySetup() { + SlobrokList foo = new SlobrokList(); + assertEquals(0, foo.length()); + foo.setup(new String[69]); + assertEquals(69, foo.length()); + } + + @Test + public void requireThatIndexIsResetOnSetup() { + SlobrokList lst = new SlobrokList(); + lst.setup(new String[] { "foo", "foo" }); + assertEquals("foo", lst.nextSlobrokSpec()); + lst.setup(new String[] { "baz" }); + assertEquals("baz", lst.nextSlobrokSpec()); + assertNull(lst.nextSlobrokSpec()); + assertEquals("[baz]", lst.toString()); + } + + @Test + public void requireThatUpdateAffectsSiblings() { + SlobrokList foo = new SlobrokList(); + SlobrokList bar = new SlobrokList(foo); + + assertEquals(0, foo.length()); + assertEquals(0, bar.length()); + + foo.setup(new String[] { "foo" }); + assertEquals(1, foo.length()); + assertEquals(1, bar.length()); + assertEquals("foo", foo.nextSlobrokSpec()); + assertEquals("foo", bar.nextSlobrokSpec()); + assertEquals("[foo]", foo.toString()); + assertEquals("[foo]", bar.toString()); + + foo.setup(new String[] { "baz" }); + assertEquals(1, foo.length()); + assertEquals(1, bar.length()); + assertEquals("baz", bar.nextSlobrokSpec()); + assertEquals("baz", foo.nextSlobrokSpec()); + assertNull(foo.nextSlobrokSpec()); + assertNull(bar.nextSlobrokSpec()); + assertEquals("[baz]", foo.toString()); + assertEquals("[baz]", bar.toString()); + } + + @Test + public void requireThatUpdateAffectsContains() { + SlobrokList foo = new SlobrokList(); + foo.setup(new String[] { "foo", "bar" }); + assertEquals(2, foo.length()); + String one = foo.nextSlobrokSpec(); + String two = foo.nextSlobrokSpec(); + assertNull(foo.nextSlobrokSpec()); + assertEquals(true, foo.contains(one)); + assertEquals(true, foo.contains(two)); + assertEquals(true, foo.contains("foo")); + assertEquals(true, foo.contains("bar")); + assertEquals(false, foo.contains("baz")); + + foo.setup(new String[] { "foo", "baz" }); + assertEquals(2, foo.length()); + assertEquals(true, foo.contains("foo")); + assertEquals(false, foo.contains("bar")); + assertEquals(true, foo.contains("baz")); + one = foo.nextSlobrokSpec(); + two = foo.nextSlobrokSpec(); + assertNull(foo.nextSlobrokSpec()); + assertEquals(true, foo.contains(one)); + assertEquals(true, foo.contains(two)); + } +} diff --git a/jrt/tests/com/yahoo/jrt/tool/RpcInvokerTest.java b/jrt/tests/com/yahoo/jrt/tool/RpcInvokerTest.java new file mode 100644 index 00000000000..fc3a1a63699 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/tool/RpcInvokerTest.java @@ -0,0 +1,59 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt.tool; + +import com.yahoo.jrt.Request; + +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +/** + * @author bratseth + */ +public class RpcInvokerTest extends junit.framework.TestCase { + + public RpcInvokerTest(String name) { + super(name); + } + + public void test0Args() { + assertCorrectArguments(""); + } + + public void test1StringShorthanArgs() { + assertCorrectArguments("foo"); + } + + public void test2StringArgs() { + assertCorrectArguments("s:foo s:bar"); + } + + public void test2StringShorthandArgs() { + assertCorrectArguments("foo bar"); + } + + protected void assertCorrectArguments(String argString) { + RpcInvoker invoker=new RpcInvoker(); + List<String> args=toList(argString); + Request request=invoker.createRequest("testmethod",args); + for (int i=0; i<args.size(); i++) { + // Strip type here if present + String arg=args.get(i); + if (arg.length()>=1 && arg.charAt(1)==':') + arg=arg.substring(2); + assertEquals(arg,request.parameters().get(i).toString()); + } + } + + private List<String> toList(String argsString) { + List<String> argsList=new ArrayList<String>(); + String[] argsArray=argsString.split(" "); + for (String arg : argsArray) { + if (arg.trim().length()==0) continue; + argsList.add(arg); + } + return argsList; + } + +} |