aboutsummaryrefslogtreecommitdiffstats
path: root/metrics-proxy
diff options
context:
space:
mode:
authorgjoranv <gv@verizonmedia.com>2019-12-17 17:50:42 +0100
committergjoranv <gv@verizonmedia.com>2019-12-19 12:05:04 +0100
commitc11e86632533e4e35d393364c7c7e7743566a1fa (patch)
tree3fa14d5f78d191fe6c6297638120757582db73ab /metrics-proxy
parent7a915cf61dd3e190a1eb5502715759c9c6e200ff (diff)
Add retriever for metrics from all application nodes.
Diffstat (limited to 'metrics-proxy')
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java16
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java124
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java19
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java2
-rw-r--r--metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def7
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java169
6 files changed, 336 insertions, 1 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java
new file mode 100644
index 00000000000..087959d4c73
--- /dev/null
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+/**
+ * @author gjoranv
+ */
+class ApplicationMetricsException extends RuntimeException {
+
+ ApplicationMetricsException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
new file mode 100644
index 00000000000..332c070943e
--- /dev/null
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+import ai.vespa.metricsproxy.metric.model.MetricsPacket;
+import ai.vespa.util.http.VespaHttpClientBuilder;
+import com.google.inject.Inject;
+import com.yahoo.component.AbstractComponent;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * This class retrieves metrics from all nodes in the given config, usually all
+ * nodes in a Vespa application.
+ *
+ * @author gjoranv
+ */
+public class ApplicationMetricsRetriever extends AbstractComponent {
+ private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());
+
+ private static final int PARALLELISM = 20;
+ static final Duration MIN_TIMEOUT = Duration.ofSeconds(60);
+ private static final Duration MAX_TIMEOUT = Duration.ofSeconds(240);
+
+ private static final int HTTP_CONNECT_TIMEOUT = 5000;
+ private static final int HTTP_SOCKET_TIMEOUT = 30000;
+
+ private final HttpClient httpClient = createHttpClient();
+ private final List<NodeMetricsClient> clients;
+
+ // Non-final for testing
+ private Duration taskTimeout;
+
+ private ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM);
+
+ @Inject
+ public ApplicationMetricsRetriever(VespaNodesConfig nodesConfig) {
+ clients = createNodeClients(nodesConfig);
+ taskTimeout = timeout(clients.size());
+ }
+
+ @Override
+ public void deconstruct() {
+ forkJoinPool.shutdownNow();
+ super.deconstruct();
+ }
+
+ public Map<Node, List<MetricsPacket.Builder>> getMetrics() {
+ log.info(() -> "Retrieving metrics from " + clients.size() + " nodes.");
+ var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream()
+ .map(this::getNodeMetrics)
+ .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
+
+ try {
+ var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+ log.info(() -> "Successfully retrieved metrics from " + clients.size() + " nodes.");
+ return metricsByNode;
+
+ } catch (Exception e) {
+ // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
+ forkJoinPool.shutdownNow();
+ forkJoinPool = new ForkJoinPool(PARALLELISM);
+ throw new ApplicationMetricsException("Failed retrieving metrics.", e);
+ }
+ }
+
+ private Map.Entry<Node, List<MetricsPacket.Builder>> getNodeMetrics(NodeMetricsClient client) {
+ try {
+ return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics());
+ } catch (Exception e) {
+ log.warning("Could not retrieve metrics from " + client.node.metricsUri);
+ }
+ return new AbstractMap.SimpleEntry<>(client.node, emptyList());
+ }
+
+ private List<NodeMetricsClient> createNodeClients(VespaNodesConfig nodesConfig) {
+ var clients = new ArrayList<NodeMetricsClient>();
+ for (var nc : nodesConfig.node()) {
+ var node = new Node(nc.configId(), nc.hostname(), nc.port(), nc.path());
+ var client = new NodeMetricsClient(httpClient, node, Clock.systemUTC());
+ clients.add(client);
+ }
+ return clients;
+ }
+
+ private static CloseableHttpClient createHttpClient() {
+ return VespaHttpClientBuilder.create(PoolingHttpClientConnectionManager::new)
+ .setUserAgent("application-metrics-retriever")
+ .setDefaultRequestConfig(RequestConfig.custom()
+ .setConnectTimeout(HTTP_CONNECT_TIMEOUT)
+ .setSocketTimeout(HTTP_SOCKET_TIMEOUT)
+ .build())
+ .build();
+ }
+
+ private static Duration timeout(int clients) {
+ Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), 20 * clients / PARALLELISM));
+ return timeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : timeout;
+ }
+
+ // For testing only!
+ void setTaskTimeout(Duration taskTimeout) {
+ this.taskTimeout = taskTimeout;
+ }
+
+}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
index 06bb2e890ea..c22f81b42d5 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
@@ -5,6 +5,7 @@
package ai.vespa.metricsproxy.http.application;
import java.net.URI;
+import java.util.Objects;
/**
* Represents a node to retrieve metrics from.
@@ -20,6 +21,10 @@ public class Node {
final URI metricsUri;
+ public Node(VespaNodesConfig.Node nodeConfig) {
+ this(nodeConfig.configId(), nodeConfig.hostname(), nodeConfig.port() ,nodeConfig.path());
+ }
+
public Node(String configId, String host, int port, String path) {
this.configId = configId;
this.host = host;
@@ -32,4 +37,18 @@ public class Node {
return URI.create("http://" + host + ":" + port + path);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Node node = (Node) o;
+ return port == node.port &&
+ configId.equals(node.configId) &&
+ host.equals(node.host);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configId, host, port);
+ }
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
index 34ee38d44f2..1584a33d772 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
@@ -33,8 +33,8 @@ public class NodeMetricsClient {
static final Duration METRICS_TTL = Duration.ofSeconds(30);
+ final Node node;
private final HttpClient httpClient;
- private final Node node;
private final Clock clock;
private List<MetricsPacket.Builder> metrics = emptyList();
diff --git a/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def b/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def
new file mode 100644
index 00000000000..0114e1ecfd2
--- /dev/null
+++ b/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def
@@ -0,0 +1,7 @@
+# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package=ai.vespa.metricsproxy.http.application
+
+node[].configId string
+node[].hostname string
+node[].port int
+node[].path string
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
new file mode 100644
index 00000000000..e2bd5c84dc0
--- /dev/null
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import static ai.vespa.metricsproxy.TestUtil.getFileContents;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author gjoranv
+ */
+public class ApplicationMetricsRetrieverTest {
+
+ private static final String TEST_FILE = "generic-sample.json";
+ private static final String RESPONSE = getFileContents(TEST_FILE);
+ private static final String HOST = "localhost";
+
+ private static int port;
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort(), false);
+
+ @Before
+ public void setup() {
+ port = wireMockRule.port();
+ }
+
+ @Test
+ public void metrics_can_be_retrieved() {
+ var config = nodesConfig("/node0");
+ Node node = new Node(config.node(0));
+
+ verifyRetrievingMetricsFromSingleNode(config, node);
+ }
+
+ private void verifyRetrievingMetricsFromSingleNode(VespaNodesConfig config, Node node) {
+ wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+ .willReturn(aResponse().withBody(RESPONSE)));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ var metricsByNode = retriever.getMetrics();
+ assertEquals(1, metricsByNode.size());
+ assertEquals(4, metricsByNode.get(node).size());
+ }
+
+ @Test
+ public void metrics_can_be_retrieved_from_multiple_nodes() {
+ var config = nodesConfig("/node0", "/node1");
+ Node node0 = new Node(config.node(0));
+ Node node1 = new Node(config.node(1));
+
+ wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+ .willReturn(aResponse().withBody(RESPONSE)));
+ wireMockRule.stubFor(get(urlEqualTo(config.node(1).path()))
+ .willReturn(aResponse().withBody(RESPONSE)));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ var metricsByNode = retriever.getMetrics();
+ assertEquals(2, metricsByNode.size());
+ assertEquals(4, metricsByNode.get(node0).size());
+ assertEquals(4, metricsByNode.get(node1).size());
+ }
+
+ @Test
+ public void dead_node_yields_empty_metrics() {
+ var config = nodesConfig("/non-existent");
+ Node node = new Node(config.node(0));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ var metricsByNode = retriever.getMetrics();
+ assertEquals(1, metricsByNode.size());
+ assertEquals(0, metricsByNode.get(node).size());
+
+ }
+
+ @Test
+ public void metrics_from_good_node_are_returned_even_if_another_node_is_dead() {
+ var config = nodesConfig("/node0", "/node1");
+ Node node0 = new Node(config.node(0));
+ Node node1 = new Node(config.node(1));
+
+ wireMockRule.stubFor(get(urlEqualTo(config.node(1).path()))
+ .willReturn(aResponse().withBody(RESPONSE)));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ var metricsByNode = retriever.getMetrics();
+ assertEquals(2, metricsByNode.size());
+ assertEquals(0, metricsByNode.get(node0).size());
+ assertEquals(4, metricsByNode.get(node1).size());
+ }
+
+ @Test
+ public void an_exception_is_thrown_when_retrieving_times_out() {
+ var config = nodesConfig("/node0");
+
+ wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+ .willReturn(aResponse()
+ .withBody(RESPONSE)
+ .withFixedDelay(10)));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ retriever.setTaskTimeout(Duration.ofMillis(1));
+
+ try {
+ retriever.getMetrics();
+ fail("Did not get expected exception");
+ } catch (ApplicationMetricsException expected) {
+ assertTrue(expected.getCause() instanceof TimeoutException);
+ }
+ }
+
+ @Test
+ public void metrics_can_be_retrieved_after_previous_call_threw_an_exception() {
+ var config = nodesConfig("/node0");
+ Node node = new Node(config.node(0));
+
+ var delayedStub = wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+ .willReturn(aResponse()
+ .withBody(RESPONSE)
+ .withFixedDelay(10)));
+
+ ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+ retriever.setTaskTimeout(Duration.ofMillis(1));
+ try {
+ retriever.getMetrics();
+ fail("Did not get expected exception");
+ } catch (ApplicationMetricsException expected) {
+ }
+
+ // Verify successful retrieving
+ wireMockRule.removeStubMapping(delayedStub);
+ verifyRetrievingMetricsFromSingleNode(config, node);
+ }
+
+ private VespaNodesConfig nodesConfig(String... paths) {
+ var nodes = Arrays.stream(paths)
+ .map(this::nodeConfig)
+ .collect(toList());
+ return new VespaNodesConfig.Builder()
+ .node(nodes)
+ .build();
+ }
+
+ private VespaNodesConfig.Node.Builder nodeConfig(String path) {
+ return new VespaNodesConfig.Node.Builder()
+ .configId(path)
+ .hostname(HOST)
+ .path(path)
+ .port(port);
+ }
+
+}