summaryrefslogtreecommitdiffstats
path: root/jrt/src
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2022-02-24 14:26:42 +0100
committerGitHub <noreply@github.com>2022-02-24 14:26:42 +0100
commitb4f06ca78224a2df262b93f7a0440443f329f337 (patch)
tree2d226e59cf068eaf2230623ec462ede3de489135 /jrt/src
parent4ed0d2b6f00c82bc3ba8a3290576a7bedfdd1895 (diff)
Revert "GC unused SessionHandler"
Diffstat (limited to 'jrt/src')
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java30
-rw-r--r--jrt/src/com/yahoo/jrt/SessionHandler.java66
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java79
3 files changed, 168 insertions, 7 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index 00aceb7e352..c4f906f5e35 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -40,6 +40,7 @@ class Connection extends Target {
private final boolean tcpNoDelay;
private final Map<Integer, ReplyHandler> replyMap = new HashMap<>();
private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
+ private int activeReqs = 0;
private int writeWork = 0;
private boolean pendingHandshakeWork = false;
private final TransportThread parent;
@@ -59,9 +60,11 @@ class Connection extends Target {
}
boolean live = (state == CONNECTED);
boolean down = (state == CLOSED);
+ boolean fini;
boolean pendingWrite;
synchronized (this) {
this.state = state;
+ fini = down && (activeReqs == 0);
pendingWrite = (writeWork > 0);
}
if (live) {
@@ -71,6 +74,7 @@ class Connection extends Target {
} else {
disableWrite();
}
+ owner.sessionLive(this);
}
if (down) {
for (ReplyHandler rh : replyMap.values()) {
@@ -79,6 +83,10 @@ class Connection extends Target {
for (TargetWatcher watcher : watchers.values()) {
watcher.notifyTargetInvalid(this);
}
+ owner.sessionDown(this);
+ }
+ if (fini) {
+ owner.sessionFini(this);
}
}
@@ -94,6 +102,7 @@ class Connection extends Target {
maxOutputSize = owner.getMaxOutputBufferSize();
dropEmptyBuffers = owner.getDropEmptyBuffers();
server = true;
+ owner.sessionInit(this);
}
public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) {
@@ -106,6 +115,7 @@ class Connection extends Target {
maxOutputSize = owner.getMaxOutputBufferSize();
dropEmptyBuffers = owner.getDropEmptyBuffers();
server = false;
+ owner.sessionInit(this);
}
public TransportThread transportThread() {
@@ -115,7 +125,8 @@ class Connection extends Target {
public int allocateKey() {
long v = requestId.getAndIncrement();
v = v*2 + (server ? 1 : 0);
- return (int)(v & 0x7fffffff);
+ int i = (int)(v & 0x7fffffff);
+ return i;
}
public synchronized boolean cancelReply(ReplyHandler handler) {
@@ -267,7 +278,7 @@ class Connection extends Target {
try {
packet = info.decodePacket(rb);
} catch (RuntimeException e) {
- log.log(Level.WARNING, "got garbage; closing connection: " + this);
+ log.log(Level.WARNING, "got garbage; closing connection: " + toString());
throw new IOException("jrt: decode error", e);
}
ReplyHandler handler;
@@ -385,10 +396,6 @@ class Connection extends Target {
return (state == CLOSED);
}
- public synchronized boolean isConnected() {
- return (state == CONNECTED);
- }
-
public boolean hasSocket() {
return ((socket != null) && (socket.channel() != null));
}
@@ -410,16 +417,25 @@ class Connection extends Target {
}
public TieBreaker startRequest() {
+ synchronized (this) {
+ activeReqs++;
+ }
return new TieBreaker();
}
public boolean completeRequest(TieBreaker done) {
+ boolean signalFini = false;
synchronized (this) {
if (!done.first()) {
return false;
}
+ if (--activeReqs == 0 && state == CLOSED) {
+ signalFini = true;
+ }
+ }
+ if (signalFini) {
+ owner.sessionFini(this);
}
-
return true;
}
diff --git a/jrt/src/com/yahoo/jrt/SessionHandler.java b/jrt/src/com/yahoo/jrt/SessionHandler.java
new file mode 100644
index 00000000000..82355d73a93
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/SessionHandler.java
@@ -0,0 +1,66 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+/**
+ * Interface used to handle the lifetime of a {@link Target}. The word
+ * session is used to denote all the RPC activity across a single
+ * {@link Target} during its lifetime. This interface gives the
+ * application information about when different {@link Target} objects
+ * enter different stages in their lifetime. Combined with the ability
+ * to bind application specific data to a {@link Target} with the
+ * {@link Target#setContext} method, this enables method invocations
+ * in the same session to share state information. Usage of this
+ * interface is optional. It is typically useful for server
+ * applications needing state to be shared between RPC method
+ * invocation on a session. Each {@link Supervisor} can only have a
+ * single session handler. Use the {@link Supervisor#setSessionHandler
+ * Supervisor.setSessionHandler} method to set the session
+ * handler. The different callbacks may be called from several
+ * different threads, but for a single target there will be no
+ * overlapping of session callbacks, and the order will always be the
+ * same; init, live (not always called), down, fini.
+ **/
+public interface SessionHandler {
+
+ /**
+ * Invoked when a new {@link Target} is created. This is a nice
+ * place to initialize and attach application context to the
+ * {@link Target}.
+ *
+ * @param target the target
+ **/
+ public void handleSessionInit(Target target);
+
+ /**
+ * Invoked when a connection is established with the peer. Note
+ * that if a connection could not be established with the peer,
+ * this method is never invoked.
+ *
+ * @param target the target
+ **/
+ public void handleSessionLive(Target target);
+
+ /**
+ * Invoked when the target becomes invalid. This is typically
+ * caused by the network connection with the peer going down. Note
+ * that this method is invoked also when a connection with the
+ * peer could not be established at all.
+ *
+ * @param target the target
+ **/
+ public void handleSessionDown(Target target);
+
+ /**
+ * Invoked when the target is invalid and no more RPC invocations
+ * are active on our side of this target (invoked from the other
+ * side; we being the server). If you need to perform cleanup
+ * related to the application data associated with the target, you
+ * should wait until this method is invoked, to avoid cleaning up
+ * the {@link Target} application context under the feet of active
+ * invocations.
+ *
+ * @param target the target
+ **/
+ public void handleSessionFini(Target target);
+}
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
index 48dd2d8bd3d..e8dc6219768 100644
--- a/jrt/src/com/yahoo/jrt/Supervisor.java
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -15,7 +15,10 @@ import java.util.concurrent.atomic.AtomicReference;
**/
public class Supervisor {
+ private static final int SMALL_INPUT_BUFFER_SIZE = 20 * 1024; // Large enough too hold the typical application buffersize of 17k.
+ private static final int SMALL_OUTPUT_BUFFER_SIZE = 8 *1024; // Suitable small buffer usage with many connections and little traffic.
private final Transport transport;
+ private SessionHandler sessionHandler = null;
private final Object methodMapLock = new Object();
private final AtomicReference<HashMap<String, Method>> methodMap = new AtomicReference<>(new HashMap<>());
private int maxInputBufferSize = 64*1024;
@@ -95,6 +98,15 @@ public class Supervisor {
}
/**
+ * Set the session handler for this Supervisor
+ *
+ * @param handler the session handler
+ **/
+ public void setSessionHandler(SessionHandler handler) {
+ sessionHandler = handler;
+ }
+
+ /**
* Add a method to the set of methods held by this Supervisor
*
* @param method the method to add
@@ -161,6 +173,73 @@ public class Supervisor {
}
/**
+ * Convenience method for connecting to a peer, invoking a method
+ * and disconnecting.
+ *
+ * @param spec the address to connect to
+ * @param req the invocation request
+ * @param timeout request timeout in seconds
+ **/
+ public void invokeBatch(Spec spec, Request req, double timeout) {
+ Target target = connect(spec);
+ try {
+ target.invokeSync(req, timeout);
+ } finally {
+ target.close();
+ }
+ }
+
+ /**
+ * This method is invoked when a new target is created
+ *
+ * @param target the target
+ **/
+ void sessionInit(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionInit(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target establishes a connection
+ * with its peer
+ *
+ * @param target the target
+ **/
+ void sessionLive(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionLive(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target becomes invalid
+ *
+ * @param target the target
+ **/
+ void sessionDown(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionDown(target);
+ }
+ }
+
+ /**
+ * This method is invoked when a target is invalid and no more
+ * invocations are active
+ *
+ * @param target the target
+ **/
+ void sessionFini(Target target) {
+ SessionHandler handler = sessionHandler;
+ if (handler != null) {
+ handler.handleSessionFini(target);
+ }
+ }
+
+ /**
* This method is invoked each time we write a packet. This method
* is empty and only used for testing through sub-classing.
*