diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-01-15 18:51:38 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-01-15 19:15:38 +0100 |
commit | 01898a51e39a6c0b71bd571ae0968e967c0bcc67 (patch) | |
tree | 58b43501dd34f0fd9338828eb5d2fbe81225dec3 /jdisc_http_service/src/main/java/com/yahoo | |
parent | 623c5906f15ffb07048f7326809c7de4dc0c8d8f (diff) |
Rewrite JettyConnectionLogger to deduplicate Jetty connections
Use SocketChannelEndPoint to uniquely identity TCP/IP connections.
Rename 'AggregatedConnectionInfo' to 'ConnectionInfo'.
Implement SslHandshakeListener to read SSL session after handshake.
Diffstat (limited to 'jdisc_http_service/src/main/java/com/yahoo')
-rw-r--r-- | jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyConnectionLogger.java | 149 |
1 files changed, 106 insertions, 43 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyConnectionLogger.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyConnectionLogger.java index cb5709b7fc3..65eddb0a161 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyConnectionLogger.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyConnectionLogger.java @@ -7,8 +7,12 @@ import com.yahoo.io.HexDump; import com.yahoo.jdisc.http.ServerConfig; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslConnection; +import org.eclipse.jetty.io.ssl.SslHandshakeListener; import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpConnection; +import org.eclipse.jetty.server.ProxyConnectionFactory; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -36,13 +40,15 @@ import java.util.logging.Logger; * * @author bjorncs */ -class JettyConnectionLogger extends AbstractLifeCycle implements Connection.Listener, HttpChannel.Listener { +class JettyConnectionLogger extends AbstractLifeCycle implements Connection.Listener, HttpChannel.Listener, SslHandshakeListener { static final String CONNECTION_ID_REQUEST_ATTRIBUTE = "jdisc.request.connection.id"; private static final Logger log = Logger.getLogger(JettyConnectionLogger.class.getName()); - private final ConcurrentMap<Connection, AggregatedConnectionInfo> connectionInfo = new ConcurrentHashMap<>(); + private final ConcurrentMap<SocketChannelEndPoint, ConnectionInfo> connectionInfo = new ConcurrentHashMap<>(); + private final ConcurrentMap<SSLEngine, ConnectionInfo> sslToConnectionInfo = new ConcurrentHashMap<>(); + private final boolean enabled; private final ConnectionLog connectionLog; @@ -78,35 +84,41 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List @Override public void onOpened(Connection connection) { handleListenerInvocation("Connection.Listener", "onOpened", "%h", List.of(connection), () -> { - AggregatedConnectionInfo info = new AggregatedConnectionInfo(UUID.randomUUID()); - synchronized (info.lock()) { - EndPoint endpoint = connection.getEndPoint(); - info.setCreatedAt(endpoint.getCreatedTimeStamp()) - .setLocalAddress(endpoint.getLocalAddress()) - .setPeerAddress(endpoint.getRemoteAddress()); - if (connection instanceof SslConnection) { - SslConnection sslConnection = (SslConnection) connection; - SSLEngine sslEngine = sslConnection.getSSLEngine(); - SSLSession sslSession = sslEngine.getSession(); - info.setSslSessionDetails(sslSession); - } + SocketChannelEndPoint endpoint = findUnderlyingSocketEndpoint(connection.getEndPoint()); + ConnectionInfo info = connectionInfo.get(endpoint); + if (info == null) { + info = ConnectionInfo.from(endpoint); + connectionInfo.put(endpoint, info); + } + // TODO Store details on proxy-protocol + if (connection instanceof SslConnection) { + SSLEngine sslEngine = ((SslConnection) connection).getSSLEngine(); + sslToConnectionInfo.put(sslEngine, info); } - connectionInfo.put(connection, info); }); } @Override public void onClosed(Connection connection) { handleListenerInvocation("Connection.Listener", "onClosed", "%h", List.of(connection), () -> { - // TODO Decide on handling of connection upgrade where old connection object is closed and replaced by a new (e.g for proxy-protocol auto detection) - AggregatedConnectionInfo builder = Objects.requireNonNull(connectionInfo.remove(connection)); - ConnectionLogEntry logEntry; - synchronized (builder.lock()) { - logEntry = builder.setBytesReceived(connection.getBytesIn()) - .setBytesSent(connection.getBytesOut()) - .toLogEntry(); + SocketChannelEndPoint endpoint = findUnderlyingSocketEndpoint(connection.getEndPoint()); + ConnectionInfo info = connectionInfo.get(endpoint); + if (info == null) return; // Closed connection already handled + if (connection instanceof HttpConnection) { + long bytesIn = connection.getBytesIn(); + long bytesOut = connection.getBytesOut(); + synchronized (info.lock()) { + info.setBytesReceived(bytesIn).setBytesSent(bytesOut); + } + } + if (!endpoint.isOpen()) { + ConnectionLogEntry logEntry; + synchronized (info.lock()) { + logEntry = info.toLogEntry(); + } + connectionLog.log(logEntry); + connectionInfo.remove(endpoint); } - connectionLog.log(logEntry); }); } // @@ -119,8 +131,8 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List @Override public void onRequestBegin(Request request) { handleListenerInvocation("HttpChannel.Listener", "onRequestBegin", "%h", List.of(request), () -> { - Connection connection = request.getHttpChannel().getConnection(); - AggregatedConnectionInfo info = Objects.requireNonNull(connectionInfo.get(connection)); + SocketChannelEndPoint endpoint = findUnderlyingSocketEndpoint(request.getHttpChannel().getEndPoint()); + ConnectionInfo info = Objects.requireNonNull(connectionInfo.get(endpoint)); UUID uuid; synchronized (info.lock()) { info.incrementRequests(); @@ -133,8 +145,8 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List @Override public void onResponseBegin(Request request) { handleListenerInvocation("HttpChannel.Listener", "onResponseBegin", "%h", List.of(request), () -> { - Connection connection = request.getHttpChannel().getConnection(); - AggregatedConnectionInfo info = Objects.requireNonNull(connectionInfo.get(connection)); + SocketChannelEndPoint endpoint = findUnderlyingSocketEndpoint(request.getHttpChannel().getEndPoint()); + ConnectionInfo info = Objects.requireNonNull(connectionInfo.get(endpoint)); synchronized (info.lock()) { info.incrementResponses(); } @@ -144,6 +156,32 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List // HttpChannel.Listener methods end // + // + // SslHandshakeListener methods start + // + @Override + public void handshakeSucceeded(Event event) { + handleListenerInvocation("SslHandshakeListener", "handshakeSucceeded", "", List.of(), () -> { + SSLEngine sslEngine = event.getSSLEngine(); + ConnectionInfo info = sslToConnectionInfo.remove(sslEngine); + synchronized (info.lock()) { + info.setSslSessionDetails(sslEngine.getSession()); + } + }); + } + + @Override + public void handshakeFailed(Event event, Throwable failure) { + handleListenerInvocation("SslHandshakeListener", "handshakeFailed", "", List.of(), () -> { + SSLEngine sslEngine = event.getSSLEngine(); + sslToConnectionInfo.remove(sslEngine); + // TODO Store details on failed ssl handshake + }); + } + // + // SslHandshakeListener methods end + // + private void handleListenerInvocation( String listenerType, String methodName, String methodArgumentsFormat, List<Object> methodArguments, ListenerHandler handler) { if (!enabled) return; @@ -155,16 +193,35 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List } } + /** + * Protocol layers are connected through each {@link Connection}'s {@link EndPoint} reference. + * This methods iterates through the endpoints recursively to find the underlying socket endpoint. + */ + private static SocketChannelEndPoint findUnderlyingSocketEndpoint(EndPoint endpoint) { + if (endpoint instanceof SocketChannelEndPoint) { + return (SocketChannelEndPoint) endpoint; + } else if (endpoint instanceof SslConnection.DecryptedEndPoint) { + var decryptedEndpoint = (SslConnection.DecryptedEndPoint) endpoint; + return findUnderlyingSocketEndpoint(decryptedEndpoint.getSslConnection().getEndPoint()); + } else if (endpoint instanceof ProxyConnectionFactory.ProxyEndPoint) { + var proxyEndpoint = (ProxyConnectionFactory.ProxyEndPoint) endpoint; + return findUnderlyingSocketEndpoint(proxyEndpoint.unwrap()); + } else { + throw new IllegalArgumentException("Unknown connection endpoint type: " + endpoint.getClass().getName()); + } + } + @FunctionalInterface private interface ListenerHandler { void run() throws Exception; } - private static class AggregatedConnectionInfo { + // TODO Include connection duration or timestamp closed + private static class ConnectionInfo { private final Object monitor = new Object(); private final UUID uuid; + private final long createdAt; + private final InetSocketAddress localAddress; + private final InetSocketAddress peerAddress; - private long createdAt; - private InetSocketAddress localAddress; - private InetSocketAddress peerAddress; private long bytesReceived; private long bytesSent; private long requests; @@ -177,29 +234,35 @@ class JettyConnectionLogger extends AbstractLifeCycle implements Connection.List private Date sslPeerNotAfter; private List<SNIServerName> sslSniServerNames; - AggregatedConnectionInfo(UUID uuid) { + private ConnectionInfo(UUID uuid, long createdAt, InetSocketAddress localAddress, InetSocketAddress peerAddress) { this.uuid = uuid; + this.createdAt = createdAt; + this.localAddress = localAddress; + this.peerAddress = peerAddress; + } + + static ConnectionInfo from(SocketChannelEndPoint endpoint) { + return new ConnectionInfo( + UUID.randomUUID(), + endpoint.getCreatedTimeStamp(), + endpoint.getLocalAddress(), + endpoint.getRemoteAddress()); } Object lock() { return monitor; } UUID uuid() { return uuid; } - AggregatedConnectionInfo setCreatedAt(long createdAt) { this.createdAt = createdAt; return this; } - - AggregatedConnectionInfo setLocalAddress(InetSocketAddress localAddress) { this.localAddress = localAddress; return this; } - - AggregatedConnectionInfo setPeerAddress(InetSocketAddress peerAddress) { this.peerAddress = peerAddress; return this; } - - AggregatedConnectionInfo setBytesReceived(long bytesReceived) { this.bytesReceived = bytesReceived; return this; } + // TODO Consider renaming bytes methods to reflect that they are bytes written by HTTP layer, not underlying socket + ConnectionInfo setBytesReceived(long bytesReceived) { this.bytesReceived = bytesReceived; return this; } - AggregatedConnectionInfo setBytesSent(long bytesSent) { this.bytesSent = bytesSent; return this; } + ConnectionInfo setBytesSent(long bytesSent) { this.bytesSent = bytesSent; return this; } - AggregatedConnectionInfo incrementRequests() { ++this.requests; return this; } + ConnectionInfo incrementRequests() { ++this.requests; return this; } - AggregatedConnectionInfo incrementResponses() { ++this.responses; return this; } + ConnectionInfo incrementResponses() { ++this.responses; return this; } - AggregatedConnectionInfo setSslSessionDetails(SSLSession session) { + ConnectionInfo setSslSessionDetails(SSLSession session) { this.sslCipherSuite = session.getCipherSuite(); this.sslProtocol = session.getProtocol(); this.sslSessionId = session.getId(); |