From b4f06ca78224a2df262b93f7a0440443f329f337 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Thu, 24 Feb 2022 14:26:42 +0100 Subject: Revert "GC unused SessionHandler" --- jrt/src/com/yahoo/jrt/Connection.java | 30 +++++++++--- jrt/src/com/yahoo/jrt/SessionHandler.java | 66 ++++++++++++++++++++++++++ jrt/src/com/yahoo/jrt/Supervisor.java | 79 +++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 7 deletions(-) create mode 100644 jrt/src/com/yahoo/jrt/SessionHandler.java (limited to 'jrt/src') diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index 00aceb7e352..c4f906f5e35 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -40,6 +40,7 @@ class Connection extends Target { private final boolean tcpNoDelay; private final Map replyMap = new HashMap<>(); private final Map watchers = new IdentityHashMap<>(); + private int activeReqs = 0; private int writeWork = 0; private boolean pendingHandshakeWork = false; private final TransportThread parent; @@ -59,9 +60,11 @@ class Connection extends Target { } boolean live = (state == CONNECTED); boolean down = (state == CLOSED); + boolean fini; boolean pendingWrite; synchronized (this) { this.state = state; + fini = down && (activeReqs == 0); pendingWrite = (writeWork > 0); } if (live) { @@ -71,6 +74,7 @@ class Connection extends Target { } else { disableWrite(); } + owner.sessionLive(this); } if (down) { for (ReplyHandler rh : replyMap.values()) { @@ -79,6 +83,10 @@ class Connection extends Target { for (TargetWatcher watcher : watchers.values()) { watcher.notifyTargetInvalid(this); } + owner.sessionDown(this); + } + if (fini) { + owner.sessionFini(this); } } @@ -94,6 +102,7 @@ class Connection extends Target { maxOutputSize = owner.getMaxOutputBufferSize(); dropEmptyBuffers = owner.getDropEmptyBuffers(); server = true; + owner.sessionInit(this); } public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) { @@ -106,6 +115,7 @@ class Connection extends Target { maxOutputSize = owner.getMaxOutputBufferSize(); dropEmptyBuffers = owner.getDropEmptyBuffers(); server = false; + owner.sessionInit(this); } public TransportThread transportThread() { @@ -115,7 +125,8 @@ class Connection extends Target { public int allocateKey() { long v = requestId.getAndIncrement(); v = v*2 + (server ? 1 : 0); - return (int)(v & 0x7fffffff); + int i = (int)(v & 0x7fffffff); + return i; } public synchronized boolean cancelReply(ReplyHandler handler) { @@ -267,7 +278,7 @@ class Connection extends Target { try { packet = info.decodePacket(rb); } catch (RuntimeException e) { - log.log(Level.WARNING, "got garbage; closing connection: " + this); + log.log(Level.WARNING, "got garbage; closing connection: " + toString()); throw new IOException("jrt: decode error", e); } ReplyHandler handler; @@ -385,10 +396,6 @@ class Connection extends Target { return (state == CLOSED); } - public synchronized boolean isConnected() { - return (state == CONNECTED); - } - public boolean hasSocket() { return ((socket != null) && (socket.channel() != null)); } @@ -410,16 +417,25 @@ class Connection extends Target { } public TieBreaker startRequest() { + synchronized (this) { + activeReqs++; + } return new TieBreaker(); } public boolean completeRequest(TieBreaker done) { + boolean signalFini = false; synchronized (this) { if (!done.first()) { return false; } + if (--activeReqs == 0 && state == CLOSED) { + signalFini = true; + } + } + if (signalFini) { + owner.sessionFini(this); } - return true; } diff --git a/jrt/src/com/yahoo/jrt/SessionHandler.java b/jrt/src/com/yahoo/jrt/SessionHandler.java new file mode 100644 index 00000000000..82355d73a93 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/SessionHandler.java @@ -0,0 +1,66 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +/** + * Interface used to handle the lifetime of a {@link Target}. The word + * session is used to denote all the RPC activity across a single + * {@link Target} during its lifetime. This interface gives the + * application information about when different {@link Target} objects + * enter different stages in their lifetime. Combined with the ability + * to bind application specific data to a {@link Target} with the + * {@link Target#setContext} method, this enables method invocations + * in the same session to share state information. Usage of this + * interface is optional. It is typically useful for server + * applications needing state to be shared between RPC method + * invocation on a session. Each {@link Supervisor} can only have a + * single session handler. Use the {@link Supervisor#setSessionHandler + * Supervisor.setSessionHandler} method to set the session + * handler. The different callbacks may be called from several + * different threads, but for a single target there will be no + * overlapping of session callbacks, and the order will always be the + * same; init, live (not always called), down, fini. + **/ +public interface SessionHandler { + + /** + * Invoked when a new {@link Target} is created. This is a nice + * place to initialize and attach application context to the + * {@link Target}. + * + * @param target the target + **/ + public void handleSessionInit(Target target); + + /** + * Invoked when a connection is established with the peer. Note + * that if a connection could not be established with the peer, + * this method is never invoked. + * + * @param target the target + **/ + public void handleSessionLive(Target target); + + /** + * Invoked when the target becomes invalid. This is typically + * caused by the network connection with the peer going down. Note + * that this method is invoked also when a connection with the + * peer could not be established at all. + * + * @param target the target + **/ + public void handleSessionDown(Target target); + + /** + * Invoked when the target is invalid and no more RPC invocations + * are active on our side of this target (invoked from the other + * side; we being the server). If you need to perform cleanup + * related to the application data associated with the target, you + * should wait until this method is invoked, to avoid cleaning up + * the {@link Target} application context under the feet of active + * invocations. + * + * @param target the target + **/ + public void handleSessionFini(Target target); +} diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java index 48dd2d8bd3d..e8dc6219768 100644 --- a/jrt/src/com/yahoo/jrt/Supervisor.java +++ b/jrt/src/com/yahoo/jrt/Supervisor.java @@ -15,7 +15,10 @@ import java.util.concurrent.atomic.AtomicReference; **/ public class Supervisor { + private static final int SMALL_INPUT_BUFFER_SIZE = 20 * 1024; // Large enough too hold the typical application buffersize of 17k. + private static final int SMALL_OUTPUT_BUFFER_SIZE = 8 *1024; // Suitable small buffer usage with many connections and little traffic. private final Transport transport; + private SessionHandler sessionHandler = null; private final Object methodMapLock = new Object(); private final AtomicReference> methodMap = new AtomicReference<>(new HashMap<>()); private int maxInputBufferSize = 64*1024; @@ -94,6 +97,15 @@ public class Supervisor { return transport; } + /** + * Set the session handler for this Supervisor + * + * @param handler the session handler + **/ + public void setSessionHandler(SessionHandler handler) { + sessionHandler = handler; + } + /** * Add a method to the set of methods held by this Supervisor * @@ -160,6 +172,73 @@ public class Supervisor { return transport.listen(this, spec); } + /** + * Convenience method for connecting to a peer, invoking a method + * and disconnecting. + * + * @param spec the address to connect to + * @param req the invocation request + * @param timeout request timeout in seconds + **/ + public void invokeBatch(Spec spec, Request req, double timeout) { + Target target = connect(spec); + try { + target.invokeSync(req, timeout); + } finally { + target.close(); + } + } + + /** + * This method is invoked when a new target is created + * + * @param target the target + **/ + void sessionInit(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionInit(target); + } + } + + /** + * This method is invoked when a target establishes a connection + * with its peer + * + * @param target the target + **/ + void sessionLive(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionLive(target); + } + } + + /** + * This method is invoked when a target becomes invalid + * + * @param target the target + **/ + void sessionDown(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionDown(target); + } + } + + /** + * This method is invoked when a target is invalid and no more + * invocations are active + * + * @param target the target + **/ + void sessionFini(Target target) { + SessionHandler handler = sessionHandler; + if (handler != null) { + handler.handleSessionFini(target); + } + } + /** * This method is invoked each time we write a packet. This method * is empty and only used for testing through sub-classing. -- cgit v1.2.3