diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-08 11:55:33 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-08 12:11:44 +0200 |
commit | 89c46869095e571f8d93e9f7716c9a528e9e74db (patch) | |
tree | 88502d587328d774b05d2d57133f02b6aa1ed293 /metrics-proxy/src | |
parent | 39430e4e809cb2993a371df38fcac3bb0c798200 (diff) |
Use a PipedStream to process metrics as they flow in asynchronously.
Diffstat (limited to 'metrics-proxy/src')
4 files changed, 70 insertions, 22 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java index 2348f65bc9f..c9da5761a5f 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java @@ -3,15 +3,22 @@ package ai.vespa.metricsproxy.service; import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; -import java.util.concurrent.ExecutionException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.ByteBuffer; import java.util.concurrent.Future; import java.util.logging.Level; import com.yahoo.yolean.Exceptions; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; import org.apache.hc.core5.util.Timeout; import java.io.IOException; @@ -31,6 +38,7 @@ public abstract class HttpMetricFetcher { // The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high. public static int CONNECTION_TIMEOUT = 5000; private final static int SOCKET_TIMEOUT = 60000; + private final static int BUFFER_SIZE = 0x40000; // 256k private final URI url; protected final VespaService service; private static final CloseableHttpAsyncClient httpClient = createHttpClient(); @@ -47,14 +55,50 @@ public abstract class HttpMetricFetcher { log.log(Level.FINE, "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT); } - byte [] getJson() throws IOException { + InputStream getJson() throws IOException { log.log(Level.FINE, "Connecting to url " + url + " for service '" + service + "'"); - Future<SimpleHttpResponse> response = httpClient.execute(new SimpleHttpRequest("GET", url), null); - try { - return response.get().getBodyBytes(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException("Failed fetching '" + url + "': " + e); - } + PipedInputStream input = new PipedInputStream(BUFFER_SIZE); + final PipedOutputStream output = new PipedOutputStream(input); + Future<Void> response = httpClient.execute( + new BasicRequestProducer(Method.GET, url), + new AbstractBinResponseConsumer<Void>(){ + @Override + public void releaseResources() { + try { + output.close(); + } catch (IOException e) { + System.out.println("releaseResources -> close failed"); + } + } + + @Override + protected int capacityIncrement() { + return BUFFER_SIZE; + } + + @Override + protected void data(ByteBuffer src, boolean endOfStream) throws IOException { + byte [] backingArray = src.array(); + int offset = src.arrayOffset(); + output.write(backingArray, offset, src.remaining()); + src.position(src.limit()); + output.flush(); + if (endOfStream) { + output.close(); + } + } + + @Override + protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException { + + } + + @Override + protected Void buildResult() { + return null; + } + }, null); + return input; } public String toString() { diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java index e43aab8b26f..20c2325e1f3 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -32,6 +33,9 @@ public class MetricsParser { static Metrics parse(byte [] data) throws IOException { return parse(jsonMapper.createParser(data)); } + static Metrics parse(InputStream data) throws IOException { + return parse(jsonMapper.createParser(data)); + } private static Metrics parse(JsonParser parser) throws IOException { if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of object, got " + parser.currentToken()); diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java index f078081c430..bf5ae0769d5 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java @@ -5,6 +5,8 @@ import ai.vespa.metricsproxy.metric.HealthMetric; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.logging.Level; import java.io.IOException; @@ -29,32 +31,29 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { * Connect to remote service over http and fetch metrics */ public HealthMetric getHealth(int fetchCount) { - byte [] data = {'{', '}'}; try { - data = getJson(); + return createHealthMetrics(getJson(), fetchCount); } catch (IOException e) { logMessageNoResponse(errMsgNoResponse(e), fetchCount); + byte [] empty = {'{','}'}; + return createHealthMetrics(new ByteArrayInputStream(empty), fetchCount); } - return createHealthMetrics(data, fetchCount); } /** * Connect to remote service over http and fetch metrics */ - private HealthMetric createHealthMetrics(byte [] data, int fetchCount) { - HealthMetric healthMetric = HealthMetric.getDown("Failed fetching status page for service"); + private HealthMetric createHealthMetrics(InputStream data, int fetchCount) { try { - healthMetric = parse(data); + return parse(data); } catch (Exception e) { handleException(e, data, fetchCount); + return HealthMetric.getDown("Failed fetching status page for service"); } - return healthMetric; } - private HealthMetric parse(byte [] data) { - if ((data == null) || (data.length == 0)) { - return HealthMetric.getUnknown("Empty response from status page"); - } + + private HealthMetric parse(InputStream data) { try { JsonNode o = jsonMapper.readTree(data); JsonNode status = o.get("status"); diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index 787f0e33157..c071cdb8fb9 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -4,6 +4,7 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.Metrics; import java.io.IOException; +import java.io.InputStream; /** * Fetch metrics for a given vespa service @@ -39,7 +40,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { return remoteMetrics; } - Metrics createMetrics(byte [] data, int fetchCount) { + Metrics createMetrics(InputStream data, int fetchCount) { Metrics remoteMetrics = new Metrics(); try { remoteMetrics = MetricsParser.parse(data); |