From 7e19709c78350cd8a58918296ea6d6b5666320e1 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 8 Apr 2021 19:19:13 +0200 Subject: Revert "Use a PipedStream to process metrics as they flow in asynchronously." --- metrics-proxy/pom.xml | 4 +- .../metricsproxy/service/HttpMetricFetcher.java | 64 ++++------------------ .../vespa/metricsproxy/service/MetricsParser.java | 4 -- .../service/RemoteHealthMetricFetcher.java | 21 +++---- .../metricsproxy/service/RemoteMetricsFetcher.java | 3 +- 5 files changed, 24 insertions(+), 72 deletions(-) (limited to 'metrics-proxy') diff --git a/metrics-proxy/pom.xml b/metrics-proxy/pom.xml index 23831f9ac6d..928179a6124 100644 --- a/metrics-proxy/pom.xml +++ b/metrics-proxy/pom.xml @@ -112,8 +112,8 @@ simpleclient_common - org.apache.httpcomponents.client5 - httpclient5 + org.apache.httpcomponents + httpclient org.apache.velocity diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java index c9da5761a5f..2348f65bc9f 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java @@ -3,22 +3,15 @@ package ai.vespa.metricsproxy.service; import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.logging.Level; import com.yahoo.yolean.Exceptions; -import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.Method; -import org.apache.hc.core5.http.nio.support.BasicRequestProducer; import org.apache.hc.core5.util.Timeout; import java.io.IOException; @@ -38,7 +31,6 @@ public abstract class HttpMetricFetcher { // The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high. public static int CONNECTION_TIMEOUT = 5000; private final static int SOCKET_TIMEOUT = 60000; - private final static int BUFFER_SIZE = 0x40000; // 256k private final URI url; protected final VespaService service; private static final CloseableHttpAsyncClient httpClient = createHttpClient(); @@ -55,50 +47,14 @@ public abstract class HttpMetricFetcher { log.log(Level.FINE, "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT); } - InputStream getJson() throws IOException { + byte [] getJson() throws IOException { log.log(Level.FINE, "Connecting to url " + url + " for service '" + service + "'"); - PipedInputStream input = new PipedInputStream(BUFFER_SIZE); - final PipedOutputStream output = new PipedOutputStream(input); - Future response = httpClient.execute( - new BasicRequestProducer(Method.GET, url), - new AbstractBinResponseConsumer(){ - @Override - public void releaseResources() { - try { - output.close(); - } catch (IOException e) { - System.out.println("releaseResources -> close failed"); - } - } - - @Override - protected int capacityIncrement() { - return BUFFER_SIZE; - } - - @Override - protected void data(ByteBuffer src, boolean endOfStream) throws IOException { - byte [] backingArray = src.array(); - int offset = src.arrayOffset(); - output.write(backingArray, offset, src.remaining()); - src.position(src.limit()); - output.flush(); - if (endOfStream) { - output.close(); - } - } - - @Override - protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException { - - } - - @Override - protected Void buildResult() { - return null; - } - }, null); - return input; + Future response = httpClient.execute(new SimpleHttpRequest("GET", url), null); + try { + return response.get().getBodyBytes(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException("Failed fetching '" + url + "': " + e); + } } public String toString() { diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java index 20c2325e1f3..e43aab8b26f 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -33,9 +32,6 @@ public class MetricsParser { static Metrics parse(byte [] data) throws IOException { return parse(jsonMapper.createParser(data)); } - static Metrics parse(InputStream data) throws IOException { - return parse(jsonMapper.createParser(data)); - } private static Metrics parse(JsonParser parser) throws IOException { if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of object, got " + parser.currentToken()); diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java index bf5ae0769d5..f078081c430 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java @@ -5,8 +5,6 @@ import ai.vespa.metricsproxy.metric.HealthMetric; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.util.logging.Level; import java.io.IOException; @@ -31,29 +29,32 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public HealthMetric getHealth(int fetchCount) { + byte [] data = {'{', '}'}; try { - return createHealthMetrics(getJson(), fetchCount); + data = getJson(); } catch (IOException e) { logMessageNoResponse(errMsgNoResponse(e), fetchCount); - byte [] empty = {'{','}'}; - return createHealthMetrics(new ByteArrayInputStream(empty), fetchCount); } + return createHealthMetrics(data, fetchCount); } /** * Connect to remote service over http and fetch metrics */ - private HealthMetric createHealthMetrics(InputStream data, int fetchCount) { + private HealthMetric createHealthMetrics(byte [] data, int fetchCount) { + HealthMetric healthMetric = HealthMetric.getDown("Failed fetching status page for service"); try { - return parse(data); + healthMetric = parse(data); } catch (Exception e) { handleException(e, data, fetchCount); - return HealthMetric.getDown("Failed fetching status page for service"); } + return healthMetric; } - - private HealthMetric parse(InputStream data) { + private HealthMetric parse(byte [] data) { + if ((data == null) || (data.length == 0)) { + return HealthMetric.getUnknown("Empty response from status page"); + } try { JsonNode o = jsonMapper.readTree(data); JsonNode status = o.get("status"); diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index c071cdb8fb9..787f0e33157 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -4,7 +4,6 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.Metrics; import java.io.IOException; -import java.io.InputStream; /** * Fetch metrics for a given vespa service @@ -40,7 +39,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { return remoteMetrics; } - Metrics createMetrics(InputStream data, int fetchCount) { + Metrics createMetrics(byte [] data, int fetchCount) { Metrics remoteMetrics = new Metrics(); try { remoteMetrics = MetricsParser.parse(data); -- cgit v1.2.3