summaryrefslogtreecommitdiffstats
path: root/metrics-proxy
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-08 22:43:27 +0200
committerGitHub <noreply@github.com>2021-04-08 22:43:27 +0200
commit10afef17237f7180547af531241cb509e93e1f47 (patch)
tree10fb8f016bfef71afe6606b3e7eac849c5b8c9a5 /metrics-proxy
parent62985a75aa1948334bfc248f0e8fe8526892c70f (diff)
Revert "Revert "Use a PipedStream to process metrics as they flow in asynchronously.""
Diffstat (limited to 'metrics-proxy')
-rw-r--r--metrics-proxy/pom.xml4
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java64
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java4
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java21
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java3
5 files changed, 72 insertions, 24 deletions
diff --git a/metrics-proxy/pom.xml b/metrics-proxy/pom.xml
index 928179a6124..23831f9ac6d 100644
--- a/metrics-proxy/pom.xml
+++ b/metrics-proxy/pom.xml
@@ -112,8 +112,8 @@
<artifactId>simpleclient_common</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
index 2348f65bc9f..c9da5761a5f 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/HttpMetricFetcher.java
@@ -3,15 +3,22 @@ package ai.vespa.metricsproxy.service;
import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
-import java.util.concurrent.ExecutionException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import java.util.logging.Level;
import com.yahoo.yolean.Exceptions;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
@@ -31,6 +38,7 @@ public abstract class HttpMetricFetcher {
// The call to apache will do 3 retries. As long as we check the services in series, we can't have this too high.
public static int CONNECTION_TIMEOUT = 5000;
private final static int SOCKET_TIMEOUT = 60000;
+ private final static int BUFFER_SIZE = 0x40000; // 256k
private final URI url;
protected final VespaService service;
private static final CloseableHttpAsyncClient httpClient = createHttpClient();
@@ -47,14 +55,50 @@ public abstract class HttpMetricFetcher {
log.log(Level.FINE, "Fetching metrics from " + u + " with timeout " + CONNECTION_TIMEOUT);
}
- byte [] getJson() throws IOException {
+ InputStream getJson() throws IOException {
log.log(Level.FINE, "Connecting to url " + url + " for service '" + service + "'");
- Future<SimpleHttpResponse> response = httpClient.execute(new SimpleHttpRequest("GET", url), null);
- try {
- return response.get().getBodyBytes();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException("Failed fetching '" + url + "': " + e);
- }
+ PipedInputStream input = new PipedInputStream(BUFFER_SIZE);
+ final PipedOutputStream output = new PipedOutputStream(input);
+ Future<Void> response = httpClient.execute(
+ new BasicRequestProducer(Method.GET, url),
+ new AbstractBinResponseConsumer<Void>(){
+ @Override
+ public void releaseResources() {
+ try {
+ output.close();
+ } catch (IOException e) {
+ System.out.println("releaseResources -> close failed");
+ }
+ }
+
+ @Override
+ protected int capacityIncrement() {
+ return BUFFER_SIZE;
+ }
+
+ @Override
+ protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
+ byte [] backingArray = src.array();
+ int offset = src.arrayOffset();
+ output.write(backingArray, offset, src.remaining());
+ src.position(src.limit());
+ output.flush();
+ if (endOfStream) {
+ output.close();
+ }
+ }
+
+ @Override
+ protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException {
+
+ }
+
+ @Override
+ protected Void buildResult() {
+ return null;
+ }
+ }, null);
+ return input;
}
public String toString() {
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java
index e43aab8b26f..20c2325e1f3 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java
@@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,6 +33,9 @@ public class MetricsParser {
static Metrics parse(byte [] data) throws IOException {
return parse(jsonMapper.createParser(data));
}
+ static Metrics parse(InputStream data) throws IOException {
+ return parse(jsonMapper.createParser(data));
+ }
private static Metrics parse(JsonParser parser) throws IOException {
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected start of object, got " + parser.currentToken());
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
index f078081c430..bf5ae0769d5 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteHealthMetricFetcher.java
@@ -5,6 +5,8 @@ import ai.vespa.metricsproxy.metric.HealthMetric;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.util.logging.Level;
import java.io.IOException;
@@ -29,32 +31,29 @@ public class RemoteHealthMetricFetcher extends HttpMetricFetcher {
* Connect to remote service over http and fetch metrics
*/
public HealthMetric getHealth(int fetchCount) {
- byte [] data = {'{', '}'};
try {
- data = getJson();
+ return createHealthMetrics(getJson(), fetchCount);
} catch (IOException e) {
logMessageNoResponse(errMsgNoResponse(e), fetchCount);
+ byte [] empty = {'{','}'};
+ return createHealthMetrics(new ByteArrayInputStream(empty), fetchCount);
}
- return createHealthMetrics(data, fetchCount);
}
/**
* Connect to remote service over http and fetch metrics
*/
- private HealthMetric createHealthMetrics(byte [] data, int fetchCount) {
- HealthMetric healthMetric = HealthMetric.getDown("Failed fetching status page for service");
+ private HealthMetric createHealthMetrics(InputStream data, int fetchCount) {
try {
- healthMetric = parse(data);
+ return parse(data);
} catch (Exception e) {
handleException(e, data, fetchCount);
+ return HealthMetric.getDown("Failed fetching status page for service");
}
- return healthMetric;
}
- private HealthMetric parse(byte [] data) {
- if ((data == null) || (data.length == 0)) {
- return HealthMetric.getUnknown("Empty response from status page");
- }
+
+ private HealthMetric parse(InputStream data) {
try {
JsonNode o = jsonMapper.readTree(data);
JsonNode status = o.get("status");
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
index 787f0e33157..c071cdb8fb9 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java
@@ -4,6 +4,7 @@ package ai.vespa.metricsproxy.service;
import ai.vespa.metricsproxy.metric.Metrics;
import java.io.IOException;
+import java.io.InputStream;
/**
* Fetch metrics for a given vespa service
@@ -39,7 +40,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher {
return remoteMetrics;
}
- Metrics createMetrics(byte [] data, int fetchCount) {
+ Metrics createMetrics(InputStream data, int fetchCount) {
Metrics remoteMetrics = new Metrics();
try {
remoteMetrics = MetricsParser.parse(data);