// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.statistics;
import com.yahoo.component.chain.dependencies.Before;
import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.container.Server;
import com.yahoo.container.protect.Error;
import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.metrics.simple.MetricSettings;
import com.yahoo.metrics.simple.MetricReceiver;
import com.yahoo.search.Result;
import com.yahoo.search.Searcher;
import com.yahoo.search.result.ErrorHit;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.search.searchchain.PhaseNames;
import com.yahoo.statistics.Callback;
import com.yahoo.statistics.Counter;
import com.yahoo.statistics.Handle;
import com.yahoo.statistics.Value;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import static com.yahoo.container.protect.Error.*;
/**
 * A searcher to gather statistics such as queries completed and query latency.
 *
 * <p>There may be more than 1 StatisticsSearcher in the Searcher chain, each identified by a
 * Searcher ID. The statistics accumulated by all StatisticsSearchers are stored
 * in the singleton StatisticsManager object.
 *
 * <p>Most measurements are reported through two channels: the {@code com.yahoo.statistics}
 * handles ({@link Counter} / {@link Value}) and the jdisc {@link Metric} API. Metrics sent
 * through {@link Metric} carry a per-chain context (dimension {@code chain}), except the
 * per-error-code metrics, which carry a per-source context (dimension {@code source}).
 *
 * TODO: Fix events to handle more than one of these searchers properly.
 *
 * @author Gene Meyers
 * @author Steinar Knutsen
 * @author bergum
 */
@Before(PhaseNames.RAW_QUERY)
public class StatisticsSearcher extends Searcher {

    private static final String MAX_QUERY_LATENCY_METRIC = "max_query_latency";
    private static final String EMPTY_RESULTS_METRIC = "empty_results";
    private static final String HITS_PER_QUERY_METRIC = "hits_per_query";
    private static final String TOTALHITS_PER_QUERY_METRIC = "totalhits_per_query";
    private static final String FAILED_QUERIES_METRIC = "failed_queries";
    private static final String MEAN_QUERY_LATENCY_METRIC = "mean_query_latency";
    private static final String QUERY_LATENCY_METRIC = "query_latency";
    private static final String QUERIES_METRIC = "queries";
    private static final String ACTIVE_QUERIES_METRIC = "active_queries";
    private static final String PEAK_QPS_METRIC = "peak_qps";

    private Counter queries; // basic counter
    private Counter failedQueries; // basic counter
    private Counter nullQueries; // basic counter
    private Counter illegalQueries; // basic counter
    private Value queryLatency; // mean per 5 min
    private Value queryLatencyBuckets;
    private Value maxQueryLatency; // separate to avoid name mangling
    @SuppressWarnings("unused") // all the work is done by the callback
    private Value activeQueries; // raw measure every 5 minutes
    private Value peakQPS; // peak 1s QPS
    private Counter emptyResults; // number of results containing no concrete hits
    private Value hitsPerQuery; // mean number of hits per query

    // State for the peak QPS computation; all three are guarded by peakQpsLock.
    private long prevMaxQPSTime; // previous measurement time of QPS
    private double queriesForQPS = 0.0;
    private final Object peakQpsLock = new Object();

    private final Metric metric;

    /** Metric contexts keyed by search chain id, created on first use. */
    private final Map<String, Metric.Context> chainContexts = new CopyOnWriteHashMap<>();

    /** Metric contexts keyed by error source ("" when the source is null), created on first use. */
    private final Map<String, Metric.Context> statePageOnlyContexts = new CopyOnWriteHashMap<>();

    /**
     * Creates all statistics handles and declares the query latency gauge as a histogram.
     *
     * @param manager the statistics manager the counters and values are registered with
     * @param metricReceiver receiver on which the {@code query_latency} gauge is declared
     */
    private void initEvents(com.yahoo.statistics.Statistics manager, MetricReceiver metricReceiver) {
        queries = new Counter(QUERIES_METRIC, manager, false);
        failedQueries = new Counter(FAILED_QUERIES_METRIC, manager, false);
        nullQueries = new Counter("null_queries", manager, false);
        illegalQueries = new Counter("illegal_queries", manager, false);
        queryLatency = new Value(MEAN_QUERY_LATENCY_METRIC, manager,
                new Value.Parameters().setLogRaw(false).setLogMean(true).setNameExtension(false));
        maxQueryLatency = new Value(MAX_QUERY_LATENCY_METRIC, manager,
                new Value.Parameters().setLogRaw(false).setLogMax(true).setNameExtension(false));
        queryLatencyBuckets = Value.buildValue(QUERY_LATENCY_METRIC, manager, null);
        activeQueries = new Value(ACTIVE_QUERIES_METRIC, manager,
                new Value.Parameters().setLogRaw(true).setCallback(new ActivitySampler()));
        peakQPS = new Value(PEAK_QPS_METRIC, manager,
                new Value.Parameters().setLogRaw(false).setLogMax(true).setNameExtension(false));
        hitsPerQuery = new Value(HITS_PER_QUERY_METRIC, manager,
                new Value.Parameters().setLogRaw(false).setLogMean(true).setNameExtension(false));
        emptyResults = new Counter(EMPTY_RESULTS_METRIC, manager, false);
        metricReceiver.declareGauge(QUERY_LATENCY_METRIC, Optional.empty(),
                new MetricSettings.Builder().histogram(true).build());
    }

    /**
     * Callback sampling the number of queries in flight. It is registered on the
     * {@code active_queries} value; the original author notes it runs every five minutes
     * (the interval is decided by the statistics framework, not by this class).
     */
    private class ActivitySampler implements Callback {

        /**
         * Samples the number of search queries in flight and reports it.
         *
         * @param h the handle this callback is attached to; cast to {@link Value} to store the sample
         * @param firstRun when true no sample is taken and the metric is just initialized to 0
         */
        public void run(Handle h, boolean firstRun) {
            if (firstRun) {
                metric.set(ACTIVE_QUERIES_METRIC, 0, null);
                return;
            }
            // TODO Server.get() is to be removed
            int searchQueriesInFlight = Server.get().searchQueriesInFlight();
            ((Value) h).put(searchQueriesInFlight);
            metric.set(ACTIVE_QUERIES_METRIC, searchQueriesInFlight, null);
        }

    }

    /**
     * Creates a searcher using the null implementations of the statistics manager
     * and the metric receiver.
     *
     * @param metric the metric API measurements are reported to
     */
    StatisticsSearcher(Metric metric) {
        this(com.yahoo.statistics.Statistics.nullImplementation, metric, MetricReceiver.nullImplementation);
    }

    /**
     * Creates a searcher and registers all its statistics handles.
     *
     * @param manager the statistics manager the counters and values are registered with
     * @param metric the metric API measurements are reported to
     * @param metricReceiver receiver on which the query latency histogram gauge is declared
     */
    public StatisticsSearcher(com.yahoo.statistics.Statistics manager, Metric metric, MetricReceiver metricReceiver) {
        this.metric = metric;
        initEvents(manager, metricReceiver);
    }

    /** Returns the string form of this searcher's component id. */
    public String getMyID() {
        return getId().stringValue();
    }

    /**
     * Counts the current query towards the peak QPS measurement and, once at least one
     * second has passed since the previous report, reports the rate for that window.
     *
     * <p>On the very first call {@code prevMaxQPSTime} is still 0 and no queries have been
     * counted, so a rate of 0 is reported.
     *
     * @param now the current time in milliseconds
     * @param metricContext the context to attach to the reported metric
     */
    private void qps(long now, Metric.Context metricContext) {
        // We can either have peakQpsLock _or_ have prevMaxQpsTime as a volatile
        // and queriesForQPS as an AtomicInteger. That would lead no locking,
        // but two memory barriers in the common case. Don't change till we know
        // that is actually better.
        synchronized (peakQpsLock) {
            long elapsedMillis = now - prevMaxQPSTime;
            if (elapsedMillis >= 1000) {
                double elapsedSeconds = ((double) elapsedMillis) / 1000;
                double measuredQps = queriesForQPS / elapsedSeconds;
                peakQPS.put(measuredQps);
                metric.set(PEAK_QPS_METRIC, measuredQps, metricContext);
                queriesForQPS = 1.0d; // the current query opens the next window
                prevMaxQPSTime = now;
            } else {
                queriesForQPS += 1.0d;
            }
        }
    }

    /**
     * Returns the metric context carrying the dimension {@code chain=<chainName>},
     * creating and caching it on first use.
     *
     * <p>The lookup and the insertion are not atomic, so two threads may both create a
     * context for the same chain; the last one stored is the one kept in the cache.
     *
     * @param chainName the id of the search chain being executed
     * @return the context to attach to metrics for that chain
     */
    private Metric.Context getChainMetricContext(String chainName) {
        Metric.Context context = chainContexts.get(chainName);
        if (context == null) {
            Map<String, String> dimensions = new HashMap<>();
            dimensions.put("chain", chainName);
            context = this.metric.createContext(dimensions);
            chainContexts.put(chainName, context);
        }
        return context;
    }

    /**
     * Generate statistics for the query passing through this Searcher:
     * <ol>
     *   <li>Add 1 to the total query count and to the peak QPS window</li>
     *   <li>Record the response time (time from entry to return)</li>
     *   <li>Count failed queries, both when the chain throws and when the result has an error</li>
     *   <li>Record the hit count, the total hit count and whether the result was empty</li>
     * </ol>
     *
     * @param query the query to pass down the chain
     * @param execution the execution used to invoke the rest of the chain
     * @return the result returned by the rest of the chain, unmodified by this method
     */
    public Result search(com.yahoo.search.Query query, Execution execution) {
        Metric.Context metricContext = getChainMetricContext(execution.chain().getId().stringValue());

        incrQueryCount(metricContext);
        logQuery(query);
        long start = System.currentTimeMillis(); // Start time, in millisecs.
        qps(start, metricContext);

        Result result;
        // Count exceptions thrown by searchers further down, then let them propagate
        try {
            result = execution.search(query); // Pass on down the chain
        } catch (Exception e) {
            incrErrorCount(null, metricContext);
            throw e;
        }

        long end = System.currentTimeMillis(); // End time, in millisecs.
        long latency = end - start;
        if (latency >= 0) {
            addLatency(latency, metricContext);
        } else {
            // The wall clock is not monotonic, so it may have been adjusted backwards
            getLogger().log(LogLevel.WARNING,
                    "Apparently negative latency measure, start: " + start
                    + ", end: " + end + ", for query: " + query.toString());
        }

        if (result.hits().getError() != null) {
            incrErrorCount(result, metricContext);
            incrementStatePageOnlyErrors(result, execution);
        }

        int hitCount = result.getConcreteHitCount();
        hitsPerQuery.put((double) hitCount);
        metric.set(HITS_PER_QUERY_METRIC, (double) hitCount, metricContext);
        metric.set(TOTALHITS_PER_QUERY_METRIC, (double) result.getTotalHitCount(), metricContext);
        if (hitCount == 0) {
            emptyResults.increment();
            metric.add(EMPTY_RESULTS_METRIC, 1, metricContext);
        }
        return result;
    }

    /**
     * Logs the query at level FINER.
     *
     * @param query the query to log
     */
    private void logQuery(com.yahoo.search.Query query) {
        // Don't parse the query if it's not necessary for the logging: Query.toString triggers parsing
        if (getLogger().isLoggable(Level.FINER)) {
            getLogger().finer("Query: " + query.toString());
        }
    }

    /**
     * Records one latency sample in the mean, max and bucket values and in the
     * corresponding metrics.
     *
     * @param latency the measured latency in milliseconds
     * @param metricContext the context to attach to the reported metrics
     */
    private void addLatency(long latency, Metric.Context metricContext) {
        queryLatency.put(latency);
        metric.set(QUERY_LATENCY_METRIC, latency, metricContext);
        metric.set(MEAN_QUERY_LATENCY_METRIC, latency, metricContext);
        maxQueryLatency.put(latency);
        metric.set(MAX_QUERY_LATENCY_METRIC, latency, metricContext);
        queryLatencyBuckets.put(latency);
    }

    /**
     * Adds one to the query counter and the {@code queries} metric.
     *
     * @param metricContext the context to attach to the reported metric
     */
    private void incrQueryCount(Metric.Context metricContext) {
        queries.increment();
        metric.add(QUERIES_METRIC, 1, metricContext);
    }

    /**
     * Counts a failed query, and classifies it further where possible.
     *
     * @param result the result carrying the error, or null if the chain threw an exception
     * @param metricContext the context to attach to the reported metrics
     */
    private void incrErrorCount(Result result, Metric.Context metricContext) {
        failedQueries.increment();
        metric.add(FAILED_QUERIES_METRIC, 1, metricContext);

        if (result == null) { // the chain threw an exception
            metric.add("error.unhandled_exception", 1, metricContext);
            return;
        }

        ErrorHit errorHit = result.hits().getErrorHit();
        if (errorHit == null) {
            return; // nothing to classify
        }
        if (errorHit.hasOnlyErrorCode(Error.NULL_QUERY.code)) {
            nullQueries.increment();
        } else if (errorHit.hasOnlyErrorCode(Error.ILLEGAL_QUERY.code)) {
            illegalQueries.increment();
        }
    }

    /**
     * Creates error metric for StateHandler only. These metrics are only exposed on /state/v1/metrics page
     * and not forwarded to the log file.
     *
     * <p>One metric is incremented per error message whose code is recognized; messages with
     * other codes are ignored.
     *
     * @param result The result to check for errors
     * @param execution the execution the result came from
     */
    private void incrementStatePageOnlyErrors(Result result, Execution execution) {
        if (result == null) {
            return;
        }
        ErrorHit error = result.hits().getErrorHit();
        if (error == null) {
            return;
        }
        for (ErrorMessage message : error.errors()) {
            // The context is looked up (and cached) for every message, also unrecognized ones
            Metric.Context context = getDimensions(message.getSource(), result, execution);
            String metricName = statePageErrorMetricName(message.getCode());
            if (metricName != null) {
                metric.add(metricName, 1, context);
            }
        }
    }

    /**
     * Maps an error code to the name of the state page metric counting it.
     *
     * @param code the error code of an error message
     * @return the metric name, or null if no metric is kept for this code
     */
    private static String statePageErrorMetricName(int code) {
        if (code == TIMEOUT.code) {
            return "error.timeout";
        } else if (code == NO_BACKENDS_IN_SERVICE.code) {
            return "error.backends_oos";
        } else if (code == ERROR_IN_PLUGIN.code) {
            return "error.plugin_failure";
        } else if (code == BACKEND_COMMUNICATION_ERROR.code) {
            return "error.backend_communication_error";
        } else if (code == EMPTY_DOCUMENTS.code) {
            return "error.empty_document_summaries";
        } else if (code == ILLEGAL_QUERY.code) {
            return "error.illegal_query";
        } else if (code == INVALID_QUERY_PARAMETER.code) {
            return "error.invalid_query_parameter";
        } else if (code == INTERNAL_SERVER_ERROR.code) {
            return "error.internal_server_error";
        } else if (code == SERVER_IS_MISCONFIGURED.code) {
            return "error.misconfigured_server";
        } else if (code == INVALID_QUERY_TRANSFORMATION.code) {
            return "error.invalid_query_transformation";
        } else if (code == RESULT_HAS_ERRORS.code) {
            return "error.result_with_errors";
        } else if (code == UNSPECIFIED.code) {
            return "error.unspecified";
        }
        return null;
    }

    /**
     * Returns the metric context for errors from the given source, creating and caching
     * it on first use. The context carries the dimension {@code source} when the source is
     * non-null and no dimensions otherwise.
     *
     * @param source the source of the error message, may be null
     * @param r currently unused
     * @param execution currently unused
     * @return the context to attach to state page error metrics
     */
    private Metric.Context getDimensions(String source, Result r, Execution execution) {
        String cacheKey = (source == null) ? "" : source;
        Metric.Context context = statePageOnlyContexts.get(cacheKey);
        if (context == null) {
            Map<String, String> dims = new HashMap<>();
            if (source != null) {
                dims.put("source", source);
            }
            context = this.metric.createContext(dims);
            statePageOnlyContexts.put(cacheKey, context);
        }
        // TODO add other relevant metric dimensions
        // Would be nice to have chain as a dimension as
        // we can separate errors from different chains
        return context;
    }

}