diff options
Diffstat (limited to 'container-search')
19 files changed, 651 insertions, 50 deletions
diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/TracingOptions.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/TracingOptions.java new file mode 100644 index 00000000000..3c96b00d628 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/TracingOptions.java @@ -0,0 +1,83 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors; + +import com.yahoo.vespa.streamingvisitors.tracing.LoggingTraceExporter; +import com.yahoo.vespa.streamingvisitors.tracing.MaxSamplesPerPeriod; +import com.yahoo.vespa.streamingvisitors.tracing.MonotonicNanoClock; +import com.yahoo.vespa.streamingvisitors.tracing.ProbabilisticSampleRate; +import com.yahoo.vespa.streamingvisitors.tracing.SamplingStrategy; +import com.yahoo.vespa.streamingvisitors.tracing.SamplingTraceExporter; +import com.yahoo.vespa.streamingvisitors.tracing.TraceExporter; + +import java.util.concurrent.TimeUnit; + +/** + * Encapsulates all trace-related components and options used by the streaming search Searcher. + * + * Provides a DEFAULT static instance which has the following characteristics: + * - Approximately 1 query every 2 seconds is traced + * - Trace level is set to 7 for traced queries + * - Only emits traces for queries that have timed out and where the elapsed time is at least 5x + * of the timeout specified in the query itself + * - Emits traces to the Vespa log + * - Only 1 trace every 10 seconds may be emitted to the log + */ +public class TracingOptions { + + private final SamplingStrategy samplingStrategy; + private final TraceExporter traceExporter; + private final MonotonicNanoClock clock; + private final int traceLevelOverride; + private final double traceTimeoutMultiplierThreshold; + + /** + * @param samplingStrategy used for choosing if a query should have its trace level implicitly altered. 
+ * @param traceExporter used for emitting a visitor session trace to someplace it may be debugged later. + * @param clock monotonic clock used for relative time tracking. + * @param traceLevelOverride if a query is trace-sampled, its traceLevel will be set to this value + * @param traceTimeoutMultiplierThreshold only export traces if the elapsed time is greater than the query timeout * this value + */ + public TracingOptions(SamplingStrategy samplingStrategy, TraceExporter traceExporter, + MonotonicNanoClock clock, int traceLevelOverride, double traceTimeoutMultiplierThreshold) + { + this.samplingStrategy = samplingStrategy; + this.traceExporter = traceExporter; + this.clock = clock; + this.traceLevelOverride = traceLevelOverride; + this.traceTimeoutMultiplierThreshold = traceTimeoutMultiplierThreshold; + } + + public static final TracingOptions DEFAULT; + public static final int DEFAULT_TRACE_LEVEL_OVERRIDE = 7; // TODO determine appropriate trace level + // Traces won't be exported unless the query has timed out with a duration that is > timeout * multiplier + public static final double TRACE_TIMEOUT_MULTIPLIER_THRESHOLD = 5.0; + + static { + SamplingStrategy queryTraceSampler = ProbabilisticSampleRate.withSystemDefaults(0.5); + SamplingStrategy logExportSampler = MaxSamplesPerPeriod.withSteadyClock(TimeUnit.SECONDS.toNanos(10), 1); + TraceExporter traceExporter = new SamplingTraceExporter(new LoggingTraceExporter(), logExportSampler); + DEFAULT = new TracingOptions(queryTraceSampler, traceExporter, System::nanoTime, + DEFAULT_TRACE_LEVEL_OVERRIDE, TRACE_TIMEOUT_MULTIPLIER_THRESHOLD); + } + + public SamplingStrategy getSamplingStrategy() { + return samplingStrategy; + } + + public TraceExporter getTraceExporter() { + return traceExporter; + } + + public MonotonicNanoClock getClock() { + return clock; + } + + public int getTraceLevelOverride() { + return traceLevelOverride; + } + + public double getTraceTimeoutMultiplierThreshold() { + return 
traceTimeoutMultiplierThreshold; + } + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java index 827fcb885df..c1217f7acc2 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.streamingvisitors; import com.yahoo.document.DocumentId; -import com.yahoo.document.idstring.IdString; import com.yahoo.document.select.parser.ParseException; import com.yahoo.document.select.parser.TokenMgrException; import com.yahoo.fs4.DocsumPacket; @@ -26,11 +25,13 @@ import com.yahoo.searchlib.aggregation.Grouping; import com.yahoo.vdslib.DocumentSummary; import com.yahoo.vdslib.SearchResult; import com.yahoo.vdslib.VisitorStatistics; +import com.yahoo.vespa.streamingvisitors.tracing.TraceDescription; import java.io.IOException; import java.math.BigInteger; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** @@ -49,8 +50,10 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { private static final CompoundName streamingSelection=new CompoundName("streaming.selection"); public static final String STREAMING_STATISTICS = "streaming.statistics"; - private VisitorFactory visitorFactory; + private final VisitorFactory visitorFactory; + private final TracingOptions tracingOptions; private static final Logger log = Logger.getLogger(VdsStreamingSearcher.class.getName()); + private Route route; /** The configId used to access the searchcluster. 
*/ private String searchClusterConfigId = null; @@ -73,26 +76,105 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { private static class VdsVisitorFactory implements VisitorFactory { @Override - public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType) { - return new VdsVisitor(query, searchCluster, route, documentType); + public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) { + return new VdsVisitor(query, searchCluster, route, documentType, traceLevelOverride); } } public VdsStreamingSearcher() { - visitorFactory = new VdsVisitorFactory(); + this(new VdsVisitorFactory()); } public VdsStreamingSearcher(VisitorFactory visitorFactory) { this.visitorFactory = visitorFactory; + tracingOptions = TracingOptions.DEFAULT; + } + + public VdsStreamingSearcher(VisitorFactory visitorFactory, TracingOptions tracingOptions) { + this.visitorFactory = visitorFactory; + this.tracingOptions = tracingOptions; } @Override protected void doPartialFill(Result result, String summaryClass) { } + private double durationInMillisFromNanoTime(long startTimeNanos) { + return (tracingOptions.getClock().nanoTimeNow() - startTimeNanos) / (double)TimeUnit.MILLISECONDS.toNanos(1); + } + + private boolean timeoutBadEnoughToBeReported(Query query, double durationMillis) { + return (durationMillis > (query.getTimeout() * tracingOptions.getTraceTimeoutMultiplierThreshold())); + } + + private static boolean queryIsLocationConstrained(Query query) { + return ((query.properties().getString(streamingUserid) != null) || + (query.properties().getString(streamingGroupname) != null)); + } + + private static int documentSelectionQueryParameterCount(Query query) { + int paramCount = 0; + if (query.properties().getString(streamingUserid) != null) { + paramCount++; + } + if (query.properties().getString(streamingGroupname) != null) { + paramCount++; + } + if 
(query.properties().getString(streamingSelection) != null) { + paramCount++; + } + return paramCount; + } + + private boolean shouldTraceQuery(Query query) { + // Only trace for explicit bucket subset queries, as otherwise we'd get a trace entry for every superbucket in the system. + return (queryIsLocationConstrained(query) && + ((query.getTraceLevel() > 0) || tracingOptions.getSamplingStrategy().shouldSample())); + } + + private int inferEffectiveQueryTraceLevel(Query query) { + return ((query.getTraceLevel() == 0) && shouldTraceQuery(query)) // Honor query's explicit trace level if present. + ? tracingOptions.getTraceLevelOverride() + : query.getTraceLevel(); + } + @Override public Result doSearch2(Query query, Execution execution) { - // TODO refactor this method into smaller methods, it's hard to see the actual code + initializeMissingQueryFields(query); + if (documentSelectionQueryParameterCount(query) != 1) { + return new Result(query, ErrorMessage.createBackendCommunicationError("Streaming search needs one and " + + "only one of these query parameters to be set: streaming.userid, streaming.groupname, " + + "streaming.selection")); + } + query.trace("Routing to search cluster " + getSearchClusterConfigId() + " and document type " + documentType, 4); + long timeStartedNanos = tracingOptions.getClock().nanoTimeNow(); + int effectiveTraceLevel = inferEffectiveQueryTraceLevel(query); + + Visitor visitor = visitorFactory.createVisitor(query, getSearchClusterConfigId(), route, documentType, effectiveTraceLevel); + try { + visitor.doSearch(); + } catch (ParseException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError( + "Failed to parse document selection string: " + e.getMessage() + "'.")); + } catch (TokenMgrException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError( + "Failed to tokenize document selection string: " + e.getMessage() + "'.")); + } catch (TimeoutException e) { + double elapsedMillis = 
durationInMillisFromNanoTime(timeStartedNanos); + if ((effectiveTraceLevel > 0) && timeoutBadEnoughToBeReported(query, elapsedMillis)) { + tracingOptions.getTraceExporter().maybeExport(() -> new TraceDescription(visitor.getTrace(), + String.format("Trace of %s which timed out after %.3g seconds", + query.toString(), elapsedMillis / 1000.0))); + } + return new Result(query, ErrorMessage.createTimeout(e.getMessage())); + } catch (InterruptedException|IllegalArgumentException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError(e.getMessage())); + } + + return buildResultFromCompletedVisitor(query, visitor); + } + + private void initializeMissingQueryFields(Query query) { lazyTrace(query, 7, "Routing to storage cluster ", getStorageClusterRouteSpec()); if (route == null) { @@ -119,32 +201,9 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { lazyTrace(query, 8, "doSearch2(): sort specification=", query .getRanking().getSorting() == null ? null : query.getRanking() .getSorting().fieldOrders()); + } - int documentSelectionQueryParameterCount = 0; - if (query.properties().getString(streamingUserid) != null) documentSelectionQueryParameterCount++; - if (query.properties().getString(streamingGroupname) != null) documentSelectionQueryParameterCount++; - if (query.properties().getString(streamingSelection) != null) documentSelectionQueryParameterCount++; - if (documentSelectionQueryParameterCount != 1) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Streaming search needs one and " + - "only one of these query parameters to be set: streaming.userid, streaming.groupname, " + - "streaming.selection")); - } - query.trace("Routing to search cluster " + getSearchClusterConfigId() + " and document type " + documentType, 4); - Visitor visitor = visitorFactory.createVisitor(query, getSearchClusterConfigId(), route, documentType); - try { - visitor.doSearch(); - } catch (ParseException e) { - return new Result(query, 
ErrorMessage.createBackendCommunicationError( - "Failed to parse document selection string: " + e.getMessage() + "'.")); - } catch (TokenMgrException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError( - "Failed to tokenize document selection string: " + e.getMessage() + "'.")); - } catch (TimeoutException e) { - return new Result(query, ErrorMessage.createTimeout(e.getMessage())); - } catch (InterruptedException|IllegalArgumentException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError(e.getMessage())); - } - + private Result buildResultFromCompletedVisitor(Query query, Visitor visitor) { lazyTrace(query, 8, "offset=", query.getOffset(), ", hits=", query.getHits()); Result result = new Result(query); @@ -176,7 +235,6 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { DocumentSummary.Summary summary = summaryMap.get(hit.getDocId()); if (summary != null) { DocsumPacket dp = new DocsumPacket(summary.getSummary()); - //log.log(LogLevel.SPAM, "DocsumPacket: " + dp); summaryPackets[index] = dp; } else { return new Result(query, ErrorMessage.createBackendCommunicationError( @@ -213,7 +271,7 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { return new Result(query, ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields")); } - if (skippedHits==0) { + if (skippedHits == 0) { query.trace("All hits have been filled",4); // TODO: cache results or result.analyzeHits(); ? 
} else { lazyTrace(query, 8, "Skipping some hits for query: ", result.getQuery()); @@ -221,7 +279,7 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { lazyTrace(query, 8, "Returning result ", result); - if ( skippedHits>0 ) { + if (skippedHits > 0) { getLogger().info("skipping " + skippedHits + " hits for query: " + result.getQuery()); result.hits().addError(com.yahoo.search.result.ErrorMessage.createTimeout("Missing hit summary data for " + skippedHits + " hits")); } diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java index 628c24fffd1..795b62663d5 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsVisitor.java @@ -19,6 +19,7 @@ import com.yahoo.documentapi.messagebus.protocol.SearchResultMessage; import com.yahoo.io.GrowableByteBuffer; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.routing.Route; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.processing.request.CompoundName; @@ -72,6 +73,8 @@ class VdsVisitor extends VisitorDataHandler implements Visitor { private final Map<Integer, Grouping> groupingMap = new ConcurrentHashMap<>(); private Query query = null; private VisitorSessionFactory visitorSessionFactory; + private final int traceLevelOverride; + private Trace sessionTrace; public interface VisitorSessionFactory { VisitorSession createVisitorSession(VisitorParameters params) throws ParseException; @@ -123,20 +126,22 @@ class VdsVisitor extends VisitorDataHandler implements Visitor { } } - public VdsVisitor(Query query, String searchCluster, Route route, String documentType) { - this(query, searchCluster, route, documentType, MessageBusVisitorSessionFactory.sharedInstance()); + public 
VdsVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) { + this(query, searchCluster, route, documentType, MessageBusVisitorSessionFactory.sharedInstance(), traceLevelOverride); } public VdsVisitor(Query query, String searchCluster, Route route, - String documentType, VisitorSessionFactory visitorSessionFactory) + String documentType, VisitorSessionFactory visitorSessionFactory, + int traceLevelOverride) { this.query = query; this.visitorSessionFactory = visitorSessionFactory; + this.traceLevelOverride = traceLevelOverride; setVisitorParameters(searchCluster, route, documentType); } - private static int inferSessionTraceLevel(Query query) { - int implicitLevel = 0; + private int inferSessionTraceLevel(Query query) { + int implicitLevel = traceLevelOverride; if (log.isLoggable(LogLevel.SPAM)) { implicitLevel = 9; } else if (log.isLoggable(LogLevel.DEBUG)) { @@ -331,11 +336,11 @@ class VdsVisitor extends VisitorDataHandler implements Visitor { } } finally { session.destroy(); - log.log(LogLevel.DEBUG, () -> session.getTrace().toString()); + sessionTrace = session.getTrace(); + log.log(LogLevel.DEBUG, () -> sessionTrace.toString()); + query.trace(sessionTrace.toString(), false, 9); } - query.trace(session.getTrace().toString(), false, 9); - if (params.getControlHandler().getResult().code == VisitorControlHandler.CompletionCode.SUCCESS) { if (log.isLoggable(LogLevel.DEBUG)) { log.log(LogLevel.DEBUG, "VdsVisitor completed successfully for " + query + " with selection " + params.getDocumentSelection()); @@ -368,6 +373,11 @@ class VdsVisitor extends VisitorDataHandler implements Visitor { ack(token); } + @Override + public Trace getTrace() { + return sessionTrace; + } + public void onQueryResult(SearchResult sr, DocumentSummary summary) { handleSearchResult(sr); handleSummary(summary); diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/Visitor.java 
b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/Visitor.java index 45a525896e9..e8b83495c69 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/Visitor.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/Visitor.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.streamingvisitors; import com.yahoo.document.select.parser.ParseException; +import com.yahoo.messagebus.Trace; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.searchlib.aggregation.Grouping; import com.yahoo.vdslib.DocumentSummary; @@ -29,4 +30,7 @@ interface Visitor { int getTotalHitCount(); List<Grouping> getGroupings(); + + Trace getTrace(); + } diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VisitorFactory.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VisitorFactory.java index 9762d05bf45..7ce323a2f2b 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VisitorFactory.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VisitorFactory.java @@ -10,5 +10,7 @@ import com.yahoo.search.Query; * @author <a href="mailto:ulf@yahoo-inc.com">Ulf Carlin</a> */ interface VisitorFactory { - public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType); + + Visitor createVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride); + } diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/LoggingTraceExporter.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/LoggingTraceExporter.java new file mode 100644 index 00000000000..0aaf301e071 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/LoggingTraceExporter.java @@ -0,0 +1,25 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.streamingvisitors.tracing; + +import com.yahoo.log.LogLevel; + +import java.util.function.Supplier; +import java.util.logging.Logger; + +/** + * Trace exporter which dumps traces and their description as warning-entries in the Vespa log. + */ +public class LoggingTraceExporter implements TraceExporter { + + private static final Logger log = Logger.getLogger(LoggingTraceExporter.class.getName()); + + @Override + public void maybeExport(Supplier<TraceDescription> traceDescriptionSupplier) { + var traceDescription = traceDescriptionSupplier.get(); + if (traceDescription.getTrace() != null) { + log.log(LogLevel.WARNING, String.format("%s: %s", traceDescription.getDescription(), + traceDescription.getTrace().toString())); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriod.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriod.java new file mode 100644 index 00000000000..83290985007 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriod.java @@ -0,0 +1,46 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +/** + * Very basic sampling strategy which allows for sampling N requests within a fixed + * time window. No attempt is made to distribute the samples evenly within the time + * period; this is on a first-come, first-served basis. 
+ */ +public class MaxSamplesPerPeriod implements SamplingStrategy { + + private final MonotonicNanoClock nanoClock; + private final long maxSamplesPerPeriod; + private final long periodLengthInNanos; + private long currentSamplingPeriod = 0; + private long samplesInCurrentPeriod = 0; + + public MaxSamplesPerPeriod(MonotonicNanoClock nanoClock, long periodLengthInNanos, long maxSamplesPerPeriod) { + this.nanoClock = nanoClock; + this.periodLengthInNanos = periodLengthInNanos; + this.maxSamplesPerPeriod = maxSamplesPerPeriod; + } + + public static MaxSamplesPerPeriod withSteadyClock(long periodLengthInNanos, long maxSamplesPerPeriod) { + // We make a reasonable assumption that System.nanoTime uses the underlying steady clock. + return new MaxSamplesPerPeriod(System::nanoTime, periodLengthInNanos, maxSamplesPerPeriod); + } + + @Override + public boolean shouldSample() { + long now = nanoClock.nanoTimeNow(); + long period = now / periodLengthInNanos; + synchronized (this) { + if (period != currentSamplingPeriod) { + currentSamplingPeriod = period; + samplesInCurrentPeriod = 1; + return true; + } + if (samplesInCurrentPeriod >= maxSamplesPerPeriod) { + return false; + } + ++samplesInCurrentPeriod; + return true; + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MonotonicNanoClock.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MonotonicNanoClock.java new file mode 100644 index 00000000000..9c9815c2dbc --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/MonotonicNanoClock.java @@ -0,0 +1,15 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +/** + * Clock which returns a monotonically increasing timestamp from an undefined epoch. + * The epoch is guaranteed to be stable within a single JVM execution, but not across + * processes. 
Should therefore only be used for relative duration tracking, not absolute + * wall clock time events. + */ +@FunctionalInterface +public interface MonotonicNanoClock { + + long nanoTimeNow(); + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRate.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRate.java new file mode 100644 index 00000000000..bc6e91afe17 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRate.java @@ -0,0 +1,52 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +/** + * Simple implementation of OpenCensus algorithm for probabilistic rate limiting as outlined in + * https://github.com/census-instrumentation/opencensus-specs/blob/master/trace/Sampling.md + */ +public class ProbabilisticSampleRate implements SamplingStrategy { + + private final MonotonicNanoClock nanoClock; + private final Supplier<Random> randomSupplier; + private final double desiredSamplesPerSec; + private final AtomicLong lastSampledAtNanoTime = new AtomicLong(0); + + public ProbabilisticSampleRate(MonotonicNanoClock nanoClock, + Supplier<Random> randomSupplier, + double desiredSamplesPerSec) + { + this.nanoClock = nanoClock; + this.randomSupplier = randomSupplier; + this.desiredSamplesPerSec = desiredSamplesPerSec; + } + + public static ProbabilisticSampleRate withSystemDefaults(double desiredSamplesPerSec) { + return new ProbabilisticSampleRate(System::nanoTime, ThreadLocalRandom::current, desiredSamplesPerSec); + } + + @Override + public boolean shouldSample() { + // This load might race with the store below, causing multiple 
threads to get a sample + // since the new timestamp has not been written yet, but it is extremely unlikely and + // the consequences are not severe since this is a probabilistic sampler that does not + // provide hard lower or upper bounds. + long lastSampledAt = lastSampledAtNanoTime.get(); // TODO getPlain? No transitive visibility requirements + long now = nanoClock.nanoTimeNow(); + double secsSinceLastSample = (now - lastSampledAt) / 1_000_000_000.0; + // As the time elapsed since last sample increases, so does the probability of a new sample + // being selected. + double sampleProb = Math.min(secsSinceLastSample * desiredSamplesPerSec, 1.0); + if (randomSupplier.get().nextDouble() < sampleProb) { + lastSampledAtNanoTime.set(now); // TODO setPlain? No transitive visibility requirements + return true; + } else { + return false; + } + } +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingStrategy.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingStrategy.java new file mode 100644 index 00000000000..3ce7864c081 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingStrategy.java @@ -0,0 +1,16 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +/** + * A sampling strategy makes the high-level decision of whether or not a query + * should be traced. + * + * Callers should be able to expect that calling shouldSample() is a cheap operation + * with little or no underlying locking. This in turn means that the sampling strategy + * may be consulted for each query with minimal overhead. 
+ */ +public interface SamplingStrategy { + + boolean shouldSample(); + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporter.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporter.java new file mode 100644 index 00000000000..b18e8d78266 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporter.java @@ -0,0 +1,26 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import java.util.function.Supplier; + +/** + * Trace exporter which only exports a subset of traces as decided by the provided sampling strategy. + */ +public class SamplingTraceExporter implements TraceExporter { + + private final TraceExporter wrappedExporter; + private final SamplingStrategy samplingStrategy; + + public SamplingTraceExporter(TraceExporter wrappedExporter, SamplingStrategy samplingStrategy) { + this.wrappedExporter = wrappedExporter; + this.samplingStrategy = samplingStrategy; + } + + @Override + public void maybeExport(Supplier<TraceDescription> traceDescriptionSupplier) { + if (samplingStrategy.shouldSample()) { + wrappedExporter.maybeExport(traceDescriptionSupplier); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceDescription.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceDescription.java new file mode 100644 index 00000000000..74daef05756 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceDescription.java @@ -0,0 +1,29 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.streamingvisitors.tracing; + +import com.yahoo.messagebus.Trace; + +/** + * High-level description of a trace and the actual trace itself. Used to provide more + * information to logs etc than just the trace tree. The description will usually contain + * context for the trace, such as the original query string, desired timeout etc. + */ +public class TraceDescription { + + private final Trace trace; + private final String description; + + public TraceDescription(Trace trace, String description) { + this.trace = trace; + this.description = description; + } + + public Trace getTrace() { + return trace; + } + + public String getDescription() { + return description; + } + +} diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceExporter.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceExporter.java new file mode 100644 index 00000000000..0f55408e45b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/tracing/TraceExporter.java @@ -0,0 +1,15 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import java.util.function.Supplier; + +/** + * Potentially exports a trace to an underlying consumer. "Potentially" here means + * that the exporter may itself sample or otherwise limit which queries are actually + * exported. 
+ */ +public interface TraceExporter { + + void maybeExport(Supplier<TraceDescription> traceDescriptionSupplier); + +} diff --git a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcherTestCase.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcherTestCase.java index 9dd6aae9e7b..c6ab1a8454f 100644 --- a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcherTestCase.java @@ -3,11 +3,11 @@ package com.yahoo.vespa.streamingvisitors; import com.yahoo.config.subscription.ConfigGetter; import com.yahoo.document.select.parser.TokenMgrException; +import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.routing.Route; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.document.select.parser.ParseException; -import com.yahoo.fs4.QueryPacket; import com.yahoo.prelude.fastsearch.SummaryParameters; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.search.Query; @@ -18,21 +18,31 @@ import com.yahoo.searchlib.aggregation.Grouping; import com.yahoo.vdslib.DocumentSummary; import com.yahoo.vdslib.SearchResult; import com.yahoo.vdslib.VisitorStatistics; +import com.yahoo.vespa.streamingvisitors.tracing.MockUtils; +import com.yahoo.vespa.streamingvisitors.tracing.MonotonicNanoClock; +import com.yahoo.vespa.streamingvisitors.tracing.SamplingStrategy; +import com.yahoo.vespa.streamingvisitors.tracing.TraceExporter; import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static 
org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** - * @author <a href="mailto:ulf@yahoo-inc.com">Ulf Carlin</a> + * @author Ulf Carlin */ public class VdsStreamingSearcherTestCase { public static final String USERDOC_ID_PREFIX = "id:namespace:mytype:n=1:userspecific"; @@ -47,12 +57,14 @@ public class VdsStreamingSearcherTestCase { private final List<SearchResult.Hit> hits = new ArrayList<>(); private final Map<String, DocumentSummary.Summary> summaryMap = new HashMap<>(); private final List<Grouping> groupings = new ArrayList<>(); + int traceLevelOverride; - MockVisitor(Query query, String searchCluster, Route route, String documentType) { + MockVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) { this.query = query; this.searchCluster = searchCluster; this.route = route; this.documentType = documentType; + this.traceLevelOverride = traceLevelOverride; } @Override @@ -126,12 +138,21 @@ public class VdsStreamingSearcherTestCase { public List<Grouping> getGroupings() { return groupings; } + + @Override + public Trace getTrace() { + return new Trace(); + } } private static class MockVisitorFactory implements VisitorFactory { + + public MockVisitor lastCreatedVisitor; + @Override - public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType) { - return new MockVisitor(query, searchCluster, route, documentType); + public Visitor createVisitor(Query query, String searchCluster, Route route, String documentType, int traceLevelOverride) { + lastCreatedVisitor = new MockVisitor(query, searchCluster, route, documentType, traceLevelOverride); + return lastCreatedVisitor; } } @@ -263,4 +284,73 @@ public class VdsStreamingSearcherTestCase { assertFalse(VdsStreamingSearcher.verifyDocId(badId, group1Query, 
false)); } + /** Test fixture: a VdsStreamingSearcher built from a MockVisitorFactory and TracingOptions backed by a mocked sampler, exporter and clock. */ private static class TraceFixture { + SamplingStrategy sampler = mock(SamplingStrategy.class); + TraceExporter exporter = mock(TraceExporter.class); + MonotonicNanoClock clock; + TracingOptions options; + + MockVisitorFactory factory; + VdsStreamingSearcher searcher; + + /** Builds the fixture with a mocked clock returning firstTimestamp and then additionalTimestamps (nanoseconds). */ private TraceFixture(Long firstTimestamp, Long... additionalTimestamps) { + clock = MockUtils.mockedClockReturning(firstTimestamp, additionalTimestamps); + /* 8 is the override the trace level tests expect; 2.0 is the 2x timeout threshold the export tests rely on. */ options = new TracingOptions(sampler, exporter, clock, 8, 2.0); + factory = new MockVisitorFactory(); + searcher = new VdsStreamingSearcher(factory, options); + } + + /** Default fixture: the mocked clock returns 1 s and then 10 s, expressed in nanoseconds. */ private TraceFixture() { + this(TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10)); + } + + /** Default fixture whose sampler is stubbed to answer shouldTrace. */ static TraceFixture withSampledTrace(boolean shouldTrace) { + var f = new TraceFixture(); + when(f.sampler.shouldSample()).thenReturn(shouldTrace); + return f; + } + + /** Fixture whose sampler is stubbed to answer true and whose clock returns t1ms and then t2ms, converted to nanoseconds. */ static TraceFixture withTracingAndClockSampledAt(long t1ms, long t2ms) { + var f = new TraceFixture(TimeUnit.MILLISECONDS.toNanos(t1ms), TimeUnit.MILLISECONDS.toNanos(t2ms)); + when(f.sampler.shouldSample()).thenReturn(true); + return f; + } + } + + @Test + public void trace_level_set_if_sampling_strategy_returns_true() { + var f = TraceFixture.withSampledTrace(true); + executeQuery(f.searcher, new Query("/?streaming.userid=1&query=timeoutexception")); + + assertNotNull(f.factory.lastCreatedVisitor); + /* Expected value goes first so a failure reports expected/actual the right way round. */ assertEquals(8, f.factory.lastCreatedVisitor.traceLevelOverride); + } + + @Test + public void trace_level_not_set_if_sampling_strategy_returns_false() { + var f = TraceFixture.withSampledTrace(false); + executeQuery(f.searcher, new Query("/?streaming.userid=1&query=timeoutexception")); + + assertNotNull(f.factory.lastCreatedVisitor); + /* Expected value goes first so a failure reports expected/actual the right way round. */ assertEquals(0, f.factory.lastCreatedVisitor.traceLevelOverride); + } + + @Test + public void trace_is_exported_if_timed_out_beyond_threshold() { + // Default mock timeout threshold is 2x timeout + var f = TraceFixture.withTracingAndClockSampledAt(1000, 3001); + 
executeQuery(f.searcher, new Query("/?streaming.userid=1&query=timeoutexception&timeout=1.0")); + + /* Fixture clock returns 1000 ms and then 3001 ms while the query timeout is 1.0 s; exactly one export is expected. */ verify(f.exporter, times(1)).maybeExport(any()); + } + + @Test + public void trace_is_not_exported_if_timed_out_less_than_threshold() { + // Default mock timeout threshold is 2x timeout + var f = TraceFixture.withTracingAndClockSampledAt(1000, 2999); + executeQuery(f.searcher, new Query("/?streaming.userid=1&query=timeoutexception&timeout=1.0")); + + /* Fixture clock returns 1000 ms and then 2999 ms while the query timeout is 1.0 s; no export is expected. */ verify(f.exporter, times(0)).maybeExport(any()); + } + } diff --git a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsVisitorTestCase.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsVisitorTestCase.java index 9119a5cd0f1..7841b6f715c 100644 --- a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsVisitorTestCase.java +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/VdsVisitorTestCase.java @@ -363,7 +363,7 @@ public class VdsVisitorTestCase { } private void verifyVisitorOk(MockVisitorSessionFactory factory, QueryArguments qa, Route route, String searchCluster) throws Exception { - VdsVisitor visitor = new VdsVisitor(buildQuery(qa), searchCluster, route, "mytype", factory); + /* NOTE(review): the added trailing 0 presumably is a trace level override of zero; confirm against the VdsVisitor constructor. */ VdsVisitor visitor = new VdsVisitor(buildQuery(qa), searchCluster, route, "mytype", factory, 0); visitor.doSearch(); verifyVisitorParameters(factory.getParams(), qa, searchCluster, "mytype", route); supplyResults(visitor); @@ -371,7 +371,7 @@ public class VdsVisitorTestCase { } private void verifyVisitorFails(MockVisitorSessionFactory factory, QueryArguments qa, Route route, String searchCluster) throws Exception { - VdsVisitor visitor = new VdsVisitor(buildQuery(qa), searchCluster, route, "mytype", factory); + /* NOTE(review): the added trailing 0 presumably is a trace level override of zero; confirm against the VdsVisitor constructor. */ VdsVisitor visitor = new VdsVisitor(buildQuery(qa), searchCluster, route, "mytype", factory, 0); try { visitor.doSearch(); assertTrue("Visitor did not fail", false); diff --git 
a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriodTest.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriodTest.java new file mode 100644 index 00000000000..0ab42cfdcfc --- /dev/null +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MaxSamplesPerPeriodTest.java @@ -0,0 +1,37 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Unit tests for MaxSamplesPerPeriod driven by a mocked clock. NOTE(review): the constructor arguments look like (clock, period length, max samples per period) - confirm against MaxSamplesPerPeriod. */ public class MaxSamplesPerPeriodTest { + + @Test + public void first_sample_in_period_returns_true() { + var clock = MockUtils.mockedClockReturning(1000L); + var sampler = new MaxSamplesPerPeriod(clock, 1000L, 1L); + assertTrue(sampler.shouldSample()); + } + + @Test + public void samples_exceeding_period_count_return_false() { + /* Clock values 1000, 1100, 1200 with the last constructor argument 2: the third decision is expected to be false. */ var clock = MockUtils.mockedClockReturning(1000L, 1100L, 1200L); + var sampler = new MaxSamplesPerPeriod(clock, 1000L, 2L); + assertTrue(sampler.shouldSample()); + assertTrue(sampler.shouldSample()); + assertFalse(sampler.shouldSample()); + } + + @Test + public void sample_in_new_period_returns_true() { + /* Expected decisions are true, false, true, false; presumably one clock value (1000, 1900, 2000, 2900) is consumed per shouldSample() call - confirm. */ var clock = MockUtils.mockedClockReturning(1000L, 1900L, 2000L, 2900L); + var sampler = new MaxSamplesPerPeriod(clock, 1000L, 1L); + assertTrue(sampler.shouldSample()); + assertFalse(sampler.shouldSample()); + assertTrue(sampler.shouldSample()); + assertFalse(sampler.shouldSample()); + } + +} diff --git a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MockUtils.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MockUtils.java new file mode 100644 index 00000000000..764e1fb6b49 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/MockUtils.java @@ -0,0 +1,24 @@ +// Copyright 
2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import java.util.Random; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** Static Mockito-based factories for the mocked clock and random source used by the tracing tests. */ public class MockUtils { + + /** Returns a mocked MonotonicNanoClock whose nanoTimeNow() is stubbed via thenReturn(ts, additionalTimestamps), so consecutive calls yield these values in order. */ public static MonotonicNanoClock mockedClockReturning(Long ts, Long... additionalTimestamps) { + var clock = mock(MonotonicNanoClock.class); + when(clock.nanoTimeNow()).thenReturn(ts, additionalTimestamps); + return clock; + } + + // Extremely high quality randomness :D + /** Returns a mocked Random whose nextDouble() is stubbed via thenReturn(v, additionalValues), so consecutive calls yield these values in order. */ public static Random mockedRandomReturning(Double v, Double... additionalValues) { + var rng = mock(Random.class); + when(rng.nextDouble()).thenReturn(v, additionalValues); + return rng; + } + +} diff --git a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRateTest.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRateTest.java new file mode 100644 index 00000000000..c1772e91296 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/ProbabilisticSampleRateTest.java @@ -0,0 +1,41 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.streamingvisitors.tracing; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Unit tests for ProbabilisticSampleRate using a mocked clock and a mocked random source. */ public class ProbabilisticSampleRateTest { + + /** Converts milliseconds to nanoseconds. */ private static long ms2ns(long ms) { + return TimeUnit.MILLISECONDS.toNanos(ms); + } + + @Test + public void samples_are_rate_limited_per_second() { + var clock = MockUtils.mockedClockReturning(ms2ns(10_000), ms2ns(10_500), ms2ns(10_500), ms2ns(10_501)); + var rng = MockUtils.mockedRandomReturning(0.1, 0.51, 0.49, 0.01); + var sampler = new ProbabilisticSampleRate(clock, () -> rng, 1.0); + // 1st invocation, 10 seconds (technically "infinity") since last sample. P = 1.0, sampled + assertTrue(sampler.shouldSample()); + // 2nd invocation, 0.5 seconds since last sample. rng = 0.51 >= P = 0.5, not sampled + assertFalse(sampler.shouldSample()); + // 3rd invocation, 0.5 seconds since last sample. rng = 0.49 < P = 0.5, sampled + assertTrue(sampler.shouldSample()); + // 4th invocation, 0.001 seconds since last sample. rng = 0.01 >= P = 0.001, not sampled + assertFalse(sampler.shouldSample()); + } + + @Test + public void zero_desired_sample_rate_returns_false() { + /* With the last constructor argument 0.0, no sample is expected even for a random value just below 1. */ var clock = MockUtils.mockedClockReturning(ms2ns(10_000)); + var rng = MockUtils.mockedRandomReturning(0.99999999); // [0, 1) + var sampler = new ProbabilisticSampleRate(clock, () -> rng, 0.0); + + assertFalse(sampler.shouldSample()); + } + +} diff --git a/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporterTest.java b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporterTest.java new file mode 100644 index 00000000000..b7dde90d28f --- /dev/null +++ b/container-search/src/test/java/com/yahoo/vespa/streamingvisitors/tracing/SamplingTraceExporterTest.java @@ -0,0 +1,28 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. 
See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors.tracing; + +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit test for SamplingTraceExporter using a mocked delegate exporter and a mocked sampling strategy. */ public class SamplingTraceExporterTest { + + @Test + public void sampling_decision_is_deferred_to_provided_sampler() { + var exporter = mock(TraceExporter.class); + var sampler = mock(SamplingStrategy.class); + /* The sampler is stubbed to answer true on the first call and false on the second. */ when(sampler.shouldSample()).thenReturn(true, false); + var samplingExporter = new SamplingTraceExporter(exporter, sampler); + + samplingExporter.maybeExport(() -> new TraceDescription(null, "")); + verify(exporter, times(1)).maybeExport(any()); + + samplingExporter.maybeExport(() -> new TraceDescription(null, "")); + /* The verified count is cumulative, so it must still be 1 after the second attempt. */ verify(exporter, times(1)).maybeExport(any()); // No further invocations since last + } + +} |