aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java1
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java128
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java6
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java33
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java3
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java23
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java33
7 files changed, 162 insertions, 65 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
index f8a6f47f946..7eb59917f9a 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandler.java
@@ -50,6 +50,7 @@ public class ApplicationMetricsHandler extends HttpHandlerBase {
super(executor);
this.metricsRetriever = metricsRetriever;
this.metricsConsumers = metricsConsumers;
+ metricsRetriever.startPollAnwWait();
}
@Override
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
index ad5e01744c5..e5336a58c4e 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -6,26 +6,32 @@ import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import ai.vespa.util.http.hc5.VespaHttpClientBuilder;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.ThreadFactoryFactory;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.util.Timeout;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.time.Clock;
import java.time.Duration;
-import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import static ai.vespa.metricsproxy.http.ValuesFetcher.defaultMetricsConsumerId;
-import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toMap;
/**
* This class retrieves metrics from all nodes in the given config, usually all
@@ -33,7 +39,7 @@ import static java.util.stream.Collectors.toMap;
*
* @author gjoranv
*/
-public class ApplicationMetricsRetriever extends AbstractComponent {
+public class ApplicationMetricsRetriever extends AbstractComponent implements Runnable {
private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());
@@ -43,10 +49,15 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
private static final int HTTP_CONNECT_TIMEOUT = 5000;
private static final int HTTP_SOCKET_TIMEOUT = 30000;
+ private static final Duration METRICS_TTL = Duration.ofSeconds(30);
private final HttpClient httpClient = createHttpClient();
private final List<NodeMetricsClient> clients;
- private final ForkJoinPool forkJoinPool;
+ private final ExecutorService fetchPool;
+ private final Thread pollThread;
+ private final Set<ConsumerId> consumerSet;
+ private long pollCount = 0;
+ private boolean stopped;
// Non-final for testing
private Duration taskTimeout;
@@ -56,12 +67,52 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
clients = createNodeClients(nodesConfig);
int numThreads = Math.min(clients.size(), MAX_THREADS);
taskTimeout = timeout(clients.size(), numThreads);
- forkJoinPool = new ForkJoinPool(numThreads);
+ fetchPool = Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("metrics-fetcher"));
+ stopped = false;
+ consumerSet = new HashSet<>();
+ consumerSet.add(defaultMetricsConsumerId);
+ pollThread = new Thread(this, "metrics-poller");
+ pollThread.setDaemon(true);
+ pollThread.start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ ConsumerId [] consumers;
+ synchronized (pollThread) {
+ consumers = consumerSet.toArray(new ConsumerId[0]);
+ }
+ for (ConsumerId consumer : consumers) {
+ int numFailed = fetchMetricsAsync(consumer);
+ if (numFailed > 0 ) {
+ log.log(Level.WARNING, "Updated metrics for consumer '" + consumer +"' failed for " + numFailed + " services");
+ } else {
+ log.log(Level.FINE, "Updated metrics for consumer '" + consumer +"'.");
+ }
+ }
+ Duration timeUntilNextPoll = Duration.ofMillis(1000);
+ synchronized (pollThread) {
+ pollCount++;
+ pollThread.notifyAll();
+ pollThread.wait(timeUntilNextPoll.toMillis());
+ if (stopped) return;
+ }
+ }
+ } catch (InterruptedException e) {}
}
@Override
public void deconstruct() {
- forkJoinPool.shutdownNow();
+ synchronized (pollThread) {
+ stopped = true;
+ pollThread.notifyAll();
+ }
+ fetchPool.shutdownNow();
+ try {
+ pollThread.join();
+ } catch (InterruptedException e) {}
super.deconstruct();
}
@@ -70,31 +121,62 @@ public class ApplicationMetricsRetriever extends AbstractComponent {
}
public Map<Node, List<MetricsPacket>> getMetrics(ConsumerId consumer) {
- log.log(Level.FINE, () -> "Retrieving metrics from " + clients.size() + " nodes.");
- var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream()
- .map(client -> getNodeMetrics(client, consumer))
- .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ log.log(Level.INFO, () -> "Retrieving metrics from " + clients.size() + " nodes.");
+ synchronized (pollThread) {
+ if (consumerSet.add(consumer)) {
+ // Wakeup poll thread first time we see a new consumer
+ pollThread.notifyAll();
+ }
+ }
+ Map<Node, List<MetricsPacket>> metrics = new HashMap<>();
+ for (NodeMetricsClient client : clients) {
+ metrics.put(client.node, client.getMetrics(consumer));
+ }
+ return metrics;
+ }
+ void startPollAnwWait() {
try {
- var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
-
- log.log(Level.FINE, () -> "Finished retrieving metrics from " + clients.size() + " nodes.");
- return metricsByNode;
+ synchronized (pollThread) {
+ if ( ! pollThread.isAlive()) {
+ pollThread.start();
+ }
+ long before = pollCount;
+ pollThread.notifyAll();
+ while (pollCount == before) {
+ pollThread.wait();
+ }
+ }
+ } catch (InterruptedException e) {}
+ }
- } catch (Exception e) {
- forkJoinTask.cancel(true);
- // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
- throw new ApplicationMetricsException("Failed retrieving metrics.", e);
+ private int fetchMetricsAsync(ConsumerId consumer) {
+ Map<Node, Future<Boolean>> futures = new HashMap<>();
+ for (NodeMetricsClient client : clients) {
+ futures.put(client.node, fetchPool.submit(() -> updateMetrics(client, consumer)));
+ }
+ int numOk = 0;
+ int numTried = futures.size();
+ for (Map.Entry<Node, Future<Boolean>> entry : futures.entrySet()) {
+ try {
+ Boolean result = entry.getValue().get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ if (result != null && result) numOk++;
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
+ log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e);
+ }
}
+ log.log(Level.INFO, () -> "Finished retrieving metrics from " + clients.size() + " nodes.");
+ return numTried - numOk;
}
- private Map.Entry<Node, List<MetricsPacket>> getNodeMetrics(NodeMetricsClient client, ConsumerId consumer) {
+ private boolean updateMetrics(NodeMetricsClient client, ConsumerId consumer) {
try {
- return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics(consumer));
+ return client.updateSnapshots(consumer, METRICS_TTL);
} catch (Exception e) {
log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e);
+ return false;
}
- return new AbstractMap.SimpleEntry<>(client.node, emptyList());
}
private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) {
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
index c439a037774..c9be238cb2b 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
@@ -55,4 +55,10 @@ public class Node {
public int hashCode() {
return Objects.hash(role, hostname, port, path);
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(role).append(":").append(metricsUriBase);
+ return sb.toString();
+ }
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
index 2e17443e821..f67d418f542 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
@@ -17,6 +17,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import static ai.vespa.metricsproxy.metric.model.processing.MetricsProcessor.applyProcessors;
@@ -38,7 +39,6 @@ public class NodeMetricsClient {
private static final Logger log = Logger.getLogger(NodeMetricsClient.class.getName());
- static final Duration METRICS_TTL = Duration.ofSeconds(30);
private static final int MAX_DIMENSIONS = 10;
final Node node;
@@ -46,7 +46,7 @@ public class NodeMetricsClient {
private final Clock clock;
private final Map<ConsumerId, Snapshot> snapshots = new ConcurrentHashMap<>();
- private long snapshotsRetrieved = 0;
+ private AtomicLong snapshotsRetrieved = new AtomicLong();
NodeMetricsClient(HttpClient httpClient, Node node, Clock clock) {
this.httpClient = httpClient;
@@ -54,15 +54,18 @@ public class NodeMetricsClient {
this.clock = clock;
}
- public List<MetricsPacket> getMetrics(ConsumerId consumer) {
- var currentSnapshot = snapshots.get(consumer);
- if (currentSnapshot == null || currentSnapshot.isStale(clock) || currentSnapshot.metrics.isEmpty()) {
- Snapshot snapshot = retrieveMetrics(consumer);
- snapshots.put(consumer, snapshot);
- return snapshot.metrics;
- } else {
- return snapshots.get(consumer).metrics;
- }
+ List<MetricsPacket> getMetrics(ConsumerId consumer) {
+ var snapshot = snapshots.get(consumer);
+ return (snapshot != null) ? snapshot.metrics : List.of();
+ }
+
+ boolean updateSnapshots(ConsumerId consumer, Duration ttl) {
+ var snapshot = snapshots.get(consumer);
+ if ((snapshot) != null && clock.instant().isBefore(snapshot.timestamp.plus(ttl))) return true;
+
+ snapshot = retrieveMetrics(consumer);
+ snapshots.put(consumer, snapshot);
+ return ! snapshot.metrics.isEmpty();
}
private Snapshot retrieveMetrics(ConsumerId consumer) {
@@ -76,7 +79,7 @@ public class NodeMetricsClient {
new ServiceIdDimensionProcessor(),
new ClusterIdDimensionProcessor(),
new PublicDimensionsProcessor(MAX_DIMENSIONS));
- snapshotsRetrieved ++;
+ snapshotsRetrieved.incrementAndGet();
log.log(FINE, () -> "Successfully retrieved " + metrics.size() + " metrics packets from " + metricsUri);
return new Snapshot(Instant.now(clock), metrics);
@@ -95,7 +98,7 @@ public class NodeMetricsClient {
}
long snapshotsRetrieved() {
- return snapshotsRetrieved;
+ return snapshotsRetrieved.get();
}
/**
@@ -110,10 +113,6 @@ public class NodeMetricsClient {
this.timestamp = timestamp;
this.metrics = metrics;
}
-
- boolean isStale(Clock clock) {
- return Instant.now(clock).isAfter(timestamp.plus(METRICS_TTL));
- }
}
}
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java
index cf1eac3c691..563ecab1346 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsHandlerTest.java
@@ -4,6 +4,7 @@ package ai.vespa.metricsproxy.http.application;
import ai.vespa.metricsproxy.core.ConsumersConfig;
import ai.vespa.metricsproxy.core.MetricsConsumers;
import ai.vespa.metricsproxy.metric.dimensions.PublicDimensions;
+import ai.vespa.metricsproxy.metric.model.ConsumerId;
import ai.vespa.metricsproxy.metric.model.json.GenericApplicationModel;
import ai.vespa.metricsproxy.metric.model.json.GenericJsonModel;
import ai.vespa.metricsproxy.metric.model.json.GenericMetrics;
@@ -84,6 +85,8 @@ public class ApplicationMetricsHandlerTest {
ApplicationMetricsHandler handler = new ApplicationMetricsHandler(Executors.newSingleThreadExecutor(),
applicationMetricsRetriever,
getMetricsConsumers());
+ applicationMetricsRetriever.getMetrics(ConsumerId.toConsumerId(CUSTOM_CONSUMER));
+ applicationMetricsRetriever.startPollAnwWait();
testDriver = new RequestHandlerTestDriver(handler);
}
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
index a74989f7af6..d01001b9e93 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
@@ -1,6 +1,7 @@
// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.metricsproxy.http.application;
+import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.junit.Before;
import org.junit.Rule;
@@ -8,6 +9,7 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import static ai.vespa.metricsproxy.TestUtil.getFileContents;
@@ -56,6 +58,7 @@ public class ApplicationMetricsRetrieverTest {
.willReturn(aResponse().withBody(RESPONSE)));
ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ retriever.startPollAnwWait();
var metricsByNode = retriever.getMetrics();
assertEquals(1, metricsByNode.size());
assertEquals(4, metricsByNode.get(node).size());
@@ -73,6 +76,7 @@ public class ApplicationMetricsRetrieverTest {
.willReturn(aResponse().withBody(RESPONSE)));
ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ retriever.startPollAnwWait();
var metricsByNode = retriever.getMetrics();
assertEquals(2, metricsByNode.size());
assertEquals(4, metricsByNode.get(node0).size());
@@ -101,6 +105,7 @@ public class ApplicationMetricsRetrieverTest {
.willReturn(aResponse().withBody(RESPONSE)));
ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ retriever.startPollAnwWait();
var metricsByNode = retriever.getMetrics();
assertEquals(2, metricsByNode.size());
assertEquals(0, metricsByNode.get(node0).size());
@@ -110,7 +115,7 @@ public class ApplicationMetricsRetrieverTest {
@Test
public void an_exception_is_thrown_when_retrieving_times_out() {
var config = nodesConfig("/node0");
-
+ Node node = new Node(config.node(0));
wireMockRule.stubFor(get(urlPathEqualTo(config.node(0).metricsPath()))
.willReturn(aResponse()
.withBody(RESPONSE)
@@ -118,13 +123,9 @@ public class ApplicationMetricsRetrieverTest {
ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
retriever.setTaskTimeout(Duration.ofMillis(1));
+ retriever.startPollAnwWait();
+ assertTrue(retriever.getMetrics().get(node).isEmpty());
- try {
- retriever.getMetrics();
- fail("Did not get expected exception");
- } catch (ApplicationMetricsException expected) {
- assertTrue(expected.getCause() instanceof TimeoutException);
- }
}
@Test
@@ -139,12 +140,8 @@ public class ApplicationMetricsRetrieverTest {
ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
retriever.setTaskTimeout(Duration.ofMillis(1));
- try {
- retriever.getMetrics();
- fail("Did not get expected exception");
- } catch (ApplicationMetricsException expected) {
- }
-
+ retriever.startPollAnwWait();
+ assertTrue(retriever.getMetrics().get(node).isEmpty());
// Verify successful retrieving
wireMockRule.removeStubMapping(delayedStub);
verifyRetrievingMetricsFromSingleNode(config, node);
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
index ab84a4edcde..6644579147f 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/NodeMetricsClientTest.java
@@ -14,6 +14,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import java.net.URI;
+import java.time.Duration;
import java.util.List;
import static ai.vespa.metricsproxy.TestUtil.getFileContents;
@@ -45,6 +46,8 @@ public class NodeMetricsClientTest {
private static final String CPU_METRIC = "cpu.util";
private static final String REPLACED_CPU_METRIC = "replaced_cpu_util";
private static final String CUSTOM_CONSUMER = "custom-consumer";
+ private static final Duration TTL = Duration.ofSeconds(30);
+
private static Node node;
@@ -85,38 +88,44 @@ public class NodeMetricsClientTest {
}
@Test
- public void metrics_are_retrieved_upon_first_request() {
+ public void metrics_are_retrieved_upon_first_update() {
+ assertEquals(0, nodeMetricsClient.getMetrics(defaultMetricsConsumerId).size());
+ assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
List<MetricsPacket> metrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
assertEquals(4, metrics.size());
}
@Test
- public void cached_metrics_are_used_when_ttl_has_not_expired() {
- nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
- assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
-
- clock.advance(NodeMetricsClient.METRICS_TTL.minusMillis(1));
- nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
+ public void metrics_are_refreshed_on_every_update() {
+ assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO));
+ assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
}
@Test
- public void metrics_are_refreshed_when_ttl_has_expired() {
- nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
+ public void metrics_are_not_refreshed_if_ttl_not_expired() {
+ assertEquals(0, nodeMetricsClient.snapshotsRetrieved());
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
-
- clock.advance(NodeMetricsClient.METRICS_TTL.plusMillis(1));
- nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, TTL));
+ assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId, Duration.ZERO));
assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
}
@Test
public void metrics_for_different_consumers_are_cached_separately() {
+ assertTrue(nodeMetricsClient.updateSnapshots(defaultMetricsConsumerId,TTL));
List<MetricsPacket> defaultMetrics = nodeMetricsClient.getMetrics(defaultMetricsConsumerId);
assertEquals(1, nodeMetricsClient.snapshotsRetrieved());
assertEquals(4, defaultMetrics.size());
+ assertTrue(nodeMetricsClient.updateSnapshots(toConsumerId(CUSTOM_CONSUMER), TTL));
List<MetricsPacket> customMetrics = nodeMetricsClient.getMetrics(toConsumerId(CUSTOM_CONSUMER));
assertEquals(2, nodeMetricsClient.snapshotsRetrieved());
assertEquals(4, customMetrics.size());