diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-06-21 10:38:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-21 10:38:37 +0200 |
commit | 1c2a36cce3c9ddf4ed64f0be558d2048f75758f9 (patch) | |
tree | bad07ea427619fdd759a3f7d5da2f988f0505bc2 /metrics-proxy | |
parent | 2561555b135f873352a38e55edbb23912f19a124 (diff) | |
parent | f192b2538c6b751cb9e3266cef70cafb331c3b7f (diff) |
Merge pull request #23138 from vespa-engine/bjorncs/metrics-proxy
Bjorncs/metrics proxy
Diffstat (limited to 'metrics-proxy')
5 files changed, 54 insertions, 78 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java index b41e9d5c8a8..790b3298a81 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java @@ -1,25 +1,18 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.service; -import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; +import ai.vespa.util.http.hc5.VespaHttpClientBuilder; import com.yahoo.yolean.Exceptions; +import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.Message; -import org.apache.hc.core5.http.Method; -import org.apache.hc.core5.http.nio.support.BasicRequestProducer; -import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; -import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer; -import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.util.Timeout; import java.io.IOException; -import java.io.InputStream; import java.net.URI; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,10 +29,10 @@ public abstract class HttpMetricFetcher { // The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high. public static volatile int CONNECTION_TIMEOUT = 5000; private final static int SOCKET_TIMEOUT = 60000; - private final static int BUFFER_SIZE = 0x40000; // 256k + final static int BUFFER_SIZE = 0x40000; // 256k private final URI url; protected final VespaService service; - private static final CloseableHttpAsyncClient httpClient = createHttpClient(); + private static final CloseableHttpClient httpClient = createHttpClient(); /** * @param service the service to fetch metrics from @@ -53,17 +46,9 @@ public abstract class HttpMetricFetcher { log.log(Level.FINE, () -> "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT); } - InputStream getJson() throws IOException,InterruptedException, ExecutionException { + CloseableHttpResponse getResponse() throws IOException { log.log(Level.FINE, () -> "Connecting to url " + url + " for service '" + service + "'"); - Future<Message<HttpResponse, InputStream>> response = httpClient.execute( - new BasicRequestProducer(Method.GET, url), - new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<>(BUFFER_SIZE, Runnable::run) { - @Override - protected InputStream consumeData(ContentType contentType, InputStream inputStream) { - return inputStream; - } - }), null); - return response.get().getBody(); + return httpClient.execute(new HttpGet(url)); } public String toString() { @@ -95,20 +80,21 @@ public abstract class HttpMetricFetcher { } } - private static CloseableHttpAsyncClient createHttpClient() { - CloseableHttpAsyncClient client = VespaAsyncHttpClientBuilder.create() + private static CloseableHttpClient createHttpClient() { + return VespaHttpClientBuilder.create(registry -> { + var mgr = new PoolingHttpClientConnectionManager(registry); + mgr.setDefaultSocketConfig(SocketConfig.custom() + .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .build()); + return mgr; + }) .setUserAgent("metrics-proxy-http-client") - .setIOReactorConfig(IOReactorConfig.custom() - .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .build()) .setDefaultRequestConfig(RequestConfig.custom() - .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT)) - .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .build()) + .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT)) + .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .build()) .build(); - client.start(); - return client; } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java index cb8e07b0282..f13fa8eef65 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java @@ -4,12 +4,14 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.HealthMetric; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.ExecutionException; import java.util.logging.Level; - -import java.io.IOException; import java.util.logging.Logger; /** @@ -31,9 +33,17 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public HealthMetric getHealth(int fetchCount) { - try (InputStream stream = getJson()) { - return createHealthMetrics(stream, fetchCount); - } catch (IOException | InterruptedException | ExecutionException e) { + try (CloseableHttpResponse response = getResponse()) { + HttpEntity entity = response.getEntity(); + try { + return parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE)); + } catch (Exception e) { + handleException(e, entity.getContentType(), fetchCount); + return HealthMetric.getDown("Failed fetching status page for service"); + } finally { + EntityUtils.consumeQuietly(entity); + } + } catch (IOException e) { if (service.isAlive()) { logMessageNoResponse(errMsgNoResponse(e), fetchCount); } @@ -41,19 +51,6 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { } } - /** - * Connect to remote service over http and fetch metrics - */ - private HealthMetric createHealthMetrics(InputStream data, int fetchCount) throws IOException { - try { - return parse(data); - } catch (Exception e) { - handleException(e, data, fetchCount); - while (data.read() != -1) {} - return HealthMetric.getDown("Failed fetching status page for service"); - } - } - private HealthMetric parse(InputStream data) { try { diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index 5ca9e6fd950..cd05945f4f1 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -1,9 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.service; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; + +import java.io.BufferedInputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.ExecutionException; /** * Fetch metrics for a given vespa service @@ -22,21 +25,19 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { - try (InputStream stream = getJson()) { - createMetrics(stream, consumer, fetchCount); - } catch (IOException | InterruptedException | ExecutionException e) { - } + try (CloseableHttpResponse response = getResponse()) { + HttpEntity entity = response.getEntity(); + try { + MetricsParser.parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE), consumer); + } catch (Exception e) { + handleException(e, entity.getContentType(), fetchCount); + } finally { + EntityUtils.consumeQuietly(entity); + } + } catch (IOException ignored) {} } void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) throws IOException { MetricsParser.parse(data, consumer); } - private void createMetrics(InputStream data, MetricsParser.Consumer consumer, int fetchCount) throws IOException { - try { - MetricsParser.parse(data, consumer); - } catch (Exception e) { - handleException(e, data, fetchCount); - while (data.read() != -1) {} - } - } } diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java index cf6de804053..71a4466ea95 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcHealthMetricsTest.java @@ -10,7 +10,6 @@ import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -43,7 +42,6 @@ public class RpcHealthMetricsTest { public Timeout globalTimeout = Timeout.seconds(300); @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void expected_response_is_returned() { try (IntegrationTester tester = new IntegrationTester()) { MockHttpServer mockHttpServer = tester.httpServer(); @@ -69,7 +67,6 @@ public class RpcHealthMetricsTest { } @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void non_existent_service_name_returns_an_error_message() { try (IntegrationTester tester = new IntegrationTester()) { String jsonRPCMessage = getHealthMetrics(tester, "non-existing service"); diff --git a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java index 83149ad5ef7..a52e1daf878 100644 --- a/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java +++ b/metrics-proxy/src/test/java/ai/vespa/metricsproxy/rpc/RpcMetricsTest.java @@ -14,7 +14,6 @@ import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -22,10 +21,10 @@ import org.junit.rules.Timeout; import java.io.IOException; import java.util.List; -import static ai.vespa.metricsproxy.metric.model.MetricId.toMetricId; import static ai.vespa.metricsproxy.TestUtil.getFileContents; import static ai.vespa.metricsproxy.core.VespaMetrics.vespaMetricsConsumerId; import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId; +import static ai.vespa.metricsproxy.metric.model.MetricId.toMetricId; import static ai.vespa.metricsproxy.rpc.IntegrationTester.CUSTOM_CONSUMER_ID; import static ai.vespa.metricsproxy.rpc.IntegrationTester.MONITORING_SYSTEM; import static ai.vespa.metricsproxy.rpc.IntegrationTester.SERVICE_1_CONFIG_ID; @@ -68,7 +67,6 @@ public class RpcMetricsTest { public Timeout globalTimeout = Timeout.seconds(300); @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void extra_metrics_are_added_to_output() throws Exception { try (IntegrationTester tester = new IntegrationTester()) { try (RpcClient rpcClient = new RpcClient(tester.rpcPort())) { @@ -84,7 +82,6 @@ public class RpcMetricsTest { } @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void extra_metrics_are_purged() throws Exception { try (IntegrationTester tester = new IntegrationTester()) { try (RpcClient rpcClient = new RpcClient(tester.rpcPort())) { @@ -102,7 +99,6 @@ public class RpcMetricsTest { } @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void testGetMetrics() throws Exception { try (IntegrationTester tester = new IntegrationTester()) { tester.httpServer().setResponse(METRICS_RESPONSE); @@ -193,7 +189,6 @@ public class RpcMetricsTest { } @Test - @Ignore("Temporarily ignore test until timeout issue is resolved") public void testGetAllMetricNames() { try (IntegrationTester tester = new IntegrationTester()) { |