diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-09-16 18:12:25 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-09-16 18:12:25 +0200 |
commit | f19cf2dd0f1f80735522ec868700d91f93f53076 (patch) | |
tree | b1a51ad154a958000ba4a4fc17c8eead8e87760a | |
parent | 751c42ec11614c9bac3d2bb56b9616df93c991d5 (diff) |
Use an async http client with only 2 threads.
4 files changed, 88 insertions, 80 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java index d729715b737..c0f22e38e11 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java @@ -3,15 +3,16 @@ package ai.vespa.metricsproxy.http.application; import ai.vespa.metricsproxy.metric.model.ConsumerId; import ai.vespa.metricsproxy.metric.model.MetricsPacket; -import ai.vespa.util.http.hc5.VespaHttpClientBuilder; +import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import com.yahoo.concurrent.ThreadFactoryFactory; -import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.util.Timeout; +import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.util.HashMap; @@ -20,8 +21,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -41,7 +40,6 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName()); - static final int MAX_THREADS = 20; static final Duration MIN_TIMEOUT = Duration.ofSeconds(60); static final Duration MAX_TIMEOUT = Duration.ofSeconds(240); @@ -49,9 +47,8 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru private static final int HTTP_SOCKET_TIMEOUT = 30000; private static final Duration METRICS_TTL = Duration.ofSeconds(30); - private final HttpClient httpClient = createHttpClient(); + private final CloseableHttpAsyncClient httpClient = createHttpClient(); private final List<NodeMetricsClient> clients; - private final ExecutorService fetchPool; private final Thread pollThread; private final Set<ConsumerId> consumerSet; private long pollCount = 0; @@ -63,12 +60,11 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru @Inject public ApplicationMetricsRetriever(MetricsNodesConfig nodesConfig) { clients = createNodeClients(nodesConfig); - int numThreads = Math.min(clients.size(), MAX_THREADS); - taskTimeout = timeout(clients.size(), numThreads); - fetchPool = Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("metrics-fetcher")); + taskTimeout = timeout(clients.size()); stopped = false; consumerSet = new HashSet<>(); consumerSet.add(defaultMetricsConsumerId); + httpClient.start(); pollThread = new Thread(this, "metrics-poller"); pollThread.setDaemon(true); pollThread.start(); @@ -107,7 +103,11 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru stopped = true; pollThread.notifyAll(); } - fetchPool.shutdownNow(); + try { + httpClient.close(); + } catch (IOException e) { + log.warning("Failed closing httpclient: " + e); + } try { pollThread.join(); } catch (InterruptedException e) {} @@ -151,14 +151,15 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru private int fetchMetricsAsync(ConsumerId consumer) { Map<Node, Future<Boolean>> futures = new HashMap<>(); for (NodeMetricsClient client : clients) { - futures.put(client.node, fetchPool.submit(() -> updateMetrics(client, consumer))); + var optional = client.startSnapshotUpdate(consumer, METRICS_TTL); + optional.ifPresent(future -> futures.put(client.node, future)); } int numOk = 0; int numTried = futures.size(); for (Map.Entry<Node, Future<Boolean>> entry : futures.entrySet()) { try { Boolean result = entry.getValue().get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS); - if (result != null && result) numOk++; + if ((result != null) && result) numOk++; } catch (InterruptedException | ExecutionException | TimeoutException e) { // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e); @@ -168,24 +169,16 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru return numTried - numOk; } - private boolean updateMetrics(NodeMetricsClient client, ConsumerId consumer) { - try { - return client.updateSnapshots(consumer, METRICS_TTL); - } catch (Exception e) { - log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e); - return false; - } - } - private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) { return nodesConfig.node().stream() .map(Node::new) .map(node-> new NodeMetricsClient(httpClient, node, Clock.systemUTC())) .collect(Collectors.toList()); - } + } - private static CloseableHttpClient createHttpClient() { - return VespaHttpClientBuilder.create() + static CloseableHttpAsyncClient createHttpClient() { + return VespaAsyncHttpClientBuilder.create() + .setIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build()) .setUserAgent("application-metrics-retriever") .setDefaultRequestConfig(RequestConfig.custom() .setConnectTimeout(Timeout.ofMilliseconds(HTTP_CONNECT_TIMEOUT)) @@ -194,8 +187,8 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru .build(); } - static Duration timeout(int clients, int numThreads) { - Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), 20 * clients / numThreads)); + static Duration timeout(int clients) { + Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), clients)); return timeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : timeout; } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java index 0a8597dc6e3..e4051f4dd5b 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java @@ -5,23 +5,26 @@ import ai.vespa.metricsproxy.metric.model.ConsumerId; import ai.vespa.metricsproxy.metric.model.MetricsPacket; import ai.vespa.metricsproxy.metric.model.json.GenericJsonUtil; import ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor; -import com.yahoo.yolean.Exceptions; -import org.apache.hc.client5.http.classic.HttpClient; -import org.apache.hc.client5.http.classic.methods.HttpGet; -import org.apache.hc.client5.http.impl.classic.BasicHttpClientResponseHandler; -import java.io.IOException; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import static ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor.applyProcessors; -import static java.util.Collections.emptyList; import static java.util.logging.Level.FINE; import static java.util.stream.Collectors.toList; @@ -42,13 +45,13 @@ public class NodeMetricsClient { private static final int MAX_DIMENSIONS = 10; final Node node; - private final HttpClient httpClient; + private final CloseableHttpAsyncClient httpClient; private final Clock clock; private final Map<ConsumerId, Snapshot> snapshots = new ConcurrentHashMap<>(); - private AtomicLong snapshotsRetrieved = new AtomicLong(); + private final AtomicLong snapshotsRetrieved = new AtomicLong(); - NodeMetricsClient(HttpClient httpClient, Node node, Clock clock) { + NodeMetricsClient(CloseableHttpAsyncClient httpClient, Node node, Clock clock) { this.httpClient = httpClient; this.node = node; this.clock = clock; @@ -59,34 +62,38 @@ public class NodeMetricsClient { return (snapshot != null) ? snapshot.metrics : List.of(); } - boolean updateSnapshots(ConsumerId consumer, Duration ttl) { + Optional<Future<Boolean>> startSnapshotUpdate(ConsumerId consumer, Duration ttl) { var snapshot = snapshots.get(consumer); - if ((snapshot != null) && snapshot.isValid(clock.instant(), ttl)) return true; + if ((snapshot != null) && snapshot.isValid(clock.instant(), ttl)) return Optional.empty(); - snapshot = retrieveMetrics(consumer); - snapshots.put(consumer, snapshot); - return ! snapshot.metrics.isEmpty(); + return Optional.of(retrieveMetrics(consumer)); } - private Snapshot retrieveMetrics(ConsumerId consumer) { + private Future<Boolean> retrieveMetrics(ConsumerId consumer) { String metricsUri = node.metricsUri(consumer).toString(); log.log(FINE, () -> "Retrieving metrics from host " + metricsUri); - try { - String metricsJson = httpClient.execute(new HttpGet(metricsUri), new BasicHttpClientResponseHandler()); - var metricsBuilders = GenericJsonUtil.toMetricsPackets(metricsJson); - var metrics = processAndBuild(metricsBuilders, - new ServiceIdDimensionProcessor(), - new ClusterIdDimensionProcessor(), - new PublicDimensionsProcessor(MAX_DIMENSIONS)); - snapshotsRetrieved.incrementAndGet(); - log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri); - - return new Snapshot(Instant.now(clock), metrics); - } catch (IOException e) { - log.warning("Unable to retrieve metrics from " + metricsUri + ": " + Exceptions.toMessageString(e)); - return new Snapshot(Instant.now(clock), emptyList()); - } + CompletableFuture<Boolean> onDone = new CompletableFuture<>(); + httpClient.execute(SimpleRequestBuilder.get(metricsUri).build(), + new FutureCallback<>() { + @Override public void completed(SimpleHttpResponse result) { + handleResponse(metricsUri, consumer, result.getBodyText()); + onDone.complete(true); + } + @Override public void failed(Exception ex) { onDone.completeExceptionally(ex); } + @Override public void cancelled() { onDone.cancel(false); } + }); + return onDone; + } + + void handleResponse(String metricsUri, ConsumerId consumer, String respons) { + var metrics = processAndBuild(GenericJsonUtil.toMetricsPackets(respons), + new ServiceIdDimensionProcessor(), + new ClusterIdDimensionProcessor(), + new PublicDimensionsProcessor(MAX_DIMENSIONS)); + snapshotsRetrieved.incrementAndGet(); + log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri); + snapshots.put(consumer, new Snapshot(Instant.now(clock), metrics)); } private static List<MetricsPacket> processAndBuild(List<MetricsPacket.Builder> builders, diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java index b478f1ce632..09cc355d292 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java @@ -10,7 +10,6 @@ import java.time.Duration; import java.util.Arrays; import static ai.vespa.metricsproxy.TestUtil.getFileContents; -import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MAX_THREADS; import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MAX_TIMEOUT; import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.MIN_TIMEOUT; import static ai.vespa.metricsproxy.http.application.ApplicationMetricsRetriever.timeout; @@ -145,13 +144,13 @@ public class ApplicationMetricsRetrieverTest { @Test public void test_timeout_calculation() { - assertEquals(MIN_TIMEOUT, timeout(1, 1)); - assertEquals(MIN_TIMEOUT, timeout(MAX_THREADS, MAX_THREADS)); + assertEquals(MIN_TIMEOUT, timeout(1)); + assertEquals(MIN_TIMEOUT, timeout(20)); // These values must be updated if the calculation in the timeout method itself is changed. - assertEquals(Duration.ofSeconds(100), timeout(100, MAX_THREADS)); - assertEquals(Duration.ofSeconds(200), timeout(200, MAX_THREADS)); - assertEquals(MAX_TIMEOUT, timeout(240, MAX_THREADS)); + assertEquals(Duration.ofSeconds(100), timeout(100)); + assertEquals(Duration.ofSeconds(200), timeout(200)); + assertEquals(MAX_TIMEOUT, timeout(240)); } private MetricsNodesConfig nodesConfig(String... paths) { diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java index 6644579147f..4bf62a62445 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java @@ -2,12 +2,12 @@ package ai.vespa.metricsproxy.http.application; import ai.vespa.metricsproxy.http.metrics.MetricsV1Handler; +import ai.vespa.metricsproxy.metric.model.ConsumerId; import ai.vespa.metricsproxy.metric.model.MetricsPacket; import com.github.tomakehurst.wiremock.junit.WireMockClassRule; import com.yahoo.test.ManualClock; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -16,6 +16,7 @@ import org.junit.Test; import java.net.URI; import java.time.Duration; import java.util.List; +import java.util.concurrent.ExecutionException; import static ai.vespa.metricsproxy.TestUtil.getFileContents; import static ai.vespa.metricsproxy.http.ValuesFetcher.defaultMetricsConsumerId; @@ -41,7 +42,7 @@ public class NodeMetricsClientTest { private static final String TEST_FILE = "generic-sample.json"; private static final String RESPONSE = getFileContents(TEST_FILE); - private static final CloseableHttpClient httpClient = HttpClients.createDefault(); + private static final CloseableHttpAsyncClient httpClient = ApplicationMetricsRetriever.createHttpClient(); private static final String CPU_METRIC = "cpu.util"; private static final String REPLACED_CPU_METRIC = "replaced_cpu_util"; @@ -51,7 +52,6 @@ public class NodeMetricsClientTest { private static Node node; - private ManualClock clock; private NodeMetricsClient nodeMetricsClient; @ClassRule @@ -78,8 +78,8 @@ public class NodeMetricsClientTest { @Before public void setupClient() { - clock = new ManualClock(); - nodeMetricsClient = new NodeMetricsClient(httpClient, node, clock); + httpClient.start(); + nodeMetricsClient = new NodeMetricsClient(httpClient, node, new ManualClock()); } @Test @@ -88,10 +88,10 @@ public class NodeMetricsClientTest { } @Test - public void metrics_are_retrieved_upon_first_update() { + public void metrics_are_retrieved_upon_first_update() throws InterruptedException, ExecutionException { assertEquals(0, nodeMetricsClient.getMetrics(defaultMetricsConsumerId).size()); assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + updateSnapshot(defaultMetricsConsumerId, TTL); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); List<MetricsPacket> metrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); @@ -101,31 +101,31 @@ public class NodeMetricsClientTest { @Test public void metrics_are_refreshed_on_every_update() { assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + updateSnapshot(defaultMetricsConsumerId, TTL); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO)); + updateSnapshot(defaultMetricsConsumerId, Duration.ZERO); assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); } @Test public void metrics_are_not_refreshed_if_ttl_not_expired() { assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + updateSnapshot(defaultMetricsConsumerId, TTL); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + updateSnapshot(defaultMetricsConsumerId, TTL); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO)); + updateSnapshot(defaultMetricsConsumerId, Duration.ZERO); assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); } @Test public void metrics_for_different_consumers_are_cached_separately() { - assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId,TTL)); + updateSnapshot(defaultMetricsConsumerId, TTL); List<MetricsPacket> defaultMetrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); assertEquals(4, defaultMetrics.size()); - assertTrue(nodeMetricsClient.updateSnapshots(toConsumerId(CUSTOM_CONSUMER), TTL)); + updateSnapshot(toConsumerId(CUSTOM_CONSUMER), TTL); List<MetricsPacket> customMetrics = nodeMetricsClient.getMetrics(toConsumerId(CUSTOM_CONSUMER)); assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); assertEquals(4, customMetrics.size()); @@ -133,4 +133,13 @@ public class NodeMetricsClientTest { MetricsPacket replacedCpuMetric = customMetrics.get(0); assertTrue(replacedCpuMetric.metrics().containsKey(toMetricId(REPLACED_CPU_METRIC))); } + private void updateSnapshot(ConsumerId consumerId, Duration ttl) { + + var optional = nodeMetricsClient.startSnapshotUpdate(consumerId, ttl); + optional.ifPresent(future -> { + try { + assertTrue(future.get()); + } catch (InterruptedException | ExecutionException e) {} + }); + } } |