// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.metrics.simple; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import com.yahoo.concurrent.ThreadLocalDirectory; import com.yahoo.metrics.ManagerConfig; /** * Worker thread to collect the data stored in worker threads and build * snapshots for external consumption. Using the correct executor gives the * necessary guarantuees for this being invoked from only a single thread. * * @author Steinar Knutsen */ class MetricAggregator implements Runnable { private final ThreadLocalDirectory metricsCollection; private final AtomicReference currentSnapshot; private int generation = 0; private final Bucket[] buffer; private long fromMillis; private final DimensionCache dimensions; MetricAggregator(ThreadLocalDirectory metricsCollection, AtomicReference currentSnapshot, ManagerConfig settings) { if (settings.reportPeriodSeconds() < 10) { throw new IllegalArgumentException( "Do not use this metrics implementation" + " if report periods of less than 10 seconds is desired."); } buffer = new Bucket[settings.reportPeriodSeconds()]; dimensions = new DimensionCache(settings.pointsToKeepPerMetric()); fromMillis = System.currentTimeMillis(); this.metricsCollection = metricsCollection; this.currentSnapshot = currentSnapshot; } @Override public void run() { Bucket toDelete = updateBuffer(); createSnapshot(toDelete); } private void createSnapshot(Bucket toDelete) { final Bucket toPresent = new Bucket(); for (Bucket b : buffer) { if (b == null) { continue; } toPresent.merge(b); } dimensions.updateDimensionPersistence(toDelete, toPresent); currentSnapshot.set(toPresent); } private Bucket updateBuffer() { List buckets = metricsCollection.fetch(); final long toMillis = System.currentTimeMillis(); final int bucketIndex = generation++ % buffer.length; Bucket bucketToDelete = buffer[bucketIndex]; Bucket latest = new Bucket(fromMillis, toMillis); for (Bucket b : buckets) { latest.merge(b, true); } buffer[bucketIndex] = latest; this.fromMillis = toMillis; return bucketToDelete; } }