summaryrefslogtreecommitdiffstats
path: root/metrics-proxy
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-06-21 10:38:37 +0200
committerGitHub <noreply@github.com>2022-06-21 10:38:37 +0200
commit1c2a36cce3c9ddf4ed64f0be558d2048f75758f9 (patch)
treebad07ea427619fdd759a3f7d5da2f988f0505bc2 /metrics-proxy
parent2561555b135f873352a38e55edbb23912f19a124 (diff)
parentf192b2538c6b751cb9e3266cef70cafb331c3b7f (diff)
Merge pull request #23138 from vespa-engine/bjorncs/metrics-proxy
Bjorncs/metrics proxy
Diffstat (limited to 'metrics-proxy')
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java58
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java35
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java29
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java3
-rw-r--r--metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java7
5 files changed, 54 insertions, 78 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
index b41e9d5c8a8..790b3298a81 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
@@ -1,25 +1,18 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.metricsproxy.service;
-import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
+import ai.vespa.util.http.hc5.VespaHttpClientBuilder;
import com.yahoo.yolean.Exceptions;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.Message;
-import org.apache.hc.core5.http.Method;
-import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
-import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
-import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
-import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,10 +29,10 @@ public abstract class HttpMetricFetcher {
// The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high.
public static volatile int CONNECTION_TIMEOUT = 5000;
private final static int SOCKET_TIMEOUT = 60000;
- private final static int BUFFER_SIZE = 0x40000; // 256k
+ final static int BUFFER_SIZE = 0x40000; // 256k
private final URI url;
protected final VespaService service;
- private static final CloseableHttpAsyncClient httpClient = createHttpClient();
+ private static final CloseableHttpClient httpClient = createHttpClient();
/**
* @param service the service to fetch metrics from
@@ -53,17 +46,9 @@ public abstract class HttpMetricFetcher {
log.log(Level.FINE, () -> "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT);
}
- InputStream getJson() throws IOException,InterruptedException, ExecutionException {
+ CloseableHttpResponse getResponse() throws IOException {
log.log(Level.FINE, () -> "Connecting to url " + url + " for service '" + service + "'");
- Future<Message<HttpResponse, InputStream>> response = httpClient.execute(
- new BasicRequestProducer(Method.GET, url),
- new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<>(BUFFER_SIZE, Runnable::run) {
- @Override
- protected InputStream consumeData(ContentType contentType, InputStream inputStream) {
- return inputStream;
- }
- }), null);
- return response.get().getBody();
+ return httpClient.execute(new HttpGet(url));
}
public String toString() {
@@ -95,20 +80,21 @@ public abstract class HttpMetricFetcher {
}
}
- private static CloseableHttpAsyncClient createHttpClient() {
- CloseableHttpAsyncClient client = VespaAsyncHttpClientBuilder.create()
+ private static CloseableHttpClient createHttpClient() {
+ return VespaHttpClientBuilder.create(registry -> {
+ var mgr = new PoolingHttpClientConnectionManager(registry);
+ mgr.setDefaultSocketConfig(SocketConfig.custom()
+ .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
+ .build());
+ return mgr;
+ })
.setUserAgent("metrics-proxy-http-client")
- .setIOReactorConfig(IOReactorConfig.custom()
- .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
- .build())
.setDefaultRequestConfig(RequestConfig.custom()
- .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
- .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT))
- .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
- .build())
+ .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
+ .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT))
+ .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT))
+ .build())
.build();
- client.start();
- return client;
}
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
index cb8e07b0282..f13fa8eef65 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
@@ -4,12 +4,14 @@ package ai.vespa.metricsproxy.service;
import ai.vespa.metricsproxy.metric.HealthMetric;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import java.io.BufferedInputStream;
+import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
-
-import java.io.IOException;
import java.util.logging.Logger;
/**
@@ -31,9 +33,17 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher {
* Connect to remote service over http and fetch metrics
*/
public HealthMetric getHealth(int fetchCount) {
- try (InputStream stream = getJson()) {
- return createHealthMetrics(stream, fetchCount);
- } catch (IOException | InterruptedException | ExecutionException e) {
+ try (CloseableHttpResponse response = getResponse()) {
+ HttpEntity entity = response.getEntity();
+ try {
+ return parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE));
+ } catch (Exception e) {
+ handleException(e, entity.getContentType(), fetchCount);
+ return HealthMetric.getDown("Failed fetching status page for service");
+ } finally {
+ EntityUtils.consumeQuietly(entity);
+ }
+ } catch (IOException e) {
if (service.isAlive()) {
logMessageNoResponse(errMsgNoResponse(e), fetchCount);
}
@@ -41,19 +51,6 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher {
}
}
- /**
- * Connect to remote service over http and fetch metrics
- */
- private HealthMetric createHealthMetrics(InputStream data, int fetchCount) throws IOException {
- try {
- return parse(data);
- } catch (Exception e) {
- handleException(e, data, fetchCount);
- while (data.read() != -1) {}
- return HealthMetric.getDown("Failed fetching status page for service");
- }
- }
-
private HealthMetric parse(InputStream data) {
try {
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
index 5ca9e6fd950..cd05945f4f1 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
@@ -1,9 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.metricsproxy.service;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+
+import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.ExecutionException;
/**
* Fetch metrics for a given vespa service
@@ -22,21 +25,19 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher {
* Connect to remote service over http and fetch metrics
*/
public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) {
- try (InputStream stream = getJson()) {
- createMetrics(stream, consumer, fetchCount);
- } catch (IOException | InterruptedException | ExecutionException e) {
- }
+ try (CloseableHttpResponse response = getResponse()) {
+ HttpEntity entity = response.getEntity();
+ try {
+ MetricsParser.parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE), consumer);
+ } catch (Exception e) {
+ handleException(e, entity.getContentType(), fetchCount);
+ } finally {
+ EntityUtils.consumeQuietly(entity);
+ }
+ } catch (IOException ignored) {}
}
void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) throws IOException {
MetricsParser.parse(data, consumer);
}
- private void createMetrics(InputStream data, MetricsParser.Consumer consumer, int fetchCount) throws IOException {
- try {
- MetricsParser.parse(data, consumer);
- } catch (Exception e) {
- handleException(e, data, fetchCount);
- while (data.read() != -1) {}
- }
- }
}
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java
index cf6de804053..71a4466ea95 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java
@@ -10,7 +10,6 @@ import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -43,7 +42,6 @@ public class RpcHealthMetricsTest {
public Timeout globalTimeout = Timeout.seconds(300);
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void expected_response_is_returned() {
try (IntegrationTester tester = new IntegrationTester()) {
MockHttpServer mockHttpServer = tester.httpServer();
@@ -69,7 +67,6 @@ public class RpcHealthMetricsTest {
}
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void non_existent_service_name_returns_an_error_message() {
try (IntegrationTester tester = new IntegrationTester()) {
String jsonRPCMessage = getHealthMetrics(tester, "non-existing service");
diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java
index 83149ad5ef7..a52e1daf878 100644
--- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java
+++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java
@@ -14,7 +14,6 @@ import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -22,10 +21,10 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.List;
-import static ai.vespa.metricsproxy.metric.model.MetricId.toMetricId;
import static ai.vespa.metricsproxy.TestUtil.getFileContents;
import static ai.vespa.metricsproxy.core.VespaMetrics.vespaMetricsConsumerId;
import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId;
+import static ai.vespa.metricsproxy.metric.model.MetricId.toMetricId;
import static ai.vespa.metricsproxy.rpc.IntegrationTester.CUSTOM_CONSUMER_ID;
import static ai.vespa.metricsproxy.rpc.IntegrationTester.MONITORING_SYSTEM;
import static ai.vespa.metricsproxy.rpc.IntegrationTester.SERVICE_1_CONFIG_ID;
@@ -68,7 +67,6 @@ public class RpcMetricsTest {
public Timeout globalTimeout = Timeout.seconds(300);
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void extra_metrics_are_added_to_output() throws Exception {
try (IntegrationTester tester = new IntegrationTester()) {
try (RpcClient rpcClient = new RpcClient(tester.rpcPort())) {
@@ -84,7 +82,6 @@ public class RpcMetricsTest {
}
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void extra_metrics_are_purged() throws Exception {
try (IntegrationTester tester = new IntegrationTester()) {
try (RpcClient rpcClient = new RpcClient(tester.rpcPort())) {
@@ -102,7 +99,6 @@ public class RpcMetricsTest {
}
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void testGetMetrics() throws Exception {
try (IntegrationTester tester = new IntegrationTester()) {
tester.httpServer().setResponse(METRICS_RESPONSE);
@@ -193,7 +189,6 @@ public class RpcMetricsTest {
}
@Test
- @Ignore("Temporarily ignore test until timeout issue is resolved")
public void testGetAllMetricNames() {
try (IntegrationTester tester = new IntegrationTester()) {