diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-09 07:25:50 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-09 07:25:50 +0200 |
commit | 44a348bc5c55ec23288a8beb9bd0caeca7c74e89 (patch) | |
tree | 3ab0b21726e55aa085a651f2eb49f579c418187e /metrics-proxy | |
parent | 304fdb85bb9bdebe36add9b5837bbcbb72755bc1 (diff) |
Use already present utility for streaming.
Diffstat (limited to 'metrics-proxy')
3 files changed, 16 insertions, 48 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java index c9da5761a5f..333d8696413 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java @@ -4,21 +4,20 @@ package ai.vespa.metricsproxy.service; import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.logging.Level; import com.yahoo.yolean.Exceptions; -import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer; import org.apache.hc.core5.util.Timeout; import java.io.IOException; @@ -55,57 +54,24 @@ public abstract class HttpMetricFetcher { log.log(Level.FINE, "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT); } - InputStream getJson() throws IOException { + InputStream getJson() throws IOException,InterruptedException, ExecutionException { log.log(Level.FINE, "Connecting to url " + url + " for service '" + service + "'"); - PipedInputStream input = new PipedInputStream(BUFFER_SIZE); - final PipedOutputStream output = new PipedOutputStream(input); - Future<Void> response = httpClient.execute( + Future<Message<HttpResponse, InputStream>> response = httpClient.execute( new BasicRequestProducer(Method.GET, url), - new AbstractBinResponseConsumer<Void>(){ + new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<>(BUFFER_SIZE, Runnable::run) { @Override - public void releaseResources() { - try { - output.close(); - } catch (IOException e) { - System.out.println("releaseResources -> close failed"); - } + protected InputStream consumeData(ContentType contentType, InputStream inputStream) { + return inputStream; } - - @Override - protected int capacityIncrement() { - return BUFFER_SIZE; - } - - @Override - protected void data(ByteBuffer src, boolean endOfStream) throws IOException { - byte [] backingArray = src.array(); - int offset = src.arrayOffset(); - output.write(backingArray, offset, src.remaining()); - src.position(src.limit()); - output.flush(); - if (endOfStream) { - output.close(); - } - } - - @Override - protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException { - - } - - @Override - protected Void buildResult() { - return null; - } - }, null); - return input; + }), null); + return response.get().getBody(); } public String toString() { return this.getClass().getSimpleName() + " using " + url; } - String errMsgNoResponse(IOException e) { + String errMsgNoResponse(Exception e) { return "Unable to get response from service '" + service + "': " + Exceptions.toMessageString(e); } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java index bf5ae0769d5..2ebf5b2532c 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.concurrent.ExecutionException; import java.util.logging.Level; import java.io.IOException; @@ -33,7 +34,7 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher { public HealthMetric getHealth(int fetchCount) { try { return createHealthMetrics(getJson(), fetchCount); - } catch (IOException e) { + } catch (IOException | InterruptedException | ExecutionException e) { logMessageNoResponse(errMsgNoResponse(e), fetchCount); byte [] empty = {'{','}'}; return createHealthMetrics(new ByteArrayInputStream(empty), fetchCount); diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index c071cdb8fb9..f2cb5c4e8b3 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -5,6 +5,7 @@ import ai.vespa.metricsproxy.metric.Metrics; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutionException; /** * Fetch metrics for a given vespa service @@ -25,7 +26,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { public Metrics getMetrics(int fetchCount) { try { return createMetrics(getJson(), fetchCount); - } catch (IOException e) { + } catch (IOException | InterruptedException | ExecutionException e) { return new Metrics(); } } |