summaryrefslogtreecommitdiffstats
path: root/metrics-proxy
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-09 07:25:50 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2021-04-09 07:25:50 +0200
commit44a348bc5c55ec23288a8beb9bd0caeca7c74e89 (patch)
tree3ab0b21726e55aa085a651f2eb49f579c418187e /metrics-proxy
parent304fdb85bb9bdebe36add9b5837bbcbb72755bc1 (diff)
Use already present utility for streaming.
Diffstat (limited to 'metrics-proxy')
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java58
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java3
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java3
3 files changed, 16 insertions, 48 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
index c9da5761a5f..333d8696413 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
@@ -4,21 +4,20 @@ package ai.vespa.metricsproxy.service;
import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import com.yahoo.yolean.Exceptions;
-import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
@@ -55,57 +54,24 @@ public abstract class HttpMetricFetcher {
log.log(Level.FINE, "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT);
}
- InputStream getJson() throws IOException {
+ InputStream getJson() throws IOException,InterruptedException, ExecutionException {
log.log(Level.FINE, "Connecting to url " + url + " for service '" + service + "'");
- PipedInputStream input = new PipedInputStream(BUFFER_SIZE);
- final PipedOutputStream output = new PipedOutputStream(input);
- Future<Void> response = httpClient.execute(
+ Future<Message<HttpResponse, InputStream>> response = httpClient.execute(
new BasicRequestProducer(Method.GET, url),
- new AbstractBinResponseConsumer<Void>(){
+ new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<>(BUFFER_SIZE, Runnable::run) {
@Override
- public void releaseResources() {
- try {
- output.close();
- } catch (IOException e) {
- System.out.println("releaseResources -> close failed");
- }
+ protected InputStream consumeData(ContentType contentType, InputStream inputStream) {
+ return inputStream;
}
-
- @Override
- protected int capacityIncrement() {
- return BUFFER_SIZE;
- }
-
- @Override
- protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
- byte [] backingArray = src.array();
- int offset = src.arrayOffset();
- output.write(backingArray, offset, src.remaining());
- src.position(src.limit());
- output.flush();
- if (endOfStream) {
- output.close();
- }
- }
-
- @Override
- protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException {
-
- }
-
- @Override
- protected Void buildResult() {
- return null;
- }
- }, null);
- return input;
+ }), null);
+ return response.get().getBody();
}
public String toString() {
return this.getClass().getSimpleName() + " using " + url;
}
- String errMsgNoResponse(IOException e) {
+ String errMsgNoResponse(Exception e) {
return "Unable to get response from service '" + service + "': " +
Exceptions.toMessageString(e);
}
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
index bf5ae0769d5..2ebf5b2532c 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.io.IOException;
@@ -33,7 +34,7 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher {
public HealthMetric getHealth(int fetchCount) {
try {
return createHealthMetrics(getJson(), fetchCount);
- } catch (IOException e) {
+ } catch (IOException | InterruptedException | ExecutionException e) {
logMessageNoResponse(errMsgNoResponse(e), fetchCount);
byte [] empty = {'{','}'};
return createHealthMetrics(new ByteArrayInputStream(empty), fetchCount);
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
index c071cdb8fb9..f2cb5c4e8b3 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
@@ -5,6 +5,7 @@ import ai.vespa.metricsproxy.metric.Metrics;
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
/**
* Fetch metrics for a given vespa service
@@ -25,7 +26,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher {
public Metrics getMetrics(int fetchCount) {
try {
return createMetrics(getJson(), fetchCount);
- } catch (IOException e) {
+ } catch (IOException | InterruptedException | ExecutionException e) {
return new Metrics();
}
}