diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-09-09 13:35:29 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-09 13:35:29 +0200 |
commit | 98168a895d8b10f0d3717b80e366dcfeb1fafb7f (patch) | |
tree | a82041865cebfa8c74f8d23423ebfba6e52b00ff /metrics-proxy/src/main/java/ai | |
parent | 09d858d307b1ea04db5229d5844a8cb6fcc00495 (diff) |
Revert "Consume and process metrics as they are parsed."
Diffstat (limited to 'metrics-proxy/src/main/java/ai')
5 files changed, 128 insertions, 141 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java index a8ef79d827e..f112acd6e05 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java @@ -12,7 +12,6 @@ import ai.vespa.metricsproxy.metric.model.Dimension; import ai.vespa.metricsproxy.metric.model.DimensionId; import ai.vespa.metricsproxy.metric.model.MetricId; import ai.vespa.metricsproxy.metric.model.MetricsPacket; -import ai.vespa.metricsproxy.service.MetricsParser; import ai.vespa.metricsproxy.service.VespaService; import java.util.ArrayList; @@ -76,20 +75,23 @@ public class VespaMetrics { Optional<MetricsPacket.Builder> systemCheck = getSystemMetrics(service); systemCheck.ifPresent(metricsPackets::add); - MetricAggregator aggregator = new MetricAggregator(service.getDimensions()); - GetServiceMetricsConsumer metricsConsumer = new GetServiceMetricsConsumer(consumersByMetric, aggregator); - service.consumeMetrics(metricsConsumer); + Metrics allServiceMetrics = service.getMetrics(); - if (! aggregator.getAggregated().isEmpty()) { + if (! allServiceMetrics.getMetrics().isEmpty()) { + Metrics serviceMetrics = getServiceMetrics(allServiceMetrics, consumersByMetric); // One metrics packet per set of metrics that share the same dimensions+consumers + // TODO: Move aggregation into MetricsPacket itself? + MetricAggregator aggregator = new MetricAggregator(service.getDimensions()); + serviceMetrics.getMetrics().forEach(metric -> aggregator.aggregate(metric)); + aggregator.getAggregated().forEach((aggregationKey, metrics) -> { MetricsPacket.Builder builder = new MetricsPacket.Builder(service.getMonitoringName()) .putMetrics(metrics) .putDimension(METRIC_TYPE_DIMENSION_ID, "standard") .putDimension(INSTANCE_DIMENSION_ID, service.getInstanceName()) .putDimensions(aggregationKey.getDimensions()); - setMetaInfo(builder, metrics.get(0).getTimeStamp()); + setMetaInfo(builder, serviceMetrics.getTimeStamp()); builder.addConsumers(aggregationKey.getConsumers()); metricsPackets.add(builder); }); @@ -118,20 +120,15 @@ public class VespaMetrics { * In order to include a metric, it must exist in the given map of metric to consumers. * Each returned metric will contain a collection of consumers that it should be routed to. */ - private class GetServiceMetricsConsumer implements MetricsParser.Consumer { - private final MetricAggregator aggregator; - private final Map<ConfiguredMetric, Set<ConsumerId>> consumersByMetric; - GetServiceMetricsConsumer(Map<ConfiguredMetric, Set<ConsumerId>> consumersByMetric, MetricAggregator aggregator) { - this.consumersByMetric = consumersByMetric; - this.aggregator = aggregator; - } - - @Override - public void consume(Metric candidate) { + private Metrics getServiceMetrics(Metrics allServiceMetrics, Map<ConfiguredMetric, Set<ConsumerId>> consumersByMetric) { + Metrics configuredServiceMetrics = new Metrics(); + configuredServiceMetrics.setTimeStamp(getMostRecentTimestamp(allServiceMetrics)); + for (Metric candidate : allServiceMetrics.getMetrics()) { getConfiguredMetrics(candidate.getName(), consumersByMetric.keySet()).forEach( - configuredMetric -> aggregator.aggregate( + configuredMetric -> configuredServiceMetrics.add( metricWithConfigProperties(candidate, configuredMetric, consumersByMetric))); } + return configuredServiceMetrics; } private Map<DimensionId, String> extractDimensions(Map<DimensionId, String> dimensions, List<Dimension> configuredDimensions) { @@ -188,6 +185,16 @@ public class VespaMetrics { return Optional.of(builder); } + private long getMostRecentTimestamp(Metrics metrics) { + long mostRecentTimestamp = 0L; + for (Metric metric : metrics.getMetrics()) { + if (metric.getTimeStamp() > mostRecentTimestamp) { + mostRecentTimestamp = metric.getTimeStamp(); + } + } + return mostRecentTimestamp; + } + private static class MetricAggregator { private final Map<AggregationKey, List<Metric>> aggregated = new HashMap<>(); private final Map<DimensionId, String> serviceDimensions; @@ -211,6 +218,15 @@ public class VespaMetrics { } } + private Map<AggregationKey, List<Metric>> aggregateMetrics(Map<DimensionId, String> serviceDimensions, Metrics metrics) { + MetricAggregator aggregator = new MetricAggregator(serviceDimensions); + + for (Metric metric : metrics.getMetrics() ) { + aggregator.aggregate(metric); + } + return aggregator.getAggregated(); + } + private List<ConfiguredMetric> getMetricDefinitions(ConsumerId consumer) { if (metricsConsumers == null) return Collections.emptyList(); @@ -224,100 +240,75 @@ public class VespaMetrics { .statusMessage("Data collected successfully"); } - private class MetricStringBuilder implements MetricsParser.Consumer { - private final StringBuilder sb = new StringBuilder(); - private VespaService service; - @Override - public void consume(Metric metric) { - MetricId key = metric.getName(); - MetricId alias = key; - - boolean isForwarded = false; - for (ConfiguredMetric metricConsumer : getMetricDefinitions(vespaMetricsConsumerId)) { - if (metricConsumer.id().equals(key)) { - alias = metricConsumer.outputname(); - isForwarded = true; - } - } - if (isForwarded) { - sb.append(formatter.format(service, alias.id, metric.getValue())).append(" "); - } - } - - @Override - public String toString() { - return sb.toString(); - } - } /** * Returns a string representation of metrics for the given services; * a space separated list of key=value. */ public String getMetricsAsString(List<VespaService> services) { - MetricStringBuilder msb = new MetricStringBuilder(); - for (VespaService service : services) { - msb.service = service; - service.consumeMetrics(msb); - } - return msb.toString(); - } - - private class MetricNamesBuilder implements MetricsParser.Consumer { - private final StringBuilder bufferOn = new StringBuilder(); - private final StringBuilder bufferOff = new StringBuilder(); - private final ConsumerId consumer; - MetricNamesBuilder(ConsumerId consumer) { - this.consumer = consumer; - } - @Override - public void consume(Metric m) { - String description = m.getDescription(); - MetricId alias = MetricId.empty; - boolean isForwarded = false; - - for (ConfiguredMetric metric : getMetricDefinitions(consumer)) { - if (metric.id().equals(m.getName())) { - alias = metric.outputname(); - isForwarded = true; - if (description.isEmpty()) { - description = metric.description(); + StringBuilder b = new StringBuilder(); + for (VespaService s : services) { + for (Metric metric : s.getMetrics().getMetrics()) { + MetricId key = metric.getName(); + MetricId alias = key; + + boolean isForwarded = false; + for (ConfiguredMetric metricConsumer : getMetricDefinitions(vespaMetricsConsumerId)) { + if (metricConsumer.id().equals(key)) { + alias = metricConsumer.outputname(); + isForwarded = true; } } + if (isForwarded) { + b.append(formatter.format(s, alias.id, metric.getValue())).append(" "); + } } - - String message = "OFF"; - StringBuilder buffer = bufferOff; - if (isForwarded) { - buffer = bufferOn; - message = "ON"; - } - buffer.append(m.getName()).append('=').append(message); - if (!description.isEmpty()) { - buffer.append(";description=").append(description); - } - if (!alias.id.isEmpty()) { - buffer.append(";output-name=").append(alias); - } - buffer.append(','); - } - - @Override - public String toString() { - return bufferOn.append(bufferOff).toString(); } + return b.toString(); } + /** * Get all metric names for the given services * * @return String representation */ public String getMetricNames(List<VespaService> services, ConsumerId consumer) { - MetricNamesBuilder metricNamesBuilder = new MetricNamesBuilder(consumer); - for (VespaService service : services) { - service.consumeMetrics(metricNamesBuilder); + StringBuilder bufferOn = new StringBuilder(); + StringBuilder bufferOff = new StringBuilder(); + for (VespaService s : services) { + + for (Metric m : s.getMetrics().getMetrics()) { + String description = m.getDescription(); + MetricId alias = MetricId.empty; + boolean isForwarded = false; + + for (ConfiguredMetric metric : getMetricDefinitions(consumer)) { + if (metric.id().equals(m.getName())) { + alias = metric.outputname(); + isForwarded = true; + if (description.isEmpty()) { + description = metric.description(); + } + } + } + + String message = "OFF"; + StringBuilder buffer = bufferOff; + if (isForwarded) { + buffer = bufferOn; + message = "ON"; + } + buffer.append(m.getName()).append('=').append(message); + if (!description.isEmpty()) { + buffer.append(";description=").append(description); + } + if (!alias.id.isEmpty()) { + buffer.append(";output-name=").append(alias); + } + buffer.append(','); + } } - return metricNamesBuilder.toString(); + return bufferOn.append(bufferOff).toString(); } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java index e9fbc942e6e..b304e5d74d3 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java @@ -1,6 +1,8 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.metricsproxy.service; +import ai.vespa.metricsproxy.metric.Metrics; + /** * Dummy class used for getting health status for a vespa service that has no HTTP service * for getting metrics @@ -19,6 +21,7 @@ public class DummyMetricsFetcher extends RemoteMetricsFetcher { /** * Connect to remote service over http and fetch metrics */ - public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { + public Metrics getMetrics(int fetchCount) { + return new Metrics(); } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java index 22812c07b78..48621b9abab 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java @@ -2,6 +2,7 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.Metric; +import ai.vespa.metricsproxy.metric.Metrics; import ai.vespa.metricsproxy.metric.model.DimensionId; import ai.vespa.metricsproxy.metric.model.MetricId; import com.fasterxml.jackson.core.JsonParser; @@ -24,61 +25,59 @@ import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId; * @author Jo Kristian Bergum */ public class MetricsParser { - public interface Consumer { - void consume(Metric metric); - } private static final ObjectMapper jsonMapper = new ObjectMapper(); - static void parse(String data, Consumer consumer) throws IOException { - parse(jsonMapper.createParser(data), consumer); + static Metrics parse(String data) throws IOException { + return parse(jsonMapper.createParser(data)); } - static void parse(InputStream data, Consumer consumer) throws IOException { - parse(jsonMapper.createParser(data), consumer); + static Metrics parse(InputStream data) throws IOException { + return parse(jsonMapper.createParser(data)); } - private static void parse(JsonParser parser, Consumer consumer) throws IOException { + private static Metrics parse(JsonParser parser) throws IOException { if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of object, got " + parser.currentToken()); } + Metrics metrics = new Metrics(); for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { String fieldName = parser.getCurrentName(); JsonToken token = parser.nextToken(); if (fieldName.equals("metrics")) { - parseMetrics(parser, consumer); + metrics = parseMetrics(parser); } else { if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { parser.skipChildren(); } } } + return metrics; } - private static long secondsSince1970UTC() { - return System.currentTimeMillis() / 1000L; - } - static private long parseSnapshot(JsonParser parser) throws IOException { + + static private Metrics parseSnapshot(JsonParser parser) throws IOException { if (parser.getCurrentToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of 'snapshot' object, got " + parser.currentToken()); } - long timestamp = secondsSince1970UTC(); + Metrics metrics = new Metrics(); for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { String fieldName = parser.getCurrentName(); JsonToken token = parser.nextToken(); if (fieldName.equals("to")) { - timestamp = parser.getLongValue(); + long timestamp = parser.getLongValue(); long now = System.currentTimeMillis() / 1000; timestamp = Metric.adjustTime(timestamp, now); + metrics = new Metrics(timestamp); } else { if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { parser.skipChildren(); } } } - return timestamp; + return metrics; } - static private void parseValues(JsonParser parser, long timestamp, Consumer consumer) throws IOException { + static private void parseValues(JsonParser parser, Metrics metrics) throws IOException { if (parser.getCurrentToken() != JsonToken.START_ARRAY) { throw new IOException("Expected start of 'metrics:values' array, got " + parser.currentToken()); } @@ -88,34 +87,34 @@ public class MetricsParser { // read everything from this START_OBJECT to the matching END_OBJECT // and return it as a tree model ObjectNode JsonNode value = jsonMapper.readTree(parser); - handleValue(value, timestamp, consumer, uniqueDimensions); + handleValue(value, metrics.getTimeStamp(), metrics, uniqueDimensions); // do whatever you need to do with this object } } - static private void parseMetrics(JsonParser parser, Consumer consumer) throws IOException { + static private Metrics parseMetrics(JsonParser parser) throws IOException { if (parser.getCurrentToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of 'metrics' object, got " + parser.currentToken()); } - long timestamp = System.currentTimeMillis() / 1000L; + Metrics metrics = new Metrics(); for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { String fieldName = parser.getCurrentName(); JsonToken token = parser.nextToken(); if (fieldName.equals("snapshot")) { - timestamp = parseSnapshot(parser); + metrics = parseSnapshot(parser); } else if (fieldName.equals("values")) { - parseValues(parser, timestamp, consumer); + parseValues(parser, metrics); } else { if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { parser.skipChildren(); } } } + return metrics; } - static private void handleValue(JsonNode metric, long timestamp, Consumer consumer, - Map<String, Map<DimensionId, String>> uniqueDimensions) { + static private void handleValue(JsonNode metric, long timestamp, Metrics metrics, Map<String, Map<DimensionId, String>> uniqueDimensions) { String name = metric.get("name").textValue(); String description = ""; @@ -156,7 +155,7 @@ public class MetricsParser { throw new IllegalArgumentException("Value for aggregator '" + aggregator + "' is not a number"); } String metricName = new StringBuilder().append(name).append(".").append(aggregator).toString(); - consumer.consume(new Metric(MetricId.toMetricId(metricName), value, timestamp, dim, description)); + metrics.add(new Metric(MetricId.toMetricId(metricName), value, timestamp, dim, description)); } } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index 8acaa0fb58e..f2cb5c4e8b3 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -23,25 +23,32 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { /** * Connect to remote service over http and fetch metrics */ - public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { + public Metrics getMetrics(int fetchCount) { try { - createMetrics(getJson(), consumer, fetchCount); + return createMetrics(getJson(), fetchCount); } catch (IOException | InterruptedException | ExecutionException e) { + return new Metrics(); } } - void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) { + Metrics createMetrics(String data, int fetchCount) { + Metrics remoteMetrics = new Metrics(); try { - MetricsParser.parse(data, consumer); + remoteMetrics = MetricsParser.parse(data); } catch (Exception e) { handleException(e, data, fetchCount); } + + return remoteMetrics; } - private void createMetrics(InputStream data, MetricsParser.Consumer consumer, int fetchCount) { + Metrics createMetrics(InputStream data, int fetchCount) { + Metrics remoteMetrics = new Metrics(); try { - MetricsParser.parse(data, consumer); + remoteMetrics = MetricsParser.parse(data); } catch (Exception e) { handleException(e, data, fetchCount); } + + return remoteMetrics; } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java index 1b37abde3c8..9d165b2d5a9 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java @@ -2,7 +2,6 @@ package ai.vespa.metricsproxy.service; import ai.vespa.metricsproxy.metric.HealthMetric; -import ai.vespa.metricsproxy.metric.Metric; import ai.vespa.metricsproxy.metric.Metrics; import ai.vespa.metricsproxy.metric.model.DimensionId; import ai.vespa.metricsproxy.metric.model.ServiceId; @@ -138,22 +137,10 @@ public class VespaService implements Comparable<VespaService> { * * @return the non-system metrics */ - public void consumeMetrics(MetricsParser.Consumer consumer) { - remoteMetricsFetcher.getMetrics(consumer, metricsFetchCount.get()); + public Metrics getMetrics() { + Metrics remoteMetrics = remoteMetricsFetcher.getMetrics(metricsFetchCount.get()); metricsFetchCount.getAndIncrement(); - } - - private static class CollectMetrics implements MetricsParser.Consumer { - private final Metrics metrics = new Metrics(); - @Override - public void consume(Metric metric) { - metrics.add(metric); - } - } - public final Metrics getMetrics() { - CollectMetrics collector = new CollectMetrics(); - consumeMetrics(collector); - return collector.metrics; + return remoteMetrics; } /** |