From cb741674206e655de6072ebae2045e6a05a83532 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 12 Aug 2022 23:52:39 +0200 Subject: Initial thread local byte buffer pool. --- .../jdisc/http/server/jetty/ConnectorFactory.java | 7 +- .../http/server/jetty/JDiscServerConnector.java | 8 +- .../jdisc/http/server/jetty/JettyHttpServer.java | 17 ++-- .../server/jetty/ThreadLocalByteBufferPool.java | 101 +++++++++++++++++++++ .../http/server/jetty/ConnectorFactoryTest.java | 2 +- 5 files changed, 118 insertions(+), 17 deletions(-) create mode 100644 container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ThreadLocalByteBufferPool.java diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java index a9385060010..dcc3cdaa67b 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java @@ -13,6 +13,7 @@ import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.DetectorConnectionFactory; @@ -79,10 +80,10 @@ public class ConnectorFactory { return connectorConfig; } - public ServerConnector createConnector(final Metric metric, final Server server, JettyConnectionLogger connectionLogger, - ConnectionMetricAggregator connectionMetricAggregator) { + public ServerConnector createConnector(final Metric metric, final Server server, ByteBufferPool bufferPool, + JettyConnectionLogger connectionLogger, ConnectionMetricAggregator connectionMetricAggregator) { ServerConnector connector = new JDiscServerConnector( - connectorConfig, metric, server, connectionLogger, connectionMetricAggregator, + connectorConfig, metric, server, bufferPool, connectionLogger, connectionMetricAggregator, createConnectionFactories(metric).toArray(ConnectionFactory[]::new)); connector.setPort(connectorConfig.listenPort()); connector.setName(connectorConfig.name()); diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java index 79cdb8f67cf..cc60e741d75 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java @@ -3,6 +3,7 @@ package com.yahoo.jdisc.http.server.jetty; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Server; @@ -31,9 +32,10 @@ class JDiscServerConnector extends ServerConnector { private final String connectorName; private final int listenPort; - JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, JettyConnectionLogger connectionLogger, - ConnectionMetricAggregator connectionMetricAggregator, ConnectionFactory... factories) { - super(server, factories); + JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, ByteBufferPool bufferPool, + JettyConnectionLogger connectionLogger, ConnectionMetricAggregator connectionMetricAggregator, + ConnectionFactory... factories) { + super(server, null, null, bufferPool, -1, -1, factories); this.config = config; this.tcpKeepAlive = config.tcpKeepAliveEnabled(); this.tcpNoDelay = config.tcpNoDelay(); diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java index 96c5bac335b..856bce31424 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java @@ -11,6 +11,8 @@ import com.yahoo.jdisc.http.ServerConfig; import com.yahoo.jdisc.service.AbstractServerProvider; import com.yahoo.jdisc.service.CurrentContainer; import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.jmx.ConnectorServer; import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.server.Connector; @@ -76,27 +78,22 @@ public class JettyHttpServer extends AbstractServerProvider { configureJettyThreadpool(server, serverConfig); JettyConnectionLogger connectionLogger = new JettyConnectionLogger(serverConfig.connectionLog(), connectionLog); ConnectionMetricAggregator connectionMetricAggregator = new ConnectionMetricAggregator(serverConfig, metric); + ByteBufferPool bufferPool = new ThreadLocalByteBufferPool(new ArrayByteBufferPool()); for (ConnectorFactory connectorFactory : connectorFactories.allComponents()) { ConnectorConfig connectorConfig = connectorFactory.getConnectorConfig(); - server.addConnector(connectorFactory.createConnector(metric, server, connectionLogger, connectionMetricAggregator)); + server.addConnector(connectorFactory.createConnector(metric, server, bufferPool, connectionLogger, connectionMetricAggregator)); listenedPorts.add(connectorConfig.listenPort()); } - JDiscContext jDiscContext = new JDiscContext(filterBindings, - container, - janitor, - metric, - serverConfig); + JDiscContext jDiscContext = new JDiscContext(filterBindings, container, janitor, metric, serverConfig); ServletHolder jdiscServlet = new ServletHolder(new JDiscHttpServlet(jDiscContext)); List connectors = Arrays.stream(server.getConnectors()) .map(JDiscServerConnector.class::cast) .collect(toList()); - server.setHandler(getHandlerCollection(serverConfig, - connectors, - jdiscServlet)); + server.setHandler(getHandlerCollection(serverConfig, connectors, jdiscServlet)); this.metricsReporter = new ServerMetricReporter(metric, server); } @@ -190,7 +187,7 @@ public class JettyHttpServer extends AbstractServerProvider { logEffectiveSslConfiguration(); } catch (final Exception e) { if (e instanceof IOException && e.getCause() instanceof BindException) { - throw new RuntimeException("Failed to start server due to BindException. ListenPorts = " + listenedPorts.toString(), e.getCause()); + throw new RuntimeException("Failed to start server due to BindException. ListenPorts = " + listenedPorts, e.getCause()); } throw new RuntimeException("Failed to start server.", e); } diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ThreadLocalByteBufferPool.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ThreadLocalByteBufferPool.java new file mode 100644 index 00000000000..f1a7ad18ac9 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ThreadLocalByteBufferPool.java @@ -0,0 +1,101 @@ +package com.yahoo.jdisc.http.server.jetty; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; + +import java.nio.ByteBuffer; + +class ThreadLocalByteBufferPool implements ByteBufferPool { + static int bufferId(int size) { + return 32 - Integer.numberOfLeadingZeros(size); + } + private static class BufferList { + final ByteBuffer [] buffers = new ByteBuffer[16]; + int latest = -1; + ByteBuffer acquire() { + if (latest == -1) return null; + ByteBuffer buf = buffers[latest]; + buffers[latest] = null; + latest--; + return buf; + } + ByteBuffer release(ByteBuffer buf) { + if (latest >= (buffers.length - 1)) return buf; + buffers[++latest] = buf; + BufferUtil.clearToFill(buf); + return null; + } + } + private static class Cache { + private int cachedBytes = 0; + private final BufferList [] direct; + private final BufferList [] heap; + Cache(int numBufferClasses) { + direct = new BufferList[numBufferClasses]; + heap = new BufferList[numBufferClasses]; + for (int i=0; i < numBufferClasses; i++) { + direct[i] = new BufferList(); + heap[i] = new BufferList(); + } + } + ByteBuffer acquire(int bufferId, boolean direct) { + ByteBuffer buf = direct ? this.direct[bufferId].acquire() : this.heap[bufferId].acquire(); + if (buf != null) + cachedBytes -= buf.capacity(); + return buf; + } + ByteBuffer release(int bufferId, ByteBuffer buf) { + ByteBuffer overflow = buf.isDirect() ? direct[bufferId].release(buf) : heap[bufferId].release(buf); + if (overflow == null) + cachedBytes += buf.capacity(); + return overflow; + } + } + private static class ThreadLocalCache extends ThreadLocal { + private final int numBufferClasses; + ThreadLocalCache(int numBufferClasses) { + this.numBufferClasses = numBufferClasses; + } + @Override + protected Cache initialValue() { + return new Cache(numBufferClasses); + } + } + final private ByteBufferPool globalPool; + final private int maxCachedPerThread; + final private int lowestBufferId; + final private int largestBufferSize; + final private ThreadLocalCache cache; + + ThreadLocalByteBufferPool(ByteBufferPool globalPool) { + this(globalPool, 0x100000, 1024, 0x40000); + } + ThreadLocalByteBufferPool(ByteBufferPool globalPool, int maxCachedPerThread, int smallestBufferSize, int largestBufferSize) { + this.globalPool = globalPool; + this.maxCachedPerThread = maxCachedPerThread; + this.lowestBufferId = bufferId(smallestBufferSize); + this.largestBufferSize = largestBufferSize; + cache = new ThreadLocalCache(bufferId(largestBufferSize)); + } + @Override + public ByteBuffer acquire(int size, boolean direct) { + if (size <= largestBufferSize) { + ByteBuffer buf = cache.get().acquire(Integer.min(lowestBufferId, bufferId(size)), direct); + if (buf != null) return buf; + } + return globalPool.acquire(size, direct); + } + + @Override + public void release(ByteBuffer buffer) { + if (buffer.capacity() <= largestBufferSize) { + Cache local = cache.get(); + if (local.cachedBytes < maxCachedPerThread) { + buffer = local.release(Integer.max(lowestBufferId, bufferId(buffer.capacity())), buffer); + } + } + if (buffer != null) { + globalPool.release(buffer); + } + } +} diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java index 1ff2783cc53..43ddd551689 100644 --- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java +++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java @@ -82,7 +82,7 @@ public class ConnectorFactoryTest { new VoidConnectionLog()); DummyMetric metric = new DummyMetric(); var connectionMetricAggregator = new ConnectionMetricAggregator(new ServerConfig(new ServerConfig.Builder()), metric); - return (JDiscServerConnector)factory.createConnector(metric, server, connectionLogger, connectionMetricAggregator); + return (JDiscServerConnector)factory.createConnector(metric, server, null, connectionLogger, connectionMetricAggregator); } private static class HelloWorldHandler extends AbstractHandler { -- cgit v1.2.3