aboutsummaryrefslogtreecommitdiffstats
path: root/simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java
diff options
context:
space:
mode:
Diffstat (limited to 'simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java')
-rw-r--r--simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java71
1 files changed, 71 insertions, 0 deletions
diff --git a/simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java b/simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java
new file mode 100644
index 00000000000..a388b1a1cc4
--- /dev/null
+++ b/simplemetrics/src/main/java/com/yahoo/metrics/simple/MetricAggregator.java
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.metrics.simple;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.yahoo.concurrent.ThreadLocalDirectory;
+import com.yahoo.metrics.ManagerConfig;
+
+/**
+ * Worker thread to collect the data stored in worker threads and build
+ * snapshots for external consumption. Using the correct executor gives the
+ * necessary guarantuess for this being invoked from only a single thread.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+class MetricAggregator implements Runnable {
+ private final ThreadLocalDirectory<Bucket, Sample> metricsCollection;
+ private final AtomicReference<Bucket> currentSnapshot;
+ private int generation = 0;
+ private final Bucket[] buffer;
+ private long fromMillis;
+ private final DimensionCache dimensions;
+
+ MetricAggregator(ThreadLocalDirectory<Bucket, Sample> metricsCollection, AtomicReference<Bucket> currentSnapshot,
+ ManagerConfig settings) {
+ if (settings.reportPeriodSeconds() < 10) {
+ throw new IllegalArgumentException(
+ "Do not use this metrics implementation"
+ + " if report periods of less than 10 seconds is desired.");
+ }
+ buffer = new Bucket[settings.reportPeriodSeconds()];
+ dimensions = new DimensionCache(settings.pointsToKeepPerMetric());
+ fromMillis = System.currentTimeMillis();
+ this.metricsCollection = metricsCollection;
+ this.currentSnapshot = currentSnapshot;
+ }
+
+ @Override
+ public void run() {
+ Bucket toDelete = updateBuffer();
+ createSnapshot(toDelete);
+ }
+
+ private void createSnapshot(Bucket toDelete) {
+ final Bucket toPresent = new Bucket();
+ for (Bucket b : buffer) {
+ if (b == null) {
+ continue;
+ }
+ toPresent.merge(b);
+ }
+ dimensions.updateDimensionPersistence(toDelete, toPresent);
+ currentSnapshot.set(toPresent);
+ }
+
+ private Bucket updateBuffer() {
+ List<Bucket> buckets = metricsCollection.fetch();
+ final long toMillis = System.currentTimeMillis();
+ final int bucketIndex = generation++ % buffer.length;
+ Bucket bucketToDelete = buffer[bucketIndex];
+ Bucket latest = new Bucket(fromMillis, toMillis);
+ for (Bucket b : buckets) {
+ latest.merge(b, true);
+ }
+ buffer[bucketIndex] = latest;
+ this.fromMillis = toMillis;
+ return bucketToDelete;
+ }
+
+}