summaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2022-05-18 17:26:10 +0200
committerJon Bratseth <bratseth@gmail.com>2022-05-18 17:26:10 +0200
commit0217d33b34c7841e4e8476bb301b81c97ba208fa (patch)
treef616b6ca46cdefbadcddb845b7d0c365fef7df77 /jrt
parenta7ee9c1ba3b65fea3866dcb95e4ceb1d84ca0f4d (diff)
No functional changes
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java58
-rw-r--r--jrt/src/com/yahoo/jrt/Target.java56
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java42
-rw-r--r--jrt/src/com/yahoo/jrt/TransportThread.java39
4 files changed, 98 insertions, 97 deletions
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
index 48dd2d8bd3d..65deea0dc61 100644
--- a/jrt/src/com/yahoo/jrt/Supervisor.java
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -12,7 +12,7 @@ import java.util.concurrent.atomic.AtomicReference;
* requests obtained from that {@link Target}. Note that RPC
* invocations can be performed both ways across a connection, so even
* the client side of a connection has RPC server capabilities.
- **/
+ */
public class Supervisor {
private final Transport transport;
@@ -23,23 +23,22 @@ public class Supervisor {
private boolean dropEmptyBuffers = false;
/**
- * Create a new Supervisor based on the given {@link Transport}
+ * Creates a new Supervisor based on the given {@link Transport}
*
- * @param transport object performing low-level operations for
- * this Supervisor
- **/
+ * @param transport object performing low-level operations for this Supervisor
+ */
public Supervisor(Transport transport) {
this.transport = transport;
new MandatoryMethods(this);
}
/**
- * Drop empty buffers. This will reduce memory footprint for idle
+ * Drops empty buffers. This will reduce memory footprint for idle
* connections at the cost of extra allocations when buffer space
* is needed again.
*
* @param value true means drop empty buffers
- **/
+ */
public Supervisor setDropEmptyBuffers(boolean value) {
dropEmptyBuffers = value;
return this;
@@ -47,7 +46,7 @@ public class Supervisor {
boolean getDropEmptyBuffers() { return dropEmptyBuffers; }
/**
- * Set maximum input buffer size. This value will only affect
+ * Sets maximum input buffer size. This value will only affect
* connections that use a common input buffer when decoding
* incoming packets. Note that this value is not an absolute
* max. The buffer will still grow larger than this value if
@@ -55,14 +54,14 @@ public class Supervisor {
* larger than this value, it will be shrunk back when possible.
*
* @param bytes buffer size in bytes. 0 means unlimited.
- **/
+ */
public void setMaxInputBufferSize(int bytes) {
maxInputBufferSize = bytes;
}
int getMaxInputBufferSize() { return maxInputBufferSize; }
/**
- * Set maximum output buffer size. This value will only affect
+ * Sets maximum output buffer size. This value will only affect
* connections that use a common output buffer when encoding
* outgoing packets. Note that this value is not an absolute
* max. The buffer will still grow larger than this value if needed
@@ -70,35 +69,35 @@ public class Supervisor {
* than this value, it will be shrunk back when possible.
*
* @param bytes buffer size in bytes. 0 means unlimited.
- **/
+ */
public void setMaxOutputBufferSize(int bytes) {
maxOutputBufferSize = bytes;
}
int getMaxOutputBufferSize() { return maxOutputBufferSize; }
/**
- * Obtain the method map for this Supervisor
+ * Obtains the method map for this Supervisor
*
* @return the method map
- **/
+ */
HashMap<String, Method> methodMap() {
return methodMap.getAcquire();
}
/**
- * Obtain the underlying Transport object.
+ * Obtains the underlying Transport object.
*
* @return underlying Transport object
- **/
+ */
public Transport transport() {
return transport;
}
/**
- * Add a method to the set of methods held by this Supervisor
+ * Adds a method to the set of methods held by this Supervisor
*
* @param method the method to add
- **/
+ */
public void addMethod(Method method) {
synchronized (methodMapLock) {
HashMap<String, Method> newMap = new HashMap<>(methodMap());
@@ -108,12 +107,12 @@ public class Supervisor {
}
/**
- * Remove a method from the set of methods held by this
+ * Removes a method from the set of methods held by this
* Supervisor. Use this if you know exactly which method to remove
* and not only the name.
*
* @param method the method to remove
- **/
+ */
public void removeMethod(Method method) {
synchronized (methodMapLock) {
HashMap<String, Method> newMap = new HashMap<>(methodMap());
@@ -124,19 +123,19 @@ public class Supervisor {
}
/**
- * Connect to the given address. The new {@link Target} will be
+ * Connects to the given address. The new {@link Target} will be
* associated with this Supervisor.
*
* @return Target representing our end of the connection
* @param spec where to connect
* @see #connect(com.yahoo.jrt.Spec, java.lang.Object)
- **/
+ */
public Target connect(Spec spec) {
return transport.connect(this, spec, null);
}
/**
- * Connect to the given address. The new {@link Target} will be
+ * Connects to the given address. The new {@link Target} will be
* associated with this Supervisor and will have 'context' as
* application context.
*
@@ -144,18 +143,18 @@ public class Supervisor {
* @param spec where to connect
* @param context application context for the Target
* @see Target#getContext
- **/
+ */
public Target connect(Spec spec, Object context) {
return transport.connect(this, spec, context);
}
/**
- * Listen to the given address.
+ * Listens to the given address.
*
* @return active object accepting new connections that will be
* associated with this Supervisor
* @param spec the address to listen to
- **/
+ */
public Acceptor listen(Spec spec) throws ListenFailedException {
return transport.listen(this, spec);
}
@@ -165,7 +164,7 @@ public class Supervisor {
* is empty and only used for testing through sub-classing.
*
* @param info information about the written packet
- **/
+ */
void writePacket(PacketInfo info) {}
/**
@@ -173,17 +172,17 @@ public class Supervisor {
* is empty and only used for testing through sub-classing.
*
* @param info information about the read packet
- **/
+ */
void readPacket(PacketInfo info) {}
/**
- * Handle a packet received on one of the connections associated
+ * Handles a packet received on one of the connections associated
* with this Supervisor. This method is invoked for all packets
* not handled by a {@link ReplyHandler}
*
* @param conn where the packet came from
* @param packet the packet
- **/
+ */
void handlePacket(Connection conn, Packet packet) {
if (packet.packetCode() != Packet.PCODE_REQUEST) {
return;
@@ -195,4 +194,5 @@ public class Supervisor {
packet.requestId(),
packet.noReply()).invoke();
}
+
}
diff --git a/jrt/src/com/yahoo/jrt/Target.java b/jrt/src/com/yahoo/jrt/Target.java
index 1bc62d12ac9..a59aa341fe0 100644
--- a/jrt/src/com/yahoo/jrt/Target.java
+++ b/jrt/src/com/yahoo/jrt/Target.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-
import java.util.Optional;
/**
@@ -10,54 +9,54 @@ import java.util.Optional;
* side. The client side is the one initiating the connection. RPC
* requests may be invoked across the connection from both the client
* and the server side.
- **/
+ */
public abstract class Target {
private Object context;
/**
- * Create a Target with the given application context.
+ * Creates a Target with the given application context.
*
* @param context application context
- **/
+ */
Target(Object context) {
this.context = context;
}
/**
- * Create a Target without any application context.
- **/
+ * Creates a Target without any application context.
+ */
Target() {
this(null);
}
/**
- * Set the application context associated with this target.
+ * Sets the application context associated with this target.
*
- * @param context application context
- **/
+ * @param context the application context
+ */
public void setContext(Object context) {
this.context = context;
}
/**
- * Obtain the application context associated with this target.
+ * Obtains the application context associated with this target.
*
- * @return application context
- **/
+ * @return the application context
+ */
public Object getContext() {
return context;
}
/**
- * Check if this target is still valid for invocations.
+ * Checks if this target is still valid for invocations.
*
* @return true if this target is still valid
- **/
+ */
public abstract boolean isValid();
/**
- * Obtain the low-level reason behind losing the connection for
+ * Obtains the low-level reason behind losing the connection for
* which this target is an endpoint. If the target is still valid
* or if the target became invalid because it was closed, this
* method will return null. In other cases this method may or may
@@ -66,11 +65,11 @@ public abstract class Target {
* based on implementation details across platforms.
*
* @return exception causing connection loss or null
- **/
+ */
public Exception getConnectionLostReason() { return null; }
/**
- * @return the security context associated with this target, or empty if no connection or is insecure.
+ * Returns the security context associated with this target, or empty if no connection or is insecure.
*/
public abstract Optional<SecurityContext> getSecurityContext();
@@ -79,7 +78,7 @@ public abstract class Target {
* connection.
*
* @return true if this is a client-side target
- **/
+ */
public abstract boolean isClient();
/**
@@ -87,7 +86,7 @@ public abstract class Target {
* connection.
*
* @return true if this is a server-side target
- **/
+ */
public abstract boolean isServer();
/**
@@ -95,7 +94,7 @@ public abstract class Target {
*
* @param req the request
* @param timeout timeout in seconds
- **/
+ */
public abstract void invokeSync(Request req, double timeout);
/**
@@ -105,7 +104,7 @@ public abstract class Target {
* @param req the request
* @param timeout timeout in seconds
* @param waiter callback handler
- **/
+ */
public abstract void invokeAsync(Request req, double timeout,
RequestWaiter waiter);
@@ -115,10 +114,10 @@ public abstract class Target {
* ignored. However, the return value gives a little hint by
* indicating whether the invocation has been attempted at all.
*
+ * @param req the request
* @return false if the invocation was not attempted due to the
* target being invalid
- * @param req the request
- **/
+ */
public abstract boolean invokeVoid(Request req);
/**
@@ -128,9 +127,9 @@ public abstract class Target {
* returned. Multiple adds of the same watcher has no additional
* effect.
*
- * @return true if the add operation was performed
* @param watcher the watcher to be added
- **/
+ * @return true if the add operation was performed
+ */
public abstract boolean addWatcher(TargetWatcher watcher);
/**
@@ -139,16 +138,17 @@ public abstract class Target {
* and false is returned. Multiple removes of the same watcher has
* no additional effect.
*
- * @return true if the remove operation was performed
* @param watcher the watcher to be removed
+ * @return true if the remove operation was performed
* @see #addWatcher
- **/
+ */
public abstract boolean removeWatcher(TargetWatcher watcher);
/**
* Close this target. Note that the close operation is
* asynchronous. If you need to wait for the target to become
* invalid, use the {@link Transport#sync Transport.sync} method.
- **/
+ */
public abstract void close();
+
}
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 0c8977f2c40..871d0188691 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -4,6 +4,7 @@ package com.yahoo.jrt;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -15,7 +16,7 @@ import java.util.logging.Logger;
* multiplexed network IO, handles scheduled tasks and keeps track of
* some additional helper threads. A single Transport object can back
* multiple {@link Supervisor} objects.
- **/
+ */
public class Transport {
private static final Logger log = Logger.getLogger(Transport.class.getName());
@@ -30,11 +31,11 @@ public class Transport {
private final int eventsBeforeWakeup;
private final TransportMetrics metrics = TransportMetrics.getInstance();
- private final ArrayList<TransportThread> threads = new ArrayList<>();
+ private final List<TransportThread> threads = new ArrayList<>();
private final Random rnd = new Random();
/**
- * Create a new Transport object with the given fatal error
+ * Creates a new Transport object with the given fatal error
* handler and CryptoEngine. If a fatal error occurs when no fatal
* error handler is registered, the default action is to log the
* error and exit with exit code 1.
@@ -44,7 +45,7 @@ public class Transport {
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
* @param eventsBeforeWakeup number write events in Q before waking thread up
- **/
+ */
public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay, int eventsBeforeWakeup) {
this.name = name;
this.fatalHandler = fatalHandler; // NB: this must be set first
@@ -82,7 +83,7 @@ public class Transport {
* Select a random transport thread
*
* @return a random transport thread
- **/
+ */
public TransportThread selectThread() {
return threads.get(rnd.nextInt(threads.size()));
}
@@ -93,24 +94,24 @@ public class Transport {
public String getName() { return name; }
/**
- * Use the underlying CryptoEngine to create a CryptoSocket for
+ * Uses the underlying CryptoEngine to create a CryptoSocket for
* the client side of a connection.
*
* @return CryptoSocket handling appropriate encryption
* @param channel low-level socket channel to be wrapped by the CryptoSocket
* @param spec who we are connecting to, for hostname validation
- **/
+ */
CryptoSocket createClientCryptoSocket(SocketChannel channel, Spec spec) {
return cryptoEngine.createClientCryptoSocket(channel, spec);
}
/**
- * Use the underlying CryptoEngine to create a CryptoSocket for
+ * Uses the underlying CryptoEngine to create a CryptoSocket for
* the server side of a connection.
*
* @return CryptoSocket handling appropriate encryption
* @param channel low-level socket channel to be wrapped by the CryptoSocket
- **/
+ */
CryptoSocket createServerCryptoSocket(SocketChannel channel) {
return cryptoEngine.createServerCryptoSocket(channel);
}
@@ -122,7 +123,7 @@ public class Transport {
*
* @param problem the throwable causing the failure
* @param context the object owning the crashing thread
- **/
+ */
void handleFailure(Throwable problem, Object context) {
if (fatalHandler != null) {
fatalHandler.handleFailure(problem, context);
@@ -135,19 +136,19 @@ public class Transport {
}
/**
- * Listen to the given address. This method is called by a {@link
+ * Listens to the given address. This method is called by a {@link
* Supervisor} object.
*
* @return active object accepting new connections
* @param owner the one calling this method
* @param spec the address to listen to
- **/
+ */
Acceptor listen(Supervisor owner, Spec spec) throws ListenFailedException {
return new Acceptor(this, owner, spec);
}
/**
- * Connect to the given address. This method is called by a {@link
+ * Connects to the given address. This method is called by a {@link
* Supervisor} object.
*
* @return the new connection
@@ -166,7 +167,7 @@ public class Transport {
}
/**
- * Request that {@link Connection#doHandshakeWork()} be called (in any thread)
+ * Requests that {@link Connection#doHandshakeWork()} be called (in any thread)
* followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread.
*
* @param conn the connection needing handshake work
@@ -176,7 +177,7 @@ public class Transport {
}
/**
- * Synchronize with all transport threads. This method will block
+ * Synchronizes with all transport threads. This method will block
* until all commands issued before this method was invoked has
* completed. If a transport thread has been shut down (or is in
* the progress of being shut down) this method will instead wait
@@ -185,7 +186,7 @@ public class Transport {
* method from a transport thread is not a good idea.
*
* @return this object, to enable chaining
- **/
+ */
public Transport sync() {
for (TransportThread thread: threads) {
thread.sync();
@@ -194,10 +195,10 @@ public class Transport {
}
/**
- * Initiate controlled shutdown of all transport threads.
+ * Initiates controlled shutdown of all transport threads.
*
* @return this object, to enable chaining with join
- **/
+ */
public Transport shutdown() {
connector.close();
for (TransportThread thread: threads) {
@@ -207,8 +208,8 @@ public class Transport {
}
/**
- * Wait for all transport threads to finish.
- **/
+ * Waits for all transport threads to finish.
+ */
public void join() {
for (TransportThread thread: threads) {
thread.join();
@@ -225,4 +226,5 @@ public class Transport {
public TransportMetrics metrics() {
return metrics;
}
+
}
diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java
index a3f1773c814..6063e72ecdd 100644
--- a/jrt/src/com/yahoo/jrt/TransportThread.java
+++ b/jrt/src/com/yahoo/jrt/TransportThread.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -9,11 +8,10 @@ import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
-
/**
* A single reactor/scheduler thread inside a potentially
* multi-threaded {@link Transport}.
- **/
+ */
public class TransportThread {
private static final int OPEN = 1;
@@ -192,17 +190,17 @@ public class TransportThread {
*
* @param problem the throwable causing the failure
* @param context the object owning the crashing thread
- **/
+ */
void handleFailure(Throwable problem, Object context) {
parent.handleFailure(problem, context);
}
/**
- * Add a connection to the set of connections handled by this
+ * Adds a connection to the set of connections handled by this
* TransportThread. Invoked by the {@link Connector} class.
*
* @param conn the connection to add
- **/
+ */
void addConnection(Connection conn) {
if (!postCommand(new AddConnectionCmd(conn))) {
perform(new CloseConnectionCmd(conn));
@@ -210,20 +208,20 @@ public class TransportThread {
}
/**
- * Request an asynchronous close of a connection.
+ * Requests an asynchronous close of a connection.
*
* @param conn the connection to close
- **/
+ */
void closeConnection(Connection conn) {
postCommand(new CloseConnectionCmd(conn));
}
/**
- * Request an asynchronous enabling of write events for a
+ * Requests an asynchronous enabling of write events for a
* connection.
*
* @param conn the connection to enable write events for
- **/
+ */
void enableWrite(Connection conn) {
if (Thread.currentThread() == thread) {
handleEnableWrite(conn);
@@ -237,24 +235,24 @@ public class TransportThread {
}
/**
- * Create a {@link Task} that can be scheduled for execution in
+ * Creates a {@link Task} that can be scheduled for execution in
* the transport thread.
*
* @return the newly created Task
* @param cmd what to run when the task is executed
- **/
+ */
public Task createTask(Runnable cmd) {
return new Task(scheduler, cmd);
}
/**
- * Perform the given command in such a way that it does not run
+ * Performs the given command in such a way that it does not run
* concurrently with the transport thread or other commands
* performed by invoking this method. This method will continue to
* work even after the transport thread has been shut down.
*
* @param cmd the command to perform
- **/
+ */
public void perform(Runnable cmd) {
if (Thread.currentThread() == thread) {
cmd.run();
@@ -269,16 +267,16 @@ public class TransportThread {
}
/**
- * Wake up this transport thread explicitly.
- **/
+ * Wakes up this transport thread explicitly.
+ */
public void wakeup() {
selector.wakeup();
}
/**
- * Wake up this transport thread explicitly, but only if the
+ * Wakes up this transport thread explicitly, but only if the
* calling thread is not the transport thread itself.
- **/
+ */
public void wakeup_if_not_self() {
if (Thread.currentThread() != thread) {
wakeup();
@@ -286,7 +284,7 @@ public class TransportThread {
}
/**
- * Synchronize with the transport thread. This method will block
+ * Synchronizes with the transport thread. This method will block
* until all commands issued before this method was invoked has
* completed. If the transport thread has been shut down (or is in
* the progress of being shut down) this method will instead wait
@@ -295,7 +293,7 @@ public class TransportThread {
* method from the transport thread is not a good idea.
*
* @return this object, to enable chaining
- **/
+ */
public TransportThread sync() {
SyncCmd cmd = new SyncCmd();
if (postCommand(cmd)) {
@@ -366,4 +364,5 @@ public class TransportThread {
} catch (InterruptedException e) {}
}
}
+
}