summaryrefslogtreecommitdiffstats
path: root/jdisc_http_service
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@oath.com>2018-12-07 12:01:48 +0100
committerBjørn Christian Seime <bjorncs@oath.com>2019-01-07 16:26:32 +0100
commit6dd199e9b4a982a2a748b41a19e795c74b6e8f92 (patch)
tree1c30491c33b3d127e7677ebfc3ae6be960d3f023 /jdisc_http_service
parentd8ec344baeeda88363af3736f495742f4e0b0846 (diff)
Add support for connection throttling in JDisc
Diffstat (limited to 'jdisc_http_service')
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java274
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java4
-rw-r--r--jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def15
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java78
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java17
5 files changed, 388 insertions, 0 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java
new file mode 100644
index 00000000000..b9001d187a9
--- /dev/null
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottler.java
@@ -0,0 +1,274 @@
+// Copyright 2019 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.server.jetty;
+
+import com.yahoo.jdisc.http.ConnectorConfig;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.SelectorManager;
+import org.eclipse.jetty.server.AbstractConnector;
+import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.LowResourceMonitor;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.statistic.RateStatistic;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+import java.nio.channels.SelectableChannel;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Monitor various resource constraints and throttles new connections once a threshold is exceeded.
+ * Implementation inspired by Jetty's {@link LowResourceMonitor}, {@link AcceptRateLimit} and {@link ConnectionLimit}.
+ *
+ * @author bjorncs
+ */
+@ManagedObject("Monitor various resource constraints and throttles new connections once a threshold is exceeded")
+class ConnectionThrottler extends ContainerLifeCycle implements SelectorManager.AcceptListener {
+
+ private static final Logger log = Logger.getLogger(ConnectionThrottler.class.getName());
+
+ private final Object monitor = new Object();
+ private final Collection<ResourceLimit> resourceLimits = new ArrayList<>();
+ private final AbstractConnector connector;
+ private final Duration idleTimeout;
+ private final Scheduler scheduler;
+
+ private boolean isRegistered = false;
+ private boolean isThrottling = false;
+
+ ConnectionThrottler(AbstractConnector connector, ConnectorConfig.Throttling config) {
+ this(Runtime.getRuntime(), new RateStatistic(1, TimeUnit.SECONDS), connector.getScheduler(), connector, config);
+ }
+
+ // Intended for unit testing
+ ConnectionThrottler(Runtime runtime,
+ RateStatistic rateStatistic,
+ Scheduler scheduler,
+ AbstractConnector connector,
+ ConnectorConfig.Throttling config) {
+ this.connector = connector;
+ if (config.maxHeapUtilization() != -1) {
+ this.resourceLimits.add(new HeapResourceLimit(runtime, config.maxHeapUtilization()));
+ }
+ if (config.maxConnections() != -1) {
+ this.resourceLimits.add(new ConnectionLimitThreshold(config.maxConnections()));
+ }
+ if (config.maxAcceptRate() != -1) {
+ this.resourceLimits.add(new AcceptRateLimit(rateStatistic, config.maxAcceptRate()));
+ }
+ this.idleTimeout = config.idleTimeout() != -1 ? Duration.ofMillis((long) (config.idleTimeout()*1000)) : null;
+ this.scheduler = scheduler;
+ }
+
+ void registerWithConnector() {
+ synchronized (monitor) {
+ if (isRegistered) return;
+ isRegistered = true;
+ resourceLimits.forEach(connector::addBean);
+ connector.addBean(this);
+ }
+ }
+
+ @Override
+ public void onAccepting(SelectableChannel channel) {
+ throttleIfAnyThresholdIsExceeded();
+ }
+
+ private void throttleIfAnyThresholdIsExceeded() {
+ synchronized (monitor) {
+ if (isThrottling) return;
+ List<String> reasons = getThrottlingReasons();
+ if (reasons.isEmpty()) return;
+ log.warning(String.format("Throttling new connection. Reasons: %s", reasons));
+ isThrottling = true;
+ if (connector.isAccepting()) {
+ connector.setAccepting(false);
+ }
+ if (idleTimeout != null) {
+ log.warning(String.format("Applying idle timeout to existing connections: timeout=%sms", idleTimeout));
+ connector.getConnectedEndPoints()
+ .forEach(endPoint -> endPoint.setIdleTimeout(idleTimeout.toMillis()));
+ }
+ scheduler.schedule(this::unthrottleIfBelowThresholds, 1, TimeUnit.SECONDS);
+ }
+ }
+
+ private void unthrottleIfBelowThresholds() {
+ synchronized (monitor) {
+ if (!isThrottling) return;
+ List<String> reasons = getThrottlingReasons();
+ if (!reasons.isEmpty()) {
+ log.warning(String.format("Throttling continued. Reasons: %s", reasons));
+ scheduler.schedule(this::unthrottleIfBelowThresholds, 1, TimeUnit.SECONDS);
+ return;
+ }
+ if (idleTimeout != null) {
+ long originalTimeout = connector.getIdleTimeout();
+ log.info(String.format("Reverting idle timeout for existing connections: timeout=%sms", originalTimeout));
+ connector.getConnectedEndPoints()
+ .forEach(endPoint -> endPoint.setIdleTimeout(originalTimeout));
+ }
+ log.info("Throttling disabled - resource thresholds no longer exceeded");
+ if (!connector.isAccepting()) {
+ connector.setAccepting(true);
+ }
+ isThrottling = false;
+ }
+ }
+
+ private List<String> getThrottlingReasons() {
+ synchronized (monitor) {
+ return resourceLimits.stream()
+ .map(ResourceLimit::isThresholdExceeded)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(toList());
+ }
+ }
+
+ private interface ResourceLimit extends LifeCycle, SelectorManager.AcceptListener, Connection.Listener {
+ /**
+ * @return A string containing the reason if threshold exceeded, empty otherwise.
+ */
+ Optional<String> isThresholdExceeded();
+
+ @Override default void onOpened(Connection connection) {}
+
+ @Override default void onClosed(Connection connection) {}
+ }
+
+ /**
+ * Note: implementation inspired by Jetty's {@link LowResourceMonitor}
+ */
+ private static class HeapResourceLimit extends AbstractLifeCycle implements ResourceLimit {
+ private final Runtime runtime;
+ private final double maxHeapUtilization;
+
+ HeapResourceLimit(Runtime runtime, double maxHeapUtilization) {
+ this.runtime = runtime;
+ this.maxHeapUtilization = maxHeapUtilization;
+ }
+
+ @Override
+ public Optional<String> isThresholdExceeded() {
+ double heapUtilization = (runtime.maxMemory() - runtime.freeMemory()) / (double) runtime.maxMemory();
+ if (heapUtilization > maxHeapUtilization) {
+ return Optional.of(String.format("Max heap utilization exceeded: %f%%>%f%%", heapUtilization*100, maxHeapUtilization*100));
+ }
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Note: implementation inspired by Jetty's {@link org.eclipse.jetty.server.AcceptRateLimit}
+ */
+ private static class AcceptRateLimit extends AbstractLifeCycle implements ResourceLimit {
+ private final Object monitor = new Object();
+ private final RateStatistic rateStatistic;
+ private final int maxAcceptRate;
+
+ AcceptRateLimit(RateStatistic rateStatistic, int maxAcceptRate) {
+ this.rateStatistic = rateStatistic;
+ this.maxAcceptRate = maxAcceptRate;
+ }
+
+ @Override
+ public Optional<String> isThresholdExceeded() {
+ synchronized (monitor) {
+ int acceptRate = rateStatistic.getRate();
+ if (acceptRate > maxAcceptRate) {
+ return Optional.of(String.format("Max accept rate exceeded: %d>%d", acceptRate, maxAcceptRate));
+ }
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void onAccepting(SelectableChannel channel) {
+ synchronized (monitor) {
+ rateStatistic.record();
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ synchronized (monitor) {
+ rateStatistic.reset();
+ }
+ }
+ }
+
+ /**
+ * Note: implementation inspired by Jetty's {@link ConnectionLimit}.
+ */
+ private static class ConnectionLimitThreshold extends AbstractLifeCycle implements ResourceLimit {
+ private final Object monitor = new Object();
+ private final int maxConnections;
+ private final Set<SelectableChannel> connectionsAccepting = new HashSet<>();
+ private int connectionOpened;
+
+ ConnectionLimitThreshold(int maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ @Override
+ public Optional<String> isThresholdExceeded() {
+ synchronized (monitor) {
+ int totalConnections = connectionOpened + connectionsAccepting.size();
+ if (totalConnections > maxConnections) {
+ return Optional.of(String.format("Max connection exceeded: %d>%d", totalConnections, maxConnections));
+ }
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void onOpened(Connection connection) {
+ synchronized (monitor) {
+ connectionsAccepting.remove(connection.getEndPoint().getTransport());
+ ++connectionOpened;
+ }
+ }
+
+ @Override
+ public void onClosed(Connection connection) {
+ synchronized (monitor) {
+ --connectionOpened;
+ }
+ }
+
+ @Override
+ public void onAccepting(SelectableChannel channel) {
+ synchronized (monitor) {
+ connectionsAccepting.add(channel);
+ }
+
+ }
+
+ @Override
+ public void onAcceptFailed(SelectableChannel channel, Throwable cause) {
+ synchronized (monitor) {
+ connectionsAccepting.remove(channel);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ synchronized (monitor) {
+ connectionsAccepting.clear();
+ connectionOpened = 0;
+ }
+ }
+ }
+}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
index f7d6e1717af..6b371473a57 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
@@ -50,6 +50,10 @@ class JDiscServerConnector extends ServerConnector {
this.statistics = new ServerConnectionStatistics();
addBean(statistics);
+ ConnectorConfig.Throttling throttlingConfig = config.throttling();
+ if (throttlingConfig.enabled()) {
+ new ConnectionThrottler(this, throttlingConfig).registerWithConnector();
+ }
}
@Override
diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
index 157ffabdd63..676fdbc8157 100644
--- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
+++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def
@@ -41,6 +41,21 @@ tcpKeepAliveEnabled bool default=false
# Enable/disable TCP_NODELAY (disable/enable Nagle's algorithm).
tcpNoDelay bool default=true
+# Whether to enable connection throttling. New connections will be dropped when a threshold is exceeded.
+throttling.enabled bool default=false
+
+# Max number of connections.
+throttling.maxConnections int default=-1
+
+# Max memory utilization as a value between 0 and 1.
+throttling.maxHeapUtilization double default=-1.0
+
+# Max connection accept rate per second.
+throttling.maxAcceptRate int default=-1
+
+# Idle timeout in seconds applied to endpoints when a threshold is exceeded.
+throttling.idleTimeout double default=-1.0
+
# Whether to enable SSL for this connector.
ssl.enabled bool default=false
diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java
new file mode 100644
index 00000000000..9e66418c382
--- /dev/null
+++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectionThrottlerTest.java
@@ -0,0 +1,78 @@
+// Copyright 2019 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.server.jetty;
+
+import com.yahoo.jdisc.http.ConnectorConfig;
+import org.eclipse.jetty.server.AbstractConnector;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.statistic.RateStatistic;
+import org.eclipse.jetty.util.thread.Scheduler;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * @author bjorncs
+ */
+public class ConnectionThrottlerTest {
+
+ @Test
+ public void throttles_when_any_resource_check_exceeds_configured_threshold() {
+ Runtime runtime = mock(Runtime.class);
+ when(runtime.maxMemory()).thenReturn(100l);
+ RateStatistic rateStatistic = new RateStatistic(1, TimeUnit.HOURS);
+ MockScheduler scheduler = new MockScheduler();
+ ConnectorConfig.Throttling config = new ConnectorConfig.Throttling(new ConnectorConfig.Throttling.Builder()
+ .maxHeapUtilization(0.8)
+ .maxAcceptRate(1));
+
+ AbstractConnector connector = mock(AbstractConnector.class);
+
+ ConnectionThrottler throttler = new ConnectionThrottler(runtime, rateStatistic, scheduler, connector, config);
+
+ // Heap utilization above configured threshold, but connection rate below threshold.
+ when(runtime.freeMemory()).thenReturn(10l);
+ when(connector.isAccepting()).thenReturn(true);
+ throttler.onAccepting(null);
+ assertNotNull(scheduler.task);
+ verify(connector).setAccepting(false);
+
+ // Heap utilization below threshold, but connection rate above threshold.
+ when(runtime.freeMemory()).thenReturn(80l);
+ rateStatistic.record();
+ rateStatistic.record(); // above accept rate limit (2 > 1)
+ scheduler.task.run(); // run unthrottleIfBelowThresholds()
+ verify(connector, times(1)).setAccepting(anyBoolean()); // verify setAccepting has not been called any mores times
+
+ // Both heap utilization and accept rate below threshold
+ when(runtime.freeMemory()).thenReturn(80l);
+ when(connector.isAccepting()).thenReturn(false);
+ rateStatistic.reset();
+ scheduler.task.run(); // run unthrottleIfBelowThresholds()
+ verify(connector).setAccepting(true);
+
+ // Both heap utilization and accept rate below threshold
+ when(connector.isAccepting()).thenReturn(true);
+ when(runtime.freeMemory()).thenReturn(80l);
+ rateStatistic.record();
+ throttler.onAccepting(null);
+ verify(connector, times(2)).setAccepting(anyBoolean()); // verify setAccepting has not been called any mores times
+ }
+
+ private static class MockScheduler extends AbstractLifeCycle implements Scheduler {
+ Runnable task;
+
+ @Override
+ public Task schedule(Runnable task, long delay, TimeUnit units) {
+ this.task = task;
+ return () -> false;
+ }
+ }
+
+} \ No newline at end of file
diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
index 30d5f9e657a..db0746baf2e 100644
--- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
+++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
@@ -16,6 +16,7 @@ import com.yahoo.jdisc.handler.RequestHandler;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.http.ConnectorConfig;
+import com.yahoo.jdisc.http.ConnectorConfig.Throttling;
import com.yahoo.jdisc.http.Cookie;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpResponse;
@@ -480,6 +481,22 @@ public class HttpServerTest {
assertThat(driver.close(), is(true));
}
+ @Test
+ public void requireThatConnectionThrottleDoesNotBlockConnectionsBelowThreshold() throws Exception {
+ final TestDriver driver = TestDrivers.newConfiguredInstance(
+ new EchoRequestHandler(),
+ new ServerConfig.Builder(),
+ new ConnectorConfig.Builder()
+ .throttling(new Throttling.Builder()
+ .enabled(true)
+ .maxAcceptRate(10)
+ .maxHeapUtilization(1.0)
+ .maxConnections(10)));
+ driver.client().get("/status.html")
+ .expectStatusCode(is(OK));
+ assertThat(driver.close(), is(true));
+ }
+
private static RequestHandler mockRequestHandler() {
final RequestHandler mockRequestHandler = mock(RequestHandler.class);
when(mockRequestHandler.refer()).thenReturn(References.NOOP_REFERENCE);