summary | refs | log | tree | commit | diff | stats
path: root/metrics-proxy
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2021-09-16 18:12:25 +0200
committer Henning Baldersheim <balder@yahoo-inc.com> 2021-09-16 18:12:25 +0200
commit f19cf2dd0f1f80735522ec868700d91f93f53076 (patch)
tree b1a51ad154a958000ba4a4fc17c8eead8e87760a /metrics-proxy
parent 751c42ec11614c9bac3d2bb56b9616df93c991d5 (diff)
Use an async http client with only 2 threads.
Diffstat (limited to 'metrics-proxy')
-rw-r--r-- metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java | 51
-rw-r--r-- metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java | 67
-rw-r--r-- metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java | 11
-rw-r--r-- metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java | 39
4 files changed, 88 insertions, 80 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
index d729715b737..c0f22e38e11 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -3,15 +3,16 @@ package ai.vespa.metricsproxy.http.application;
import ai.vespa.metricsproxy.metric.model.ConsumerId;
import ai.vespa.metricsproxy.metric.model.MetricsPacket;
-import ai.vespa.util.http.hc5.VespaHttpClientBuilder;
+import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.concurrent.ThreadFactoryFactory;
-import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
+import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
@@ -20,8 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -41,7 +40,6 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());
- static final int MAX_THREADS = 20;
static final Duration MIN_TIMEOUT = Duration.ofSeconds(60);
static final Duration MAX_TIMEOUT = Duration.ofSeconds(240);
@@ -49,9 +47,8 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
private static final int HTTP_SOCKET_TIMEOUT = 30000;
private static final Duration METRICS_TTL = Duration.ofSeconds(30);
- private final HttpClient httpClient = createHttpClient();
+ private final CloseableHttpAsyncClient httpClient = createHttpClient();
private final List<NodeMetricsClient> clients;
- private final ExecutorService fetchPool;
private final Thread pollThread;
private final Set<ConsumerId> consumerSet;
private long pollCount = 0;
@@ -63,12 +60,11 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
@Inject
public ApplicationMetricsRetriever(MetricsNodesConfig nodesConfig) {
clients = createNodeClients(nodesConfig);
- int numThreads = Math.min(clients.size(), MAX_THREADS);
- taskTimeout = timeout(clients.size(), numThreads);
- fetchPool = Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("metrics-fetcher"));
+ taskTimeout = timeout(clients.size());
stopped = false;
consumerSet = new HashSet<>();
consumerSet.add(defaultMetricsConsumerId);
+ httpClient.start();
pollThread = new Thread(this, "metrics-poller");
pollThread.setDaemon(true);
pollThread.start();
@@ -107,7 +103,11 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
stopped = true;
pollThread.notifyAll();
}
- fetchPool.shutdownNow();
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ log.warning("Failed closing httpclient: " + e);
+ }
try {
pollThread.join();
} catch (InterruptedException e) {}
@@ -151,14 +151,15 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
private int fetchMetricsAsync(ConsumerId consumer) {
Map<Node, Future<Boolean>> futures = new HashMap<>();
for (NodeMetricsClient client : clients) {
- futures.put(client.node, fetchPool.submit(() -> updateMetrics(client, consumer)));
+ var optional = client.startSnapshotUpdate(consumer, METRICS_TTL);
+ optional.ifPresent(future -> futures.put(client.node, future));
}
int numOk = 0;
int numTried = futures.size();
for (Map.Entry<Node, Future<Boolean>> entry : futures.entrySet()) {
try {
Boolean result = entry.getValue().get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
- if (result != null && result) numOk++;
+ if ((result != null) && result) numOk++;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e);
@@ -168,24 +169,16 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
return numTried - numOk;
}
- private boolean updateMetrics(NodeMetricsClient client, ConsumerId consumer) {
- try {
- return client.updateSnapshots(consumer, METRICS_TTL);
- } catch (Exception e) {
- log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e);
- return false;
- }
- }
-
private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) {
return nodesConfig.node().stream()
.map(Node::new)
.map(node-> new NodeMetricsClient(httpClient, node, Clock.systemUTC()))
.collect(Collectors.toList());
- }
+ }
- private static CloseableHttpClient createHttpClient() {
- return VespaHttpClientBuilder.create()
+ static CloseableHttpAsyncClient createHttpClient() {
+ return VespaAsyncHttpClientBuilder.create()
+ .setIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build())
.setUserAgent("application-metrics-retriever")
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(Timeout.ofMilliseconds(HTTP_CONNECT_TIMEOUT))
@@ -194,8 +187,8 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
.build();
}
- static Duration timeout(int clients, int numThreads) {
- Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), 20 * clients / numThreads));
+ static Duration timeout(int clients) {
+ Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), clients));
return timeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : timeout;
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
index 0a8597dc6e3..e4051f4dd5b 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
@@ -5,23 +5,26 @@ import ai.vespa.metricsproxy.metric.model.ConsumerId;
import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import ai.vespa.metricsproxy.metric.model.json.GenericJsonUtil;
import ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor;
-import com.yahoo.yolean.Exceptions;
-import org.apache.hc.client5.http.classic.HttpClient;
-import org.apache.hc.client5.http.classic.methods.HttpGet;
-import org.apache.hc.client5.http.impl.classic.BasicHttpClientResponseHandler;
-import java.io.IOException;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.concurrent.FutureCallback;
+
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import static ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor.applyProcessors;
-import static java.util.Collections.emptyList;
import static java.util.logging.Level.FINE;
import static java.util.stream.Collectors.toList;
@@ -42,13 +45,13 @@ public class NodeMetricsClient {
private static final int MAX_DIMENSIONS = 10;
final Node node;
- private final HttpClient httpClient;
+ private final CloseableHttpAsyncClient httpClient;
private final Clock clock;
private final Map<ConsumerId, Snapshot> snapshots = new ConcurrentHashMap<>();
- private AtomicLong snapshotsRetrieved = new AtomicLong();
+ private final AtomicLong snapshotsRetrieved = new AtomicLong();
- NodeMetricsClient(HttpClient httpClient, Node node, Clock clock) {
+ NodeMetricsClient(CloseableHttpAsyncClient httpClient, Node node, Clock clock) {
this.httpClient = httpClient;
this.node = node;
this.clock = clock;
@@ -59,34 +62,38 @@ public class NodeMetricsClient {
return (snapshot != null) ? snapshot.metrics : List.of();
}
- boolean updateSnapshots(ConsumerId consumer, Duration ttl) {
+ Optional<Future<Boolean>> startSnapshotUpdate(ConsumerId consumer, Duration ttl) {
var snapshot = snapshots.get(consumer);
- if ((snapshot != null) && snapshot.isValid(clock.instant(), ttl)) return true;
+ if ((snapshot != null) && snapshot.isValid(clock.instant(), ttl)) return Optional.empty();
- snapshot = retrieveMetrics(consumer);
- snapshots.put(consumer, snapshot);
- return ! snapshot.metrics.isEmpty();
+ return Optional.of(retrieveMetrics(consumer));
}
- private Snapshot retrieveMetrics(ConsumerId consumer) {
+ /**
+ * Starts an asynchronous GET of this node's metrics for the given consumer.
+ *
+ * @param consumer the consumer whose metrics URI is fetched
+ * @return a future that completes with {@code true} once the response has been processed and stored,
+ * completes exceptionally if the request fails or the response cannot be processed,
+ * and is cancelled if the request is cancelled
+ */
+ private Future<Boolean> retrieveMetrics(ConsumerId consumer) {
String metricsUri = node.metricsUri(consumer).toString();
log.log(FINE, () -> "Retrieving metrics from host " + metricsUri);
- try {
- String metricsJson = httpClient.execute(new HttpGet(metricsUri), new BasicHttpClientResponseHandler());
- var metricsBuilders = GenericJsonUtil.toMetricsPackets(metricsJson);
- var metrics = processAndBuild(metricsBuilders,
- new ServiceIdDimensionProcessor(),
- new ClusterIdDimensionProcessor(),
- new PublicDimensionsProcessor(MAX_DIMENSIONS));
- snapshotsRetrieved.incrementAndGet();
- log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri);
-
- return new Snapshot(Instant.now(clock), metrics);
- } catch (IOException e) {
- log.warning("Unable to retrieve metrics from " + metricsUri + ": " + Exceptions.toMessageString(e));
- return new Snapshot(Instant.now(clock), emptyList());
- }
+ CompletableFuture<Boolean> onDone = new CompletableFuture<>();
+ httpClient.execute(SimpleRequestBuilder.get(metricsUri).build(),
+ new FutureCallback<>() {
+ @Override public void completed(SimpleHttpResponse result) {
+ // An exception escaping this callback would leave onDone incomplete,
+ // so the caller would block until its own timeout expires.
+ try {
+ handleResponse(metricsUri, consumer, result.getBodyText());
+ onDone.complete(true);
+ } catch (RuntimeException e) {
+ onDone.completeExceptionally(e);
+ }
+ }
+ @Override public void failed(Exception ex) { onDone.completeExceptionally(ex); }
+ @Override public void cancelled() { onDone.cancel(false); }
+ });
+ return onDone;
+ }
+
+ /**
+ * Converts a metrics response body into metrics packets and stores them as the
+ * current snapshot for the given consumer, timestamped with this client's clock.
+ * Also increments the count of retrieved snapshots.
+ *
+ * @param metricsUri the URI the response was fetched from, used for logging only
+ * @param consumer the consumer the snapshot is stored under
+ * @param response the response body passed to {@code GenericJsonUtil.toMetricsPackets}
+ */
+ void handleResponse(String metricsUri, ConsumerId consumer, String response) {
+ var metrics = processAndBuild(GenericJsonUtil.toMetricsPackets(response),
+ new ServiceIdDimensionProcessor(),
+ new ClusterIdDimensionProcessor(),
+ new PublicDimensionsProcessor(MAX_DIMENSIONS));
+ snapshotsRetrieved.incrementAndGet();
+ log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri);
+ snapshots.put(consumer, new Snapshot(Instant.now(clock), metrics));
}
private static List<MetricsPacket> processAndBuild(List<MetricsPacket.Builder> builders,
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
index b478f1ce632..09cc355d292 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
@@ -10,7 +10,6 @@ import java.time.Duration;
import java.util.Arrays;
import static ai.vespa.metricsproxy.TestUtil.getFileContents;
-import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MAX_THREADS;
import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MAX_TIMEOUT;
import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MIN_TIMEOUT;
import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.timeout;
@@ -145,13 +144,13 @@ public class ApplicationMetricsRetrieverTest {
@Test
public void test_timeout_calculation() {
- assertEquals(MIN_TIMEOUT, timeout(1, 1));
- assertEquals(MIN_TIMEOUT, timeout(MAX_THREADS, MAX_THREADS));
+ assertEquals(MIN_TIMEOUT, timeout(1));
+ assertEquals(MIN_TIMEOUT, timeout(20));
// These values must be updated if the calculation in the timeout method itself is changed.
- assertEquals(Duration.ofSeconds(100), timeout(100, MAX_THREADS));
- assertEquals(Duration.ofSeconds(200), timeout(200, MAX_THREADS));
- assertEquals(MAX_TIMEOUT, timeout(240, MAX_THREADS));
+ assertEquals(Duration.ofSeconds(100), timeout(100));
+ assertEquals(Duration.ofSeconds(200), timeout(200));
+ assertEquals(MAX_TIMEOUT, timeout(240));
}
private MetricsNodesConfig nodesConfig(String... paths) {
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
index 6644579147f..4bf62a62445 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
@@ -2,12 +2,12 @@
package ai.vespa.metricsproxy.http.application;
import ai.vespa.metricsproxy.http.metrics.MetricsV1Handler;
+import ai.vespa.metricsproxy.metric.model.ConsumerId;
import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import com.github.tomakehurst.wiremock.junit.WireMockClassRule;
import com.yahoo.test.ManualClock;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
-import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -16,6 +16,7 @@ import org.junit.Test;
import java.net.URI;
import java.time.Duration;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import static ai.vespa.metricsproxy.TestUtil.getFileContents;
import static ai.vespa.metricsproxy.http.ValuesFetcher.defaultMetricsConsumerId;
@@ -41,7 +42,7 @@ public class NodeMetricsClientTest {
private static final String TEST_FILE = "generic-sample.json";
private static final String RESPONSE = getFileContents(TEST_FILE);
- private static final CloseableHttpClient httpClient = HttpClients.createDefault();
+ private static final CloseableHttpAsyncClient httpClient = ApplicationMetricsRetriever.createHttpClient();
private static final String CPU_METRIC = "cpu.util";
private static final String REPLACED_CPU_METRIC = "replaced_cpu_util";
@@ -51,7 +52,6 @@ public class NodeMetricsClientTest {
private static Node node;
- private ManualClock clock;
private NodeMetricsClient nodeMetricsClient;
@ClassRule
@@ -78,8 +78,8 @@ public class NodeMetricsClientTest {
@Before
public void setupClient() {
- clock = new ManualClock();
- nodeMetricsClient = new NodeMetricsClient(httpClient, node, clock);
+ httpClient.start();
+ nodeMetricsClient = new NodeMetricsClient(httpClient, node, new ManualClock());
}
@Test
@@ -88,10 +88,10 @@ public class NodeMetricsClientTest {
}
@Test
- public void metrics_are_retrieved_upon_first_update() {
+ public void metrics_are_retrieved_upon_first_update() throws InterruptedException, ExecutionException {
assertEquals(0, nodeMetricsClient.getMetrics(defaultMetricsConsumerId).size());
assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ updateSnapshot(defaultMetricsConsumerId, TTL);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
List<MetricsPacket> metrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
@@ -101,31 +101,31 @@ public class NodeMetricsClientTest {
@Test
public void metrics_are_refreshed_on_every_update() {
assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ updateSnapshot(defaultMetricsConsumerId, TTL);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO));
+ updateSnapshot(defaultMetricsConsumerId, Duration.ZERO);
assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
}
@Test
public void metrics_are_not_refreshed_if_ttl_not_expired() {
assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ updateSnapshot(defaultMetricsConsumerId, TTL);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ updateSnapshot(defaultMetricsConsumerId, TTL);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO));
+ updateSnapshot(defaultMetricsConsumerId, Duration.ZERO);
assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
}
@Test
public void metrics_for_different_consumers_are_cached_separately() {
- assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId,TTL));
+ updateSnapshot(defaultMetricsConsumerId, TTL);
List<MetricsPacket> defaultMetrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
assertEquals(4, defaultMetrics.size());
- assertTrue(nodeMetricsClient.updateSnapshots(toConsumerId(CUSTOM_CONSUMER), TTL));
+ updateSnapshot(toConsumerId(CUSTOM_CONSUMER), TTL);
List<MetricsPacket> customMetrics = nodeMetricsClient.getMetrics(toConsumerId(CUSTOM_CONSUMER));
assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
assertEquals(4, customMetrics.size());
@@ -133,4 +133,13 @@ public class NodeMetricsClientTest {
MetricsPacket replacedCpuMetric = customMetrics.get(0);
assertTrue(replacedCpuMetric.metrics().containsKey(toMetricId(REPLACED_CPU_METRIC)));
}
+ /**
+ * Starts a snapshot update and, if a fetch was actually started, waits for it and
+ * asserts that it succeeded. An empty result (no fetch started) asserts nothing.
+ * A failed or interrupted fetch fails the test instead of being silently ignored.
+ */
+ private void updateSnapshot(ConsumerId consumerId, Duration ttl) {
+ nodeMetricsClient.startSnapshotUpdate(consumerId, ttl).ifPresent(future -> {
+ try {
+ assertTrue(future.get());
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before failing so the test runner can observe it.
+ Thread.currentThread().interrupt();
+ throw new AssertionError("Interrupted while waiting for snapshot update", e);
+ } catch (ExecutionException e) {
+ throw new AssertionError("Snapshot update failed", e);
+ }
+ });
+ }
}