diff options
author | Bjørn Christian Seime <bjorncs@oath.com> | 2018-12-07 12:01:48 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@oath.com> | 2019-01-07 16:26:32 +0100 |
commit | 6dd199e9b4a982a2a748b41a19e795c74b6e8f92 (patch) | |
tree | 1c30491c33b3d127e7677ebfc3ae6be960d3f023 /jdisc_http_service | |
parent | d8ec344baeeda88363af3736f495742f4e0b0846 (diff) |
Add support for connection throttling in JDisc
Diffstat (limited to 'jdisc_http_service')
5 files changed, 388 insertions, 0 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java new file mode 100644 index 00000000000..b9001d187a9 --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java @@ -0,0 +1,274 @@ +// Copyright 2019 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.server.jetty; + +import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.AbstractConnector; +import org.eclipse.jetty.server.ConnectionLimit; +import org.eclipse.jetty.server.LowResourceMonitor; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.statistic.RateStatistic; +import org.eclipse.jetty.util.thread.Scheduler; + +import java.nio.channels.SelectableChannel; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import static java.util.stream.Collectors.toList; + +/** + * Monitor various resource constraints and throttles new connections once a threshold is exceeded. + * Implementation inspired by Jetty's {@link LowResourceMonitor}, {@link AcceptRateLimit} and {@link ConnectionLimit}. + * + * @author bjorncs + */ +@ManagedObject("Monitor various resource constraints and throttles new connections once a threshold is exceeded") +class ConnectionThrottler extends ContainerLifeCycle implements SelectorManager.AcceptListener { + + private static final Logger log = Logger.getLogger(ConnectionThrottler.class.getName()); + + private final Object monitor = new Object(); + private final Collection<ResourceLimit> resourceLimits = new ArrayList<>(); + private final AbstractConnector connector; + private final Duration idleTimeout; + private final Scheduler scheduler; + + private boolean isRegistered = false; + private boolean isThrottling = false; + + ConnectionThrottler(AbstractConnector connector, ConnectorConfig.Throttling config) { + this(Runtime.getRuntime(), new RateStatistic(1, TimeUnit.SECONDS), connector.getScheduler(), connector, config); + } + + // Intended for unit testing + ConnectionThrottler(Runtime runtime, + RateStatistic rateStatistic, + Scheduler scheduler, + AbstractConnector connector, + ConnectorConfig.Throttling config) { + this.connector = connector; + if (config.maxHeapUtilization() != -1) { + this.resourceLimits.add(new HeapResourceLimit(runtime, config.maxHeapUtilization())); + } + if (config.maxConnections() != -1) { + this.resourceLimits.add(new ConnectionLimitThreshold(config.maxConnections())); + } + if (config.maxAcceptRate() != -1) { + this.resourceLimits.add(new AcceptRateLimit(rateStatistic, config.maxAcceptRate())); + } + this.idleTimeout = config.idleTimeout() != -1 ? Duration.ofMillis((long) (config.idleTimeout()*1000)) : null; + this.scheduler = scheduler; + } + + void registerWithConnector() { + synchronized (monitor) { + if (isRegistered) return; + isRegistered = true; + resourceLimits.forEach(connector::addBean); + connector.addBean(this); + } + } + + @Override + public void onAccepting(SelectableChannel channel) { + throttleIfAnyThresholdIsExceeded(); + } + + private void throttleIfAnyThresholdIsExceeded() { + synchronized (monitor) { + if (isThrottling) return; + List<String> reasons = getThrottlingReasons(); + if (reasons.isEmpty()) return; + log.warning(String.format("Throttling new connection. Reasons: %s", reasons)); + isThrottling = true; + if (connector.isAccepting()) { + connector.setAccepting(false); + } + if (idleTimeout != null) { + log.warning(String.format("Applying idle timeout to existing connections: timeout=%sms", idleTimeout)); + connector.getConnectedEndPoints() + .forEach(endPoint -> endPoint.setIdleTimeout(idleTimeout.toMillis())); + } + scheduler.schedule(this::unthrottleIfBelowThresholds, 1, TimeUnit.SECONDS); + } + } + + private void unthrottleIfBelowThresholds() { + synchronized (monitor) { + if (!isThrottling) return; + List<String> reasons = getThrottlingReasons(); + if (!reasons.isEmpty()) { + log.warning(String.format("Throttling continued. Reasons: %s", reasons)); + scheduler.schedule(this::unthrottleIfBelowThresholds, 1, TimeUnit.SECONDS); + return; + } + if (idleTimeout != null) { + long originalTimeout = connector.getIdleTimeout(); + log.info(String.format("Reverting idle timeout for existing connections: timeout=%sms", originalTimeout)); + connector.getConnectedEndPoints() + .forEach(endPoint -> endPoint.setIdleTimeout(originalTimeout)); + } + log.info("Throttling disabled - resource thresholds no longer exceeded"); + if (!connector.isAccepting()) { + connector.setAccepting(true); + } + isThrottling = false; + } + } + + private List<String> getThrottlingReasons() { + synchronized (monitor) { + return resourceLimits.stream() + .map(ResourceLimit::isThresholdExceeded) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } + } + + private interface ResourceLimit extends LifeCycle, SelectorManager.AcceptListener, Connection.Listener { + /** + * @return A string containing the reason if threshold exceeded, empty otherwise. + */ + Optional<String> isThresholdExceeded(); + + @Override default void onOpened(Connection connection) {} + + @Override default void onClosed(Connection connection) {} + } + + /** + * Note: implementation inspired by Jetty's {@link LowResourceMonitor} + */ + private static class HeapResourceLimit extends AbstractLifeCycle implements ResourceLimit { + private final Runtime runtime; + private final double maxHeapUtilization; + + HeapResourceLimit(Runtime runtime, double maxHeapUtilization) { + this.runtime = runtime; + this.maxHeapUtilization = maxHeapUtilization; + } + + @Override + public Optional<String> isThresholdExceeded() { + double heapUtilization = (runtime.maxMemory() - runtime.freeMemory()) / (double) runtime.maxMemory(); + if (heapUtilization > maxHeapUtilization) { + return Optional.of(String.format("Max heap utilization exceeded: %f%%>%f%%", heapUtilization*100, maxHeapUtilization*100)); + } + return Optional.empty(); + } + } + + /** + * Note: implementation inspired by Jetty's {@link org.eclipse.jetty.server.AcceptRateLimit} + */ + private static class AcceptRateLimit extends AbstractLifeCycle implements ResourceLimit { + private final Object monitor = new Object(); + private final RateStatistic rateStatistic; + private final int maxAcceptRate; + + AcceptRateLimit(RateStatistic rateStatistic, int maxAcceptRate) { + this.rateStatistic = rateStatistic; + this.maxAcceptRate = maxAcceptRate; + } + + @Override + public Optional<String> isThresholdExceeded() { + synchronized (monitor) { + int acceptRate = rateStatistic.getRate(); + if (acceptRate > maxAcceptRate) { + return Optional.of(String.format("Max accept rate exceeded: %d>%d", acceptRate, maxAcceptRate)); + } + return Optional.empty(); + } + } + + @Override + public void onAccepting(SelectableChannel channel) { + synchronized (monitor) { + rateStatistic.record(); + } + } + + @Override + protected void doStop() { + synchronized (monitor) { + rateStatistic.reset(); + } + } + } + + /** + * Note: implementation inspired by Jetty's {@link ConnectionLimit}. + */ + private static class ConnectionLimitThreshold extends AbstractLifeCycle implements ResourceLimit { + private final Object monitor = new Object(); + private final int maxConnections; + private final Set<SelectableChannel> connectionsAccepting = new HashSet<>(); + private int connectionOpened; + + ConnectionLimitThreshold(int maxConnections) { + this.maxConnections = maxConnections; + } + + @Override + public Optional<String> isThresholdExceeded() { + synchronized (monitor) { + int totalConnections = connectionOpened + connectionsAccepting.size(); + if (totalConnections > maxConnections) { + return Optional.of(String.format("Max connection exceeded: %d>%d", totalConnections, maxConnections)); + } + return Optional.empty(); + } + } + + @Override + public void onOpened(Connection connection) { + synchronized (monitor) { + connectionsAccepting.remove(connection.getEndPoint().getTransport()); + ++connectionOpened; + } + } + + @Override + public void onClosed(Connection connection) { + synchronized (monitor) { + --connectionOpened; + } + } + + @Override + public void onAccepting(SelectableChannel channel) { + synchronized (monitor) { + connectionsAccepting.add(channel); + } + + } + + @Override + public void onAcceptFailed(SelectableChannel channel, Throwable cause) { + synchronized (monitor) { + connectionsAccepting.remove(channel); + } + } + + @Override + protected void doStop() { + synchronized (monitor) { + connectionsAccepting.clear(); + connectionOpened = 0; + } + } + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java index f7d6e1717af..6b371473a57 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java @@ -50,6 +50,10 @@ class JDiscServerConnector extends ServerConnector { this.statistics = new ServerConnectionStatistics(); addBean(statistics); + ConnectorConfig.Throttling throttlingConfig = config.throttling(); + if (throttlingConfig.enabled()) { + new ConnectionThrottler(this, throttlingConfig).registerWithConnector(); + } } @Override diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def index 157ffabdd63..676fdbc8157 100644 --- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def +++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def @@ -41,6 +41,21 @@ tcpKeepAliveEnabled bool default=false # Enable/disable TCP_NODELAY (disable/enable Nagle's algorithm). tcpNoDelay bool default=true +# Whether to enable connection throttling. New connections will be dropped when a threshold is exceeded. +throttling.enabled bool default=false + +# Max number of connections. +throttling.maxConnections int default=-1 + +# Max memory utilization as a value between 0 and 1. +throttling.maxHeapUtilization double default=-1.0 + +# Max connection accept rate per second. +throttling.maxAcceptRate int default=-1 + +# Idle timeout in seconds applied to endpoints when a threshold is exceeded. +throttling.idleTimeout double default=-1.0 + # Whether to enable SSL for this connector. ssl.enabled bool default=false diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java new file mode 100644 index 00000000000..9e66418c382 --- /dev/null +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java @@ -0,0 +1,78 @@ +// Copyright 2019 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.server.jetty; + +import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.server.AbstractConnector; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.statistic.RateStatistic; +import org.eclipse.jetty.util.thread.Scheduler; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import static org.testng.Assert.assertNotNull; + +/** + * @author bjorncs + */ +public class ConnectionThrottlerTest { + + @Test + public void throttles_when_any_resource_check_exceeds_configured_threshold() { + Runtime runtime = mock(Runtime.class); + when(runtime.maxMemory()).thenReturn(100l); + RateStatistic rateStatistic = new RateStatistic(1, TimeUnit.HOURS); + MockScheduler scheduler = new MockScheduler(); + ConnectorConfig.Throttling config = new ConnectorConfig.Throttling(new ConnectorConfig.Throttling.Builder() + .maxHeapUtilization(0.8) + .maxAcceptRate(1)); + + AbstractConnector connector = mock(AbstractConnector.class); + + ConnectionThrottler throttler = new ConnectionThrottler(runtime, rateStatistic, scheduler, connector, config); + + // Heap utilization above configured threshold, but connection rate below threshold. + when(runtime.freeMemory()).thenReturn(10l); + when(connector.isAccepting()).thenReturn(true); + throttler.onAccepting(null); + assertNotNull(scheduler.task); + verify(connector).setAccepting(false); + + // Heap utilization below threshold, but connection rate above threshold. + when(runtime.freeMemory()).thenReturn(80l); + rateStatistic.record(); + rateStatistic.record(); // above accept rate limit (2 > 1) + scheduler.task.run(); // run unthrottleIfBelowThresholds() + verify(connector, times(1)).setAccepting(anyBoolean()); // verify setAccepting has not been called any mores times + + // Both heap utilization and accept rate below threshold + when(runtime.freeMemory()).thenReturn(80l); + when(connector.isAccepting()).thenReturn(false); + rateStatistic.reset(); + scheduler.task.run(); // run unthrottleIfBelowThresholds() + verify(connector).setAccepting(true); + + // Both heap utilization and accept rate below threshold + when(connector.isAccepting()).thenReturn(true); + when(runtime.freeMemory()).thenReturn(80l); + rateStatistic.record(); + throttler.onAccepting(null); + verify(connector, times(2)).setAccepting(anyBoolean()); // verify setAccepting has not been called any mores times + } + + private static class MockScheduler extends AbstractLifeCycle implements Scheduler { + Runnable task; + + @Override + public Task schedule(Runnable task, long delay, TimeUnit units) { + this.task = task; + return () -> false; + } + } + +}
\ No newline at end of file diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java index 30d5f9e657a..db0746baf2e 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java @@ -16,6 +16,7 @@ import com.yahoo.jdisc.handler.RequestHandler; import com.yahoo.jdisc.handler.ResponseDispatch; import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.http.ConnectorConfig; +import com.yahoo.jdisc.http.ConnectorConfig.Throttling; import com.yahoo.jdisc.http.Cookie; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpResponse; @@ -480,6 +481,22 @@ public class HttpServerTest { assertThat(driver.close(), is(true)); } + @Test + public void requireThatConnectionThrottleDoesNotBlockConnectionsBelowThreshold() throws Exception { + final TestDriver driver = TestDrivers.newConfiguredInstance( + new EchoRequestHandler(), + new ServerConfig.Builder(), + new ConnectorConfig.Builder() + .throttling(new Throttling.Builder() + .enabled(true) + .maxAcceptRate(10) + .maxHeapUtilization(1.0) + .maxConnections(10))); + driver.client().get("/status.html") + .expectStatusCode(is(OK)); + assertThat(driver.close(), is(true)); + } + private static RequestHandler mockRequestHandler() { final RequestHandler mockRequestHandler = mock(RequestHandler.class); when(mockRequestHandler.refer()).thenReturn(References.NOOP_REFERENCE); |