diff options
7 files changed, 162 insertions, 65 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java index f8a6f47f946..7eb59917f9a 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java @@ -50,6 +50,7 @@ public class ApplicationMetricsHandler extends HttpHandlerBase { super(executor); this.metricsRetriever = metricsRetriever; this.metricsConsumers = metricsConsumers; + metricsRetriever.startPollAnwWait(); } @Override diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java index ad5e01744c5..e5336a58c4e 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java @@ -6,26 +6,32 @@ import ai.vespa.metricsproxy.metric.model.MetricsPacket; import ai.vespa.util.http.hc5.VespaHttpClientBuilder; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.ThreadFactoryFactory; import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.core5.util.Timeout; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import 
java.time.Clock; import java.time.Duration; -import java.util.AbstractMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import static ai.vespa.metricsproxy.http.ValuesFetcher.defaultMetricsConsumerId; -import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toMap; /** * This class retrieves metrics from all nodes in the given config, usually all @@ -33,7 +39,7 @@ import static java.util.stream.Collectors.toMap; * * @author gjoranv */ -public class ApplicationMetricsRetriever extends AbstractComponent { +public class ApplicationMetricsRetriever extends AbstractComponent implements Runnable { private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName()); @@ -43,10 +49,15 @@ public class ApplicationMetricsRetriever extends AbstractComponent { private static final int HTTP_CONNECT_TIMEOUT = 5000; private static final int HTTP_SOCKET_TIMEOUT = 30000; + private static final Duration METRICS_TTL = Duration.ofSeconds(30); private final HttpClient httpClient = createHttpClient(); private final List<NodeMetricsClient> clients; - private final ForkJoinPool forkJoinPool; + private final ExecutorService fetchPool; + private final Thread pollThread; + private final Set<ConsumerId> consumerSet; + private long pollCount = 0; + private boolean stopped; // Non-final for testing private Duration taskTimeout; @@ -56,12 +67,52 @@ public class ApplicationMetricsRetriever extends AbstractComponent { clients = createNodeClients(nodesConfig); int numThreads = Math.min(clients.size(), MAX_THREADS); taskTimeout = timeout(clients.size(), numThreads); - forkJoinPool = new ForkJoinPool(numThreads); + fetchPool = Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("metrics-fetcher")); + stopped = false; + consumerSet = new HashSet<>(); + 
consumerSet.add(defaultMetricsConsumerId); + pollThread = new Thread(this, "metrics-poller"); + pollThread.setDaemon(true); + pollThread.start(); + } + + @Override + public void run() { + try { + while (true) { + ConsumerId [] consumers; + synchronized (pollThread) { + consumers = consumerSet.toArray(new ConsumerId[0]); + } + for (ConsumerId consumer : consumers) { + int numFailed = fetchMetricsAsync(consumer); + if (numFailed > 0 ) { + log.log(Level.WARNING, "Updated metrics for consumer '" + consumer +"' failed for " + numFailed + " services"); + } else { + log.log(Level.FINE, "Updated metrics for consumer '" + consumer +"'."); + } + } + Duration timeUntilNextPoll = Duration.ofMillis(1000); + synchronized (pollThread) { + pollCount++; + pollThread.notifyAll(); + pollThread.wait(timeUntilNextPoll.toMillis()); + if (stopped) return; + } + } + } catch (InterruptedException e) {} } @Override public void deconstruct() { - forkJoinPool.shutdownNow(); + synchronized (pollThread) { + stopped = true; + pollThread.notifyAll(); + } + fetchPool.shutdownNow(); + try { + pollThread.join(); + } catch (InterruptedException e) {} super.deconstruct(); } @@ -70,31 +121,62 @@ public class ApplicationMetricsRetriever extends AbstractComponent { } public Map<Node, List<MetricsPacket>> getMetrics(ConsumerId consumer) { - log.log(Level.FINE, () -> "Retrieving metrics from " + clients.size() + " nodes."); - var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream() - .map(client -> getNodeMetrics(client, consumer)) - .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))); + log.log(Level.INFO, () -> "Retrieving metrics from " + clients.size() + " nodes."); + synchronized (pollThread) { + if (consumerSet.add(consumer)) { + // Wakeup poll thread first time we see a new consumer + pollThread.notifyAll(); + } + } + Map<Node, List<MetricsPacket>> metrics = new HashMap<>(); + for (NodeMetricsClient client : clients) { + metrics.put(client.node, client.getMetrics(consumer)); + } + 
return metrics; + } + void startPollAnwWait() { try { - var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS); - - log.log(Level.FINE, () -> "Finished retrieving metrics from " + clients.size() + " nodes."); - return metricsByNode; + synchronized (pollThread) { + if ( ! pollThread.isAlive()) { + pollThread.start(); + } + long before = pollCount; + pollThread.notifyAll(); + while (pollCount == before) { + pollThread.wait(); + } + } + } catch (InterruptedException e) {} + } - } catch (Exception e) { - forkJoinTask.cancel(true); - // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException - throw new ApplicationMetricsException("Failed retrieving metrics.", e); + private int fetchMetricsAsync(ConsumerId consumer) { + Map<Node, Future<Boolean>> futures = new HashMap<>(); + for (NodeMetricsClient client : clients) { + futures.put(client.node, fetchPool.submit(() -> updateMetrics(client, consumer))); + } + int numOk = 0; + int numTried = futures.size(); + for (Map.Entry<Node, Future<Boolean>> entry : futures.entrySet()) { + try { + Boolean result = entry.getValue().get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS); + if (result != null && result) numOk++; + } catch (InterruptedException | ExecutionException | TimeoutException e) { + // A failed, interrupted or timed-out fetch is logged here and counted as a failure for this node + log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e); + } } + log.log(Level.INFO, () -> "Finished retrieving metrics from " + clients.size() + " nodes."); + return numTried - numOk; } - private Map.Entry<Node, List<MetricsPacket>> getNodeMetrics(NodeMetricsClient client, ConsumerId consumer) { + private boolean updateMetrics(NodeMetricsClient client, ConsumerId consumer) { try { - return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics(consumer)); + return client.updateSnapshots(consumer, METRICS_TTL); } catch (Exception e) { 
log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e); + return false; } - return new AbstractMap.SimpleEntry<>(client.node, emptyList()); } private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) { diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java index c439a037774..c9be238cb2b 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java @@ -55,4 +55,10 @@ public class Node { public int hashCode() { return Objects.hash(role, hostname, port, path); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(role).append(":").append(metricsUriBase); + return sb.toString(); + } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java index 2e17443e821..f67d418f542 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java @@ -17,6 +17,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import static ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor.applyProcessors; @@ -38,7 +39,6 @@ public class NodeMetricsClient { private static final Logger log = Logger.getLogger(NodeMetricsClient.class.getName()); - static final Duration METRICS_TTL = Duration.ofSeconds(30); private static final int MAX_DIMENSIONS = 10; final Node node; @@ -46,7 +46,7 @@ public class NodeMetricsClient { private final Clock clock; 
private final Map<ConsumerId, Snapshot> snapshots = new ConcurrentHashMap<>(); - private long snapshotsRetrieved = 0; + private AtomicLong snapshotsRetrieved = new AtomicLong(); NodeMetricsClient(HttpClient httpClient, Node node, Clock clock) { this.httpClient = httpClient; @@ -54,15 +54,18 @@ public class NodeMetricsClient { this.clock = clock; } - public List<MetricsPacket> getMetrics(ConsumerId consumer) { - var currentSnapshot = snapshots.get(consumer); - if (currentSnapshot == null || currentSnapshot.isStale(clock) || currentSnapshot.metrics.isEmpty()) { - Snapshot snapshot = retrieveMetrics(consumer); - snapshots.put(consumer, snapshot); - return snapshot.metrics; - } else { - return snapshots.get(consumer).metrics; - } + List<MetricsPacket> getMetrics(ConsumerId consumer) { + var snapshot = snapshots.get(consumer); + return (snapshot != null) ? snapshot.metrics : List.of(); + } + + boolean updateSnapshots(ConsumerId consumer, Duration ttl) { + var snapshot = snapshots.get(consumer); + if ((snapshot) != null && clock.instant().isBefore(snapshot.timestamp.plus(ttl))) return true; + + snapshot = retrieveMetrics(consumer); + snapshots.put(consumer, snapshot); + return ! 
snapshot.metrics.isEmpty(); } private Snapshot retrieveMetrics(ConsumerId consumer) { @@ -76,7 +79,7 @@ public class NodeMetricsClient { new ServiceIdDimensionProcessor(), new ClusterIdDimensionProcessor(), new PublicDimensionsProcessor(MAX_DIMENSIONS)); - snapshotsRetrieved ++; + snapshotsRetrieved.incrementAndGet(); log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri); return new Snapshot(Instant.now(clock), metrics); @@ -95,7 +98,7 @@ public class NodeMetricsClient { } long snapshotsRetrieved() { - return snapshotsRetrieved; + return snapshotsRetrieved.get(); } /** @@ -110,10 +113,6 @@ public class NodeMetricsClient { this.timestamp = timestamp; this.metrics = metrics; } - - boolean isStale(Clock clock) { - return Instant.now(clock).isAfter(timestamp.plus(METRICS_TTL)); - } } } diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java index cf1eac3c691..563ecab1346 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java @@ -4,6 +4,7 @@ package ai.vespa.metricsproxy.http.application; import ai.vespa.metricsproxy.core.ConsumersConfig; import ai.vespa.metricsproxy.core.MetricsConsumers; import ai.vespa.metricsproxy.metric.dimensions.PublicDimensions; +import ai.vespa.metricsproxy.metric.model.ConsumerId; import ai.vespa.metricsproxy.metric.model.json.GenericApplicationModel; import ai.vespa.metricsproxy.metric.model.json.GenericJsonModel; import ai.vespa.metricsproxy.metric.model.json.GenericMetrics; @@ -84,6 +85,8 @@ public class ApplicationMetricsHandlerTest { ApplicationMetricsHandler handler = new ApplicationMetricsHandler(Executors.newSingleThreadExecutor(), applicationMetricsRetriever, 
getMetricsConsumers()); + applicationMetricsRetriever.getMetrics(ConsumerId.toConsumerId(CUSTOM_CONSUMER)); + applicationMetricsRetriever.startPollAnwWait(); testDriver = new RequestHandlerTestDriver(handler); } diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java index a74989f7af6..d01001b9e93 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java @@ -1,6 +1,7 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.http.application; +import ai.vespa.metricsproxy.metric.model.MetricsPacket; import com.github.tomakehurst.wiremock.junit.WireMockRule; import org.junit.Before; import org.junit.Rule; @@ -8,6 +9,7 @@ import org.junit.Test; import java.time.Duration; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeoutException; import static ai.vespa.metricsproxy.TestUtil.getFileContents; @@ -56,6 +58,7 @@ public class ApplicationMetricsRetrieverTest { .willReturn(aResponse().withBody(RESPONSE))); ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config); + retriever.startPollAnwWait(); var metricsByNode = retriever.getMetrics(); assertEquals(1, metricsByNode.size()); assertEquals(4, metricsByNode.get(node).size()); @@ -73,6 +76,7 @@ public class ApplicationMetricsRetrieverTest { .willReturn(aResponse().withBody(RESPONSE))); ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config); + retriever.startPollAnwWait(); var metricsByNode = retriever.getMetrics(); assertEquals(2, metricsByNode.size()); assertEquals(4, metricsByNode.get(node0).size()); @@ -101,6 +105,7 @@ public 
class ApplicationMetricsRetrieverTest { .willReturn(aResponse().withBody(RESPONSE))); ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config); + retriever.startPollAnwWait(); var metricsByNode = retriever.getMetrics(); assertEquals(2, metricsByNode.size()); assertEquals(0, metricsByNode.get(node0).size()); @@ -110,7 +115,7 @@ public class ApplicationMetricsRetrieverTest { @Test public void an_exception_is_thrown_when_retrieving_times_out() { var config = nodesConfig("/node0"); - + Node node = new Node(config.node(0)); wireMockRule.stubFor(get(urlPathEqualTo(config.node(0).metricsPath())) .willReturn(aResponse() .withBody(RESPONSE) @@ -118,13 +123,9 @@ public class ApplicationMetricsRetrieverTest { ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config); retriever.setTaskTimeout(Duration.ofMillis(1)); + retriever.startPollAnwWait(); + assertTrue(retriever.getMetrics().get(node).isEmpty()); - try { - retriever.getMetrics(); - fail("Did not get expected exception"); - } catch (ApplicationMetricsException expected) { - assertTrue(expected.getCause() instanceof TimeoutException); - } } @Test @@ -139,12 +140,8 @@ public class ApplicationMetricsRetrieverTest { ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config); retriever.setTaskTimeout(Duration.ofMillis(1)); - try { - retriever.getMetrics(); - fail("Did not get expected exception"); - } catch (ApplicationMetricsException expected) { - } - + retriever.startPollAnwWait(); + assertTrue(retriever.getMetrics().get(node).isEmpty()); // Verify successful retrieving wireMockRule.removeStubMapping(delayedStub); verifyRetrievingMetricsFromSingleNode(config, node); diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java index ab84a4edcde..6644579147f 100644 --- 
a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java @@ -14,6 +14,7 @@ import org.junit.ClassRule; import org.junit.Test; import java.net.URI; +import java.time.Duration; import java.util.List; import static ai.vespa.metricsproxy.TestUtil.getFileContents; @@ -45,6 +46,8 @@ public class NodeMetricsClientTest { private static final String CPU_METRIC = "cpu.util"; private static final String REPLACED_CPU_METRIC = "replaced_cpu_util"; private static final String CUSTOM_CONSUMER = "custom-consumer"; + private static final Duration TTL = Duration.ofSeconds(30); + private static Node node; @@ -85,38 +88,44 @@ public class NodeMetricsClientTest { } @Test - public void metrics_are_retrieved_upon_first_request() { + public void metrics_are_retrieved_upon_first_update() { + assertEquals(0, nodeMetricsClient.getMetrics(defaultMetricsConsumerId).size()); + assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); List<MetricsPacket> metrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); assertEquals(4, metrics.size()); } @Test - public void cached_metrics_are_used_when_ttl_has_not_expired() { - nodeMetricsClient.getMetrics(defaultMetricsConsumerId); - assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); - - clock.advance(NodeMetricsClient.METRICS_TTL.minusMillis(1)); - nodeMetricsClient.getMetrics(defaultMetricsConsumerId); + public void metrics_are_refreshed_on_every_update() { + assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); + 
assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO)); + assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); } @Test - public void metrics_are_refreshed_when_ttl_has_expired() { - nodeMetricsClient.getMetrics(defaultMetricsConsumerId); + public void metrics_are_not_refreshed_if_ttl_not_expired() { + assertEquals(0, nodeMetricsClient.snapshotsRetrieved()); + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); - - clock.advance(NodeMetricsClient.METRICS_TTL.plusMillis(1)); - nodeMetricsClient.getMetrics(defaultMetricsConsumerId); + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL)); + assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO)); assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); } @Test public void metrics_for_different_consumers_are_cached_separately() { + assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId,TTL)); List<MetricsPacket> defaultMetrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId); assertEquals(1, nodeMetricsClient.snapshotsRetrieved()); assertEquals(4, defaultMetrics.size()); + assertTrue(nodeMetricsClient.updateSnapshots(toConsumerId(CUSTOM_CONSUMER), TTL)); List<MetricsPacket> customMetrics = nodeMetricsClient.getMetrics(toConsumerId(CUSTOM_CONSUMER)); assertEquals(2, nodeMetricsClient.snapshotsRetrieved()); assertEquals(4, customMetrics.size()); |