diff options
author | gjoranv <gv@verizonmedia.com> | 2019-12-17 17:50:42 +0100 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2019-12-19 12:05:04 +0100 |
commit | c11e86632533e4e35d393364c7c7e7743566a1fa (patch) | |
tree | 3fa14d5f78d191fe6c6297638120757582db73ab /metrics-proxy | |
parent | 7a915cf61dd3e190a1eb5502715759c9c6e200ff (diff) |
Add retriever for metrics from all application nodes.
Diffstat (limited to 'metrics-proxy')
6 files changed, 336 insertions, 1 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java
new file mode 100644
index 00000000000..087959d4c73
--- /dev/null
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsException.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+/**
+ * @author gjoranv
+ */
+class ApplicationMetricsException extends RuntimeException {
+
+    ApplicationMetricsException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
new file mode 100644
index 00000000000..332c070943e
--- /dev/null
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+import ai.vespa.metricsproxy.metric.model.MetricsPacket;
+import ai.vespa.util.http.VespaHttpClientBuilder;
+import com.google.inject.Inject;
+import com.yahoo.component.AbstractComponent;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * This class retrieves metrics from all nodes in the given config, usually all
+ * nodes in a Vespa application.
+ *
+ * @author gjoranv
+ */
+public class ApplicationMetricsRetriever extends AbstractComponent {
+    private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());
+
+    private static final int PARALLELISM = 20;
+    static final Duration MIN_TIMEOUT = Duration.ofSeconds(60);
+    private static final Duration MAX_TIMEOUT = Duration.ofSeconds(240);
+
+    private static final int HTTP_CONNECT_TIMEOUT = 5000;
+    private static final int HTTP_SOCKET_TIMEOUT = 30000;
+
+    private final HttpClient httpClient = createHttpClient();
+    private final List<NodeMetricsClient> clients;
+
+    // Non-final for testing
+    private Duration taskTimeout;
+
+    private ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM);
+
+    @Inject
+    public ApplicationMetricsRetriever(VespaNodesConfig nodesConfig) {
+        clients = createNodeClients(nodesConfig);
+        taskTimeout = timeout(clients.size());
+    }
+
+    @Override
+    public void deconstruct() {
+        forkJoinPool.shutdownNow();
+        super.deconstruct();
+    }
+
+    public Map<Node, List<MetricsPacket.Builder>> getMetrics() {
+        log.info(() -> "Retrieving metrics from " + clients.size() + " nodes.");
+        var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream()
+                .map(this::getNodeMetrics)
+                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
+
+        try {
+            var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            log.info(() -> "Successfully retrieved metrics from " + clients.size() + " nodes.");
+            return metricsByNode;
+
+        } catch (Exception e) {
+            // Since the task is a ForkJoinTask, we don't need special handling of InterruptedException
+            forkJoinPool.shutdownNow();
+            forkJoinPool = new ForkJoinPool(PARALLELISM);
+            throw new ApplicationMetricsException("Failed retrieving metrics.", e);
+        }
+    }
+
+    private Map.Entry<Node, List<MetricsPacket.Builder>> getNodeMetrics(NodeMetricsClient client) {
+        try {
+            return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics());
+        } catch (Exception e) {
+            log.warning("Could not retrieve metrics from " + client.node.metricsUri);
+        }
+        return new AbstractMap.SimpleEntry<>(client.node, emptyList());
+    }
+
+    private List<NodeMetricsClient> createNodeClients(VespaNodesConfig nodesConfig) {
+        var clients = new ArrayList<NodeMetricsClient>();
+        for (var nc : nodesConfig.node()) {
+            var node = new Node(nc.configId(), nc.hostname(), nc.port(), nc.path());
+            var client = new NodeMetricsClient(httpClient, node, Clock.systemUTC());
+            clients.add(client);
+        }
+        return clients;
+    }
+
+    private static CloseableHttpClient createHttpClient() {
+        return VespaHttpClientBuilder.create(PoolingHttpClientConnectionManager::new)
+                .setUserAgent("application-metrics-retriever")
+                .setDefaultRequestConfig(RequestConfig.custom()
+                                                 .setConnectTimeout(HTTP_CONNECT_TIMEOUT)
+                                                 .setSocketTimeout(HTTP_SOCKET_TIMEOUT)
+                                                 .build())
+                .build();
+    }
+
+    private static Duration timeout(int clients) {
+        Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), 20 * clients / PARALLELISM));
+        return timeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : timeout;
+    }
+
+    // For testing only!
+    void setTaskTimeout(Duration taskTimeout) {
+        this.taskTimeout = taskTimeout;
+    }
+
+}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
index 06bb2e890ea..c22f81b42d5 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/Node.java
@@ -5,6 +5,7 @@
 package ai.vespa.metricsproxy.http.application;
 
 import java.net.URI;
+import java.util.Objects;
 
 /**
  * Represents a node to retrieve metrics from.
@@ -20,6 +21,10 @@ public class Node {
 
     final URI metricsUri;
 
+    public Node(VespaNodesConfig.Node nodeConfig) {
+        this(nodeConfig.configId(), nodeConfig.hostname(), nodeConfig.port() ,nodeConfig.path());
+    }
+
     public Node(String configId, String host, int port, String path) {
         this.configId = configId;
         this.host = host;
@@ -32,4 +37,18 @@ public class Node {
         return URI.create("http://" + host + ":" + port + path);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Node node = (Node) o;
+        return port == node.port &&
+                configId.equals(node.configId) &&
+                host.equals(node.host);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(configId, host, port);
+    }
 }
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
index 34ee38d44f2..1584a33d772 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/NodeMetricsClient.java
@@ -33,8 +33,8 @@ public class NodeMetricsClient {
 
     static final Duration METRICS_TTL = Duration.ofSeconds(30);
 
+    final Node node;
     private final HttpClient httpClient;
-    private final Node node;
     private final Clock clock;
 
     private List<MetricsPacket.Builder> metrics = emptyList();
diff --git a/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def b/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def
new file mode 100644
index 00000000000..0114e1ecfd2
--- /dev/null
+++ b/metrics-proxy/src/main/resources/configdefinitions/vespa-nodes.def
@@ -0,0 +1,7 @@
+# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package=ai.vespa.metricsproxy.http.application
+
+node[].configId string
+node[].hostname string
+node[].port int
+node[].path string
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
new file mode 100644
index 00000000000..e2bd5c84dc0
--- /dev/null
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetrieverTest.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ */
+
+package ai.vespa.metricsproxy.http.application;
+
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import static ai.vespa.metricsproxy.TestUtil.getFileContents;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author gjoranv
+ */
+public class ApplicationMetricsRetrieverTest {
+
+    private static final String TEST_FILE = "generic-sample.json";
+    private static final String RESPONSE = getFileContents(TEST_FILE);
+    private static final String HOST = "localhost";
+
+    private static int port;
+
+    @Rule
+    public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort(), false);
+
+    @Before
+    public void setup() {
+        port = wireMockRule.port();
+    }
+
+    @Test
+    public void metrics_can_be_retrieved() {
+        var config = nodesConfig("/node0");
+        Node node = new Node(config.node(0));
+
+        verifyRetrievingMetricsFromSingleNode(config, node);
+    }
+
+    private void verifyRetrievingMetricsFromSingleNode(VespaNodesConfig config, Node node) {
+        wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+                                     .willReturn(aResponse().withBody(RESPONSE)));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        var metricsByNode = retriever.getMetrics();
+        assertEquals(1, metricsByNode.size());
+        assertEquals(4, metricsByNode.get(node).size());
+    }
+
+    @Test
+    public void metrics_can_be_retrieved_from_multiple_nodes() {
+        var config = nodesConfig("/node0", "/node1");
+        Node node0 = new Node(config.node(0));
+        Node node1 = new Node(config.node(1));
+
+        wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+                                     .willReturn(aResponse().withBody(RESPONSE)));
+        wireMockRule.stubFor(get(urlEqualTo(config.node(1).path()))
+                                     .willReturn(aResponse().withBody(RESPONSE)));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        var metricsByNode = retriever.getMetrics();
+        assertEquals(2, metricsByNode.size());
+        assertEquals(4, metricsByNode.get(node0).size());
+        assertEquals(4, metricsByNode.get(node1).size());
+    }
+
+    @Test
+    public void dead_node_yields_empty_metrics() {
+        var config = nodesConfig("/non-existent");
+        Node node = new Node(config.node(0));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        var metricsByNode = retriever.getMetrics();
+        assertEquals(1, metricsByNode.size());
+        assertEquals(0, metricsByNode.get(node).size());
+
+    }
+
+    @Test
+    public void metrics_from_good_node_are_returned_even_if_another_node_is_dead() {
+        var config = nodesConfig("/node0", "/node1");
+        Node node0 = new Node(config.node(0));
+        Node node1 = new Node(config.node(1));
+
+        wireMockRule.stubFor(get(urlEqualTo(config.node(1).path()))
+                                     .willReturn(aResponse().withBody(RESPONSE)));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        var metricsByNode = retriever.getMetrics();
+        assertEquals(2, metricsByNode.size());
+        assertEquals(0, metricsByNode.get(node0).size());
+        assertEquals(4, metricsByNode.get(node1).size());
+    }
+
+    @Test
+    public void an_exception_is_thrown_when_retrieving_times_out() {
+        var config = nodesConfig("/node0");
+
+        wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+                                     .willReturn(aResponse()
+                                                         .withBody(RESPONSE)
+                                                         .withFixedDelay(10)));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        retriever.setTaskTimeout(Duration.ofMillis(1));
+
+        try {
+            retriever.getMetrics();
+            fail("Did not get expected exception");
+        } catch (ApplicationMetricsException expected) {
+            assertTrue(expected.getCause() instanceof TimeoutException);
+        }
+    }
+
+    @Test
+    public void metrics_can_be_retrieved_after_previous_call_threw_an_exception() {
+        var config = nodesConfig("/node0");
+        Node node = new Node(config.node(0));
+
+        var delayedStub = wireMockRule.stubFor(get(urlEqualTo(config.node(0).path()))
+                                                       .willReturn(aResponse()
+                                                                           .withBody(RESPONSE)
+                                                                           .withFixedDelay(10)));
+
+        ApplicationMetricsRetriever retriever = new ApplicationMetricsRetriever(config);
+        retriever.setTaskTimeout(Duration.ofMillis(1));
+        try {
+            retriever.getMetrics();
+            fail("Did not get expected exception");
+        } catch (ApplicationMetricsException expected) {
+        }
+
+        // Verify successful retrieving
+        wireMockRule.removeStubMapping(delayedStub);
+        verifyRetrievingMetricsFromSingleNode(config, node);
+    }
+
+    private VespaNodesConfig nodesConfig(String... paths) {
+        var nodes = Arrays.stream(paths)
+                .map(this::nodeConfig)
+                .collect(toList());
+        return new VespaNodesConfig.Builder()
+                .node(nodes)
+                .build();
+    }
+
+    private VespaNodesConfig.Node.Builder nodeConfig(String path) {
+        return new VespaNodesConfig.Node.Builder()
+                .configId(path)
+                .hostname(HOST)
+                .path(path)
+                .port(port);
+    }
+
+}