diff options
Diffstat (limited to 'metrics-proxy/src/main/java/ai/vespa')
5 files changed, 89 insertions, 72 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java index ebb5d2fe8fb..641e771aafa 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/core/VespaMetrics.java @@ -75,9 +75,9 @@ public class VespaMetrics { systemCheck.ifPresent(metricsPackets::add); MetricAggregator aggregator = new MetricAggregator(service.getDimensions()); - MetricsParser.Consumer metricsConsumer = (consumerId != null) - ? new GetServiceMetricsConsumer(metricsConsumers, aggregator, consumerId) - : new GetServiceMetricsConsumerForAll(metricsConsumers, aggregator); + MetricsParser.Collector metricsConsumer = (consumerId != null) + ? new ServiceMetricsCollector(metricsConsumers, aggregator, consumerId) + : new ServiceMetricsCollectorForAll(metricsConsumers, aggregator); service.consumeMetrics(metricsConsumer); if (! aggregator.getAggregated().isEmpty()) { @@ -118,10 +118,10 @@ public class VespaMetrics { * In order to include a metric, it must exist in the given map of metric to consumers. * Each returned metric will contain a collection of consumers that it should be routed to. */ - private static abstract class GetServiceMetricsConsumerBase implements MetricsParser.Consumer { + private static abstract class ServiceMetricsCollectorBase implements MetricsParser.Collector { protected final MetricAggregator aggregator; - GetServiceMetricsConsumerBase(MetricAggregator aggregator) { + ServiceMetricsCollectorBase(MetricAggregator aggregator) { this.aggregator = aggregator; } @@ -150,18 +150,18 @@ public class VespaMetrics { } } - private static class GetServiceMetricsConsumer extends GetServiceMetricsConsumerBase { + private static class ServiceMetricsCollector extends ServiceMetricsCollectorBase { private final Map<MetricId, ConfiguredMetric> configuredMetrics; private final Set<ConsumerId> consumerId; - GetServiceMetricsConsumer(MetricsConsumers metricsConsumers, MetricAggregator aggregator, ConsumerId consumerId) { + ServiceMetricsCollector(MetricsConsumers metricsConsumers, MetricAggregator aggregator, ConsumerId consumerId) { super(aggregator); this.consumerId = Set.of(consumerId); this.configuredMetrics = metricsConsumers.getMetricsForConsumer(consumerId); } @Override - public void consume(Metric candidate) { + public void accept(Metric candidate) { ConfiguredMetric configuredMetric = configuredMetrics.get(candidate.getName()); if (configuredMetric != null) { aggregator.aggregate( @@ -170,16 +170,16 @@ public class VespaMetrics { } } - private static class GetServiceMetricsConsumerForAll extends GetServiceMetricsConsumerBase { + private static class ServiceMetricsCollectorForAll extends ServiceMetricsCollectorBase { private final MetricsConsumers metricsConsumers; - GetServiceMetricsConsumerForAll(MetricsConsumers metricsConsumers, MetricAggregator aggregator) { + ServiceMetricsCollectorForAll(MetricsConsumers metricsConsumers, MetricAggregator aggregator) { super(aggregator); this.metricsConsumers = metricsConsumers; } @Override - public void consume(Metric candidate) { + public void accept(Metric candidate) { Map<ConfiguredMetric, Set<ConsumerId>> consumersByMetric = metricsConsumers.getConsumersByMetric(candidate.getName()); if (consumersByMetric != null) { consumersByMetric.keySet().forEach( @@ -234,11 +234,11 @@ public class VespaMetrics { .statusMessage("Data collected successfully"); } - private class MetricStringBuilder implements MetricsParser.Consumer { + private class MetricStringBuilder implements MetricsParser.Collector { private final StringBuilder sb = new StringBuilder(); private VespaService service; @Override - public void consume(Metric metric) { + public void accept(Metric metric) { MetricId key = metric.getName(); MetricId alias = key; @@ -272,7 +272,7 @@ public class VespaMetrics { return msb.toString(); } - private class MetricNamesBuilder implements MetricsParser.Consumer { + private class MetricNamesBuilder implements MetricsParser.Collector { private final StringBuilder bufferOn = new StringBuilder(); private final StringBuilder bufferOff = new StringBuilder(); private final ConsumerId consumer; @@ -280,7 +280,7 @@ public class VespaMetrics { this.consumer = consumer; } @Override - public void consume(Metric m) { + public void accept(Metric m) { String description = m.getDescription(); MetricId alias = MetricId.empty; boolean isForwarded = false; diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java index db7caa84bac..3d5ced63ab9 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/DummyMetricsFetcher.java @@ -19,6 +19,6 @@ public class DummyMetricsFetcher extends RemoteMetricsFetcher { /** * Connect to remote service over http and fetch metrics */ - public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { + public void getMetrics(MetricsParser.Collector consumer, int fetchCount) { } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java index bb21485e0e7..e8f6459a1bc 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/MetricsParser.java @@ -13,8 +13,10 @@ import java.io.InputStream; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId; @@ -25,20 +27,22 @@ import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId; * @author Jo Kristian Bergum */ public class MetricsParser { - public interface Consumer { - void consume(Metric metric); + public interface Collector { + void accept(Metric metric); } private static final ObjectMapper jsonMapper = new ObjectMapper(); - public static void parse(String data, Consumer consumer) throws IOException { + public static void parse(String data, Collector consumer) throws IOException { parse(jsonMapper.createParser(data), consumer); } - static void parse(InputStream data, Consumer consumer) throws IOException { + static void parse(InputStream data, Collector consumer) throws IOException { parse(jsonMapper.createParser(data), consumer); } - private static void parse(JsonParser parser, Consumer consumer) throws IOException { + + // Top level 'metrics' object, with e.g. 'time', 'status' and 'metrics'. + private static void parse(JsonParser parser, Collector consumer) throws IOException { if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of object, got " + parser.currentToken()); } @@ -55,6 +59,7 @@ public class MetricsParser { } } } + static private Instant parseSnapshot(JsonParser parser) throws IOException { if (parser.getCurrentToken() != JsonToken.START_OBJECT) { throw new IOException("Expected start of 'snapshot' object, got " + parser.currentToken()); @@ -75,59 +80,97 @@ public class MetricsParser { return timestamp; } - static private void parseMetricValues(JsonParser parser, Instant timestamp, Consumer consumer) throws IOException { + // 'metrics' object with 'snapshot' and 'values' arrays + static private void parseMetrics(JsonParser parser, Collector consumer) throws IOException { + if (parser.getCurrentToken() != JsonToken.START_OBJECT) { + throw new IOException("Expected start of 'metrics' object, got " + parser.currentToken()); + } + Instant timestamp = Instant.now(); + for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { + String fieldName = parser.getCurrentName(); + JsonToken token = parser.nextToken(); + if (fieldName.equals("snapshot")) { + timestamp = parseSnapshot(parser); + } else if (fieldName.equals("values")) { + parseMetricValues(parser, timestamp, consumer); + } else { + if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { + parser.skipChildren(); + } + } + } + } + + // 'values' array + static private void parseMetricValues(JsonParser parser, Instant timestamp, Collector consumer) throws IOException { if (parser.getCurrentToken() != JsonToken.START_ARRAY) { throw new IOException("Expected start of 'metrics:values' array, got " + parser.currentToken()); } - Map<Long, Map<DimensionId, String>> uniqueDimensions = new HashMap<>(); + Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions = new HashMap<>(); while (parser.nextToken() == JsonToken.START_OBJECT) { handleValue(parser, timestamp, consumer, uniqueDimensions); } } - static private void parseMetrics(JsonParser parser, Consumer consumer) throws IOException { - if (parser.getCurrentToken() != JsonToken.START_OBJECT) { - throw new IOException("Expected start of 'metrics' object, got " + parser.currentToken()); - } - Instant timestamp = Instant.now(); + // One item in the 'values' array, where each item has 'name', 'values' and 'dimensions' + static private void handleValue(JsonParser parser, Instant timestamp, Collector consumer, + Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions) throws IOException { + String name = ""; + String description = ""; + Map<DimensionId, String> dim = Map.of(); + List<Map.Entry<String, Number>> values = List.of(); for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { String fieldName = parser.getCurrentName(); JsonToken token = parser.nextToken(); - if (fieldName.equals("snapshot")) { - timestamp = parseSnapshot(parser); + if (fieldName.equals("name")) { + name = parser.getText(); + } else if (fieldName.equals("description")) { + description = parser.getText(); + } else if (fieldName.equals("dimensions")) { + dim = parseDimensions(parser, uniqueDimensions); } else if (fieldName.equals("values")) { - parseMetricValues(parser, timestamp, consumer); + values = parseValues(name+".", parser); } else { if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { parser.skipChildren(); } } } + for (Map.Entry<String, Number> value : values) { + consumer.accept(new Metric(MetricId.toMetricId(value.getKey()), value.getValue(), timestamp, dim, description)); + } } private static Map<DimensionId, String> parseDimensions(JsonParser parser, - Map<Long, Map<DimensionId, String>> uniqueDimensions) throws IOException { - List<Map.Entry<String, String>> dims = new ArrayList<>(); - int keyHash = 0; - int valueHash = 0; + Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions) throws IOException { + + Set<Dimension> dimensions = new HashSet<>(); + for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { String fieldName = parser.getCurrentName(); JsonToken token = parser.nextToken(); + if (token == JsonToken.VALUE_STRING){ String value = parser.getValueAsString(); - dims.add(Map.entry(fieldName, value)); - keyHash ^= fieldName.hashCode(); - valueHash ^= value.hashCode(); + dimensions.add(Dimension.of(fieldName, value)); } else if (token == JsonToken.VALUE_NULL) { // TODO Should log a warning if this happens } else { throw new IllegalArgumentException("Dimension '" + fieldName + "' must be a string"); } } - Long uniqueKey = (((long) keyHash) << 32) | (valueHash & 0xffffffffL); - return uniqueDimensions.computeIfAbsent(uniqueKey, key -> dims.stream().collect(Collectors.toUnmodifiableMap(e -> toDimensionId(e.getKey()), Map.Entry::getValue))); + return uniqueDimensions.computeIfAbsent(dimensions, + key -> dimensions.stream().collect(Collectors.toUnmodifiableMap( + dim -> toDimensionId(dim.id), dim -> dim.value))); + } + + record Dimension(String id, String value) { + static Dimension of(String id, String value) { + return new Dimension(id, value); + } } + private static List<Map.Entry<String, Number>> parseValues(String prefix, JsonParser parser) throws IOException { List<Map.Entry<String, Number>> metrics = new ArrayList<>(); for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { @@ -144,31 +187,5 @@ public class MetricsParser { } return metrics; } - static private void handleValue(JsonParser parser, Instant timestamp, Consumer consumer, - Map<Long, Map<DimensionId, String>> uniqueDimensions) throws IOException { - String name = ""; - String description = ""; - Map<DimensionId, String> dim = Map.of(); - List<Map.Entry<String, Number>> values = List.of(); - for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) { - String fieldName = parser.getCurrentName(); - JsonToken token = parser.nextToken(); - if (fieldName.equals("name")) { - name = parser.getText(); - } else if (fieldName.equals("description")) { - description = parser.getText(); - } else if (fieldName.equals("dimensions")) { - dim = parseDimensions(parser, uniqueDimensions); - } else if (fieldName.equals("values")) { - values = parseValues(name+".", parser); - } else { - if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { - parser.skipChildren(); - } - } - } - for (Map.Entry<String, Number> value : values) { - consumer.consume(new Metric(MetricId.toMetricId(value.getKey()), value.getValue(), timestamp, dim, description)); - } - } + } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java index cd05945f4f1..7a0cca26afe 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/RemoteMetricsFetcher.java @@ -24,7 +24,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { /** * Connect to remote service over http and fetch metrics */ - public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) { + public void getMetrics(MetricsParser.Collector consumer, int fetchCount) { try (CloseableHttpResponse response = getResponse()) { HttpEntity entity = response.getEntity(); try { @@ -37,7 +37,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher { } catch (IOException ignored) {} } - void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) throws IOException { + void createMetrics(String data, MetricsParser.Collector consumer, int fetchCount) throws IOException { MetricsParser.parse(data, consumer); } } diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java index 666fa31b7ed..8dd8d002c84 100644 --- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java +++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/service/VespaService.java @@ -137,15 +137,15 @@ public class VespaService implements Comparable<VespaService> { * Get the Metrics registered for this service. Metrics are fetched over HTTP * if a metric http port has been defined, otherwise from log file */ - public void consumeMetrics(MetricsParser.Consumer consumer) { + public void consumeMetrics(MetricsParser.Collector consumer) { remoteMetricsFetcher.getMetrics(consumer, metricsFetchCount.get()); metricsFetchCount.getAndIncrement(); } - private static class CollectMetrics implements MetricsParser.Consumer { + private static class CollectMetrics implements MetricsParser.Collector { private final Metrics metrics = new Metrics(); @Override - public void consume(Metric metric) { + public void accept(Metric metric) { metrics.add(metric); } } |