From ee7a234316d5fee80e24efef7b7f0708815aa772 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Thu, 16 Jun 2022 16:20:13 +0200 Subject: Use synchronous http client --- .../metricsproxy/service/HttpMetricFetcher.java | 58 ++++++++-------------- .../service/RemoteHealthMetricFetcher.java | 35 ++++++------- .../metricsproxy/service/RemoteMetricsFetcher.java | 29 +++++------ 3 files changed, 53 insertions(+), 69 deletions(-) (limited to 'metrics-proxy') diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java index b41e9d5c8a8..790b3298a81 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java @@ -1,25 +1,18 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.service; -import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; +import ai.vespa.util.http.hc5.VespaHttpClientBuilder; import com.yahoo.yolean.Exceptions; +import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.Message; -import org.apache.hc.core5.http.Method; -import org.apache.hc.core5.http.nio.support.BasicRequestProducer; -import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; -import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer; -import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.util.Timeout; import java.io.IOException; -import java.io.InputStream; import java.net.URI; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,10 +29,10 @@ public abstract class HttpMetricFetcher { // The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high. public static volatile int CONNECTION_TIMEOUT = 5000; private final static int SOCKET_TIMEOUT = 60000; - private final static int BUFFER_SIZE = 0x40000; // 256k + final static int BUFFER_SIZE = 0x40000; // 256k private final URI url; protected final VespaService service; - private static final CloseableHttpAsyncClient httpClient = createHttpClient(); + private static final CloseableHttpClient httpClient = createHttpClient(); /** * @param service the service to fetch metrics from @@ -53,17 +46,9 @@ public abstract class HttpMetricFetcher { log.log(Level.FINE, () -> "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT); } - InputStream getJson() throws IOException,InterruptedException, ExecutionException { + CloseableHttpResponse getResponse() throws IOException { log.log(Level.FINE, () -> "Connecting to url " + url + " for service '" + service + "'"); - Future> response = httpClient.execute( - new BasicRequestProducer(Method.GET, url), - new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<>(BUFFER_SIZE, Runnable::run) { - @Override - protected InputStream consumeData(ContentType contentType, InputStream inputStream) { - return inputStream; - } - }), null); - return response.get().getBody(); + return httpClient.execute(new HttpGet(url)); } public String toString() { @@ -95,20 +80,21 @@ public abstract class HttpMetricFetcher { } } - private static CloseableHttpAsyncClient createHttpClient() { - CloseableHttpAsyncClient client = VespaAsyncHttpClientBuilder.create() + private static CloseableHttpClient createHttpClient() { + return VespaHttpClientBuilder.create(registry -> { + var mgr = new PoolingHttpClientConnectionManager(registry); + mgr.setDefaultSocketConfig(SocketConfig.custom() + .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .build()); + return mgr; + }) .setUserAgent("metrics-proxy-http-client") - .setIOReactorConfig(IOReactorConfig.custom() - .setSoTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .build()) .setDefaultRequestConfig(RequestConfig.custom() - .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT)) - .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) - .build()) + .setConnectionRequestTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .setConnectTimeout(Timeout.ofMilliseconds(CONNECTION_TIMEOUT)) + .setResponseTimeout(Timeout.ofMilliseconds(SOCKET_TIMEOUT)) + .build()) .build(); - client.start(); - return client; } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java index cb8e07b0282..f13fa8eef65 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java @@ -4,12 +4,14 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.HealthMetric; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.ExecutionException; import java.util.logging.Level; - -import java.io.IOException; import java.util.logging.Logger; /** @@ -31,9 +33,17 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public HealthMetric getHealth(int fetchCount) { - try (InputStream stream = getJson()) { - return createHealthMetrics(stream, fetchCount); - } catch (IOException | InterruptedException | ExecutionException e) { + try (CloseableHttpResponse response = getResponse()) { + HttpEntity entity = response.getEntity(); + try { + return parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE)); + } catch (Exception e) { + handleException(e, entity.getContentType(), fetchCount); + return HealthMetric.getDown("Failed fetching status page for service"); + } finally { + EntityUtils.consumeQuietly(entity); + } + } catch (IOException e) { if (service.isAlive()) { logMessageNoResponse(errMsgNoResponse(e), fetchCount); } @@ -41,19 +51,6 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { } } - /** - * Connect to remote service over http and fetch metrics - */ - private HealthMetric createHealthMetrics(InputStream data, int fetchCount) throws IOException { - try { - return parse(data); - } catch (Exception e) { - handleException(e, data, fetchCount); - while (data.read() != -1) {} - return HealthMetric.getDown("Failed fetching status page for service"); - } - } - private HealthMetric parse(InputStream data) { try { diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index 5ca9e6fd950..cd05945f4f1 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -1,9 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.service; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; + +import java.io.BufferedInputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.ExecutionException; /** * Fetch metrics for a given vespa service @@ -22,21 +25,19 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { - try (InputStream stream = getJson()) { - createMetrics(stream, consumer, fetchCount); - } catch (IOException | InterruptedException | ExecutionException e) { - } + try (CloseableHttpResponse response = getResponse()) { + HttpEntity entity = response.getEntity(); + try { + MetricsParser.parse(new BufferedInputStream(entity.getContent(), HttpMetricFetcher.BUFFER_SIZE), consumer); + } catch (Exception e) { + handleException(e, entity.getContentType(), fetchCount); + } finally { + EntityUtils.consumeQuietly(entity); + } + } catch (IOException ignored) {} } void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) throws IOException { MetricsParser.parse(data, consumer); } - private void createMetrics(InputStream data, MetricsParser.Consumer consumer, int fetchCount) throws IOException { - try { - MetricsParser.parse(data, consumer); - } catch (Exception e) { - handleException(e, data, fetchCount); - while (data.read() != -1) {} - } - } } -- cgit v1.2.3