aboutsummaryrefslogtreecommitdiffstats
path: root/metrics-proxy/src/main/java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-09-15 09:40:48 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2021-09-15 10:25:55 +0200
commitba51ebe257629167032958c653eb955a71d07be4 (patch)
treece971383b87ae166de062c234474a3cf7d6ed808 /metrics-proxy/src/main/java
parent4f0f0b9cf51b747ac5d3dc2c75564130fb25ec7e (diff)
In order to avoid possible deadlock when calling yourself for metrics when TTL expires, we only fetch metrics in the background.
If it is present use it, if not it will be there next time.
Diffstat (limited to 'metrics-proxy/src/main/java')
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java1
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java128
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java6
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java33
4 files changed, 128 insertions, 40 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
index f8a6f47f946..7eb59917f9a 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
@@ -50,6 +50,7 @@ public class ApplicationMetricsHandler extends HttpHandlerBase {
super(executor);
this.metricsRetriever = metricsRetriever;
this.metricsConsumers = metricsConsumers;
+ metricsRetriever.startPollAnwWait();
}
@Override
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
index ad5e01744c5..e5336a58c4e 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -6,26 +6,32 @@ import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import ai.vespa.util.http.hc5.VespaHttpClientBuilder;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.ThreadFactoryFactory;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.util.Timeout;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.time.Clock;
import java.time.Duration;
-import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import static ai.vespa.metricsproxy.http.ValuesFetcher.defaultMetricsConsumerId;
-import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toMap;
/**
* This class retrieves metrics from all nodes in the given config, usually all
@@ -33,7 +39,7 @@ import static java.util.stream.Collectors.toMap;
*
* @author gjoranv
*/
-public class ApplicationMetricsRetriever extends AbstractComponent {
+public class ApplicationMetricsRetriever extends AbstractComponent implements Runnable {
private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());
@@ -43,10 +49,15 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
private static final int HTTP_CONNECT_TIMEOUT = 5000;
private static final int HTTP_SOCKET_TIMEOUT = 30000;
+ private static final Duration METRICS_TTL = Duration.ofSeconds(30);
private final HttpClient httpClient = createHttpClient();
private final List<NodeMetricsClient> clients;
- private final ForkJoinPool forkJoinPool;
+ private final ExecutorService fetchPool;
+ private final Thread pollThread;
+ private final Set<ConsumerId> consumerSet;
+ private long pollCount = 0;
+ private boolean stopped;
// Non-final for testing
private Duration taskTimeout;
@@ -56,12 +67,52 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
clients = createNodeClients(nodesConfig);
int numThreads = Math.min(clients.size(), MAX_THREADS);
taskTimeout = timeout(clients.size(), numThreads);
- forkJoinPool = new ForkJoinPool(numThreads);
+ fetchPool = Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("metrics-fetcher"));
+ stopped = false;
+ consumerSet = new HashSet<>();
+ consumerSet.add(defaultMetricsConsumerId);
+ pollThread = new Thread(this, "metrics-poller");
+ pollThread.setDaemon(true);
+ pollThread.start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ ConsumerId [] consumers;
+ synchronized (pollThread) {
+ consumers = consumerSet.toArray(new ConsumerId[0]);
+ }
+ for (ConsumerId consumer : consumers) {
+ int numFailed = fetchMetricsAsync(consumer);
+ if (numFailed > 0 ) {
+ log.log(Level.WARNING, "Updated metrics for consumer '" + consumer +"' failed for " + numFailed + " services");
+ } else {
+ log.log(Level.FINE, "Updated metrics for consumer '" + consumer +"'.");
+ }
+ }
+ Duration timeUntilNextPoll = Duration.ofMillis(1000);
+ synchronized (pollThread) {
+ pollCount++;
+ pollThread.notifyAll();
+ pollThread.wait(timeUntilNextPoll.toMillis());
+ if (stopped) return;
+ }
+ }
+ } catch (InterruptedException e) {}
}
@Override
public void deconstruct() {
- forkJoinPool.shutdownNow();
+ synchronized (pollThread) {
+ stopped = true;
+ pollThread.notifyAll();
+ }
+ fetchPool.shutdownNow();
+ try {
+ pollThread.join();
+ } catch (InterruptedException e) {}
super.deconstruct();
}
@@ -70,31 +121,62 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
}
public Map<Node, List<MetricsPacket>> getMetrics(ConsumerId consumer) {
- log.log(Level.FINE, () -> "Retrieving metrics from " + clients.size() + " nodes.");
- var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream()
- .map(client -> getNodeMetrics(client, consumer))
- .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ log.log(Level.INFO, () -> "Retrieving metrics from " + clients.size() + " nodes.");
+ synchronized (pollThread) {
+ if (consumerSet.add(consumer)) {
+ // Wakeup poll thread first time we see a new consumer
+ pollThread.notifyAll();
+ }
+ }
+ Map<Node, List<MetricsPacket>> metrics = new HashMap<>();
+ for (NodeMetricsClient client : clients) {
+ metrics.put(client.node, client.getMetrics(consumer));
+ }
+ return metrics;
+ }
+ void startPollAnwWait() {
try {
- var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
-
- log.log(Level.FINE, () -> "Finished retrieving metrics from " + clients.size() + " nodes.");
- return metricsByNode;
+ synchronized (pollThread) {
+ if ( ! pollThread.isAlive()) {
+ pollThread.start();
+ }
+ long before = pollCount;
+ pollThread.notifyAll();
+ while (pollCount == before) {
+ pollThread.wait();
+ }
+ }
+ } catch (InterruptedException e) {}
+ }
- } catch (Exception e) {
- forkJoinTask.cancel(true);
- // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
- throw new ApplicationMetricsException("Failed retrieving metrics.", e);
+ private int fetchMetricsAsync(ConsumerId consumer) {
+ Map<Node, Future<Boolean>> futures = new HashMap<>();
+ for (NodeMetricsClient client : clients) {
+ futures.put(client.node, fetchPool.submit(() -> updateMetrics(client, consumer)));
+ }
+ int numOk = 0;
+ int numTried = futures.size();
+ for (Map.Entry<Node, Future<Boolean>> entry : futures.entrySet()) {
+ try {
+ Boolean result = entry.getValue().get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ if (result != null && result) numOk++;
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
+ log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e);
+ }
}
+ log.log(Level.INFO, () -> "Finished retrieving metrics from " + clients.size() + " nodes.");
+ return numTried - numOk;
}
- private Map.Entry<Node, List<MetricsPacket>> getNodeMetrics(NodeMetricsClient client, ConsumerId consumer) {
+ private boolean updateMetrics(NodeMetricsClient client, ConsumerId consumer) {
try {
- return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics(consumer));
+ return client.updateSnapshots(consumer, METRICS_TTL);
} catch (Exception e) {
log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e);
+ return false;
}
- return new AbstractMap.SimpleEntry<>(client.node, emptyList());
}
private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) {
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
index c439a037774..c9be238cb2b 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
@@ -55,4 +55,10 @@ public class Node {
public int hashCode() {
return Objects.hash(role, hostname, port, path);
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(role).append(":").append(metricsUriBase);
+ return sb.toString();
+ }
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
index 2e17443e821..f67d418f542 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
@@ -17,6 +17,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import static ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor.applyProcessors;
@@ -38,7 +39,6 @@ public class NodeMetricsClient {
private static final Logger log = Logger.getLogger(NodeMetricsClient.class.getName());
- static final Duration METRICS_TTL = Duration.ofSeconds(30);
private static final int MAX_DIMENSIONS = 10;
final Node node;
@@ -46,7 +46,7 @@ public class NodeMetricsClient {
private final Clock clock;
private final Map<ConsumerId, Snapshot> snapshots = new ConcurrentHashMap<>();
- private long snapshotsRetrieved = 0;
+ private AtomicLong snapshotsRetrieved = new AtomicLong();
NodeMetricsClient(HttpClient httpClient, Node node, Clock clock) {
this.httpClient = httpClient;
@@ -54,15 +54,18 @@ public class NodeMetricsClient {
this.clock = clock;
}
- public List<MetricsPacket> getMetrics(ConsumerId consumer) {
- var currentSnapshot = snapshots.get(consumer);
- if (currentSnapshot == null || currentSnapshot.isStale(clock) || currentSnapshot.metrics.isEmpty()) {
- Snapshot snapshot = retrieveMetrics(consumer);
- snapshots.put(consumer, snapshot);
- return snapshot.metrics;
- } else {
- return snapshots.get(consumer).metrics;
- }
+ List<MetricsPacket> getMetrics(ConsumerId consumer) {
+ var snapshot = snapshots.get(consumer);
+ return (snapshot != null) ? snapshot.metrics : List.of();
+ }
+
+ boolean updateSnapshots(ConsumerId consumer, Duration ttl) {
+ var snapshot = snapshots.get(consumer);
+ if ((snapshot) != null && clock.instant().isBefore(snapshot.timestamp.plus(ttl))) return true;
+
+ snapshot = retrieveMetrics(consumer);
+ snapshots.put(consumer, snapshot);
+ return ! snapshot.metrics.isEmpty();
}
private Snapshot retrieveMetrics(ConsumerId consumer) {
@@ -76,7 +79,7 @@ public class NodeMetricsClient {
new ServiceIdDimensionProcessor(),
new ClusterIdDimensionProcessor(),
new PublicDimensionsProcessor(MAX_DIMENSIONS));
- snapshotsRetrieved ++;
+ snapshotsRetrieved.incrementAndGet();
log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri);
return new Snapshot(Instant.now(clock), metrics);
@@ -95,7 +98,7 @@ public class NodeMetricsClient {
}
long snapshotsRetrieved() {
- return snapshotsRetrieved;
+ return snapshotsRetrieved.get();
}
/**
@@ -110,10 +113,6 @@ public class NodeMetricsClient {
this.timestamp = timestamp;
this.metrics = metrics;
}
-
- boolean isStale(Clock clock) {
- return Instant.now(clock).isAfter(timestamp.plus(METRICS_TTL));
- }
}
}