diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/vespa/streamingvisitors/StreamingBackend.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/vespa/streamingvisitors/StreamingBackend.java | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/StreamingBackend.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/StreamingBackend.java new file mode 100644 index 00000000000..9953d76f50a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/StreamingBackend.java @@ -0,0 +1,373 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.streamingvisitors; + +import com.yahoo.container.core.documentapi.VespaDocumentAccess; +import com.yahoo.document.DocumentId; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.document.select.parser.TokenMgrException; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.fs4.DocsumPacket; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.GroupingListHit; +import com.yahoo.prelude.fastsearch.TimeoutException; +import com.yahoo.prelude.fastsearch.VespaBackend; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.FeatureData; +import com.yahoo.search.result.Relevance; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.vdslib.DocumentSummary; +import com.yahoo.vdslib.SearchResult; +import com.yahoo.vdslib.VisitorStatistics; +import com.yahoo.vespa.streamingvisitors.tracing.TraceDescription; + +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The searcher which forwards queries to storage nodes using visiting. + * The searcher is a visitor client responsible for starting search + * visitors in storage and collecting and merging the results. + * + * @author baldersheim + * @author Ulf Carlin + */ +public class StreamingBackend extends VespaBackend { + + private static final CompoundName streamingUserid = CompoundName.from("streaming.userid"); + private static final CompoundName streamingGroupname = CompoundName.from("streaming.groupname"); + private static final CompoundName streamingSelection = CompoundName.from("streaming.selection"); + + static final String STREAMING_STATISTICS = "streaming.statistics"; + private final VisitorFactory visitorFactory; + private final TracingOptions tracingOptions; + private static final Logger log = Logger.getLogger(StreamingBackend.class.getName()); + + private Route route; + + /** The configId used to access the searchcluster. */ + private String searchClusterName = null; + + /** The route to the storage cluster. */ + private String storageClusterRouteSpec = null; + + StreamingBackend(VisitorFactory visitorFactory) { + this.visitorFactory = visitorFactory; + tracingOptions = TracingOptions.DEFAULT; + } + + StreamingBackend(VisitorFactory visitorFactory, TracingOptions tracingOptions) { + this.visitorFactory = visitorFactory; + this.tracingOptions = tracingOptions; + } + + public StreamingBackend(VespaDocumentAccess access) { + this(new VespaVisitorFactory(access)); + } + + private String getSearchClusterName() { return searchClusterName; } + private String getStorageClusterRouteSpec() { return storageClusterRouteSpec; } + public final void setSearchClusterName(String clusterName) { this.searchClusterName = clusterName; } + + public final void setStorageClusterRouteSpec(String storageClusterRouteSpec) { + this.storageClusterRouteSpec = storageClusterRouteSpec; + } + + @Override + protected void doPartialFill(Result result, String summaryClass) { + } + + private double durationInMillisFromNanoTime(long startTimeNanos) { + return (tracingOptions.getClock().nanoTimeNow() - startTimeNanos) / (double)TimeUnit.MILLISECONDS.toNanos(1); + } + + private boolean timeoutBadEnoughToBeReported(Query query, double durationMillis) { + return (durationMillis > (query.getTimeout() * tracingOptions.getTraceTimeoutMultiplierThreshold())); + } + + private static boolean queryIsLocationConstrained(Query query) { + return ((query.properties().getString(streamingUserid) != null) || + (query.properties().getString(streamingGroupname) != null)); + } + + private static int documentSelectionQueryParameterCount(Query query) { + int paramCount = 0; + if (query.properties().getString(streamingUserid) != null) + paramCount++; + if (query.properties().getString(streamingGroupname) != null) + paramCount++; + if (query.properties().getString(streamingSelection) != null) + paramCount++; + return paramCount; + } + + private boolean shouldTraceQuery(Query query) { + // Only trace for explicit bucket subset queries, as otherwise we'd get a trace entry for every superbucket in the system. + return (queryIsLocationConstrained(query) && + ((query.getTrace().getLevel() > 0) || tracingOptions.getSamplingStrategy().shouldSample())); + } + + private int inferEffectiveQueryTraceLevel(Query query) { + return ((query.getTrace().getLevel() == 0) && shouldTraceQuery(query)) // Honor query's explicit trace level if present. + ? tracingOptions.getTraceLevelOverride() + : query.getTrace().getLevel(); + } + + @Override + public Result doSearch2(String schema, Query query) { + if (query.getTimeLeft() <= 0) + return new Result(query, ErrorMessage.createTimeout(String.format("No time left for searching (timeout=%d)", query.getTimeout()))); + + initializeMissingQueryFields(query); + if (documentSelectionQueryParameterCount(query) != 1) { + return new Result(query, ErrorMessage.createIllegalQuery("Streaming search requires either " + + "streaming.groupname or streaming.selection")); + } + + if (query.getTrace().isTraceable(4)) + query.trace("Routing to search cluster " + getSearchClusterName() + " and document type " + schema, 4); + long timeStartedNanos = tracingOptions.getClock().nanoTimeNow(); + int effectiveTraceLevel = inferEffectiveQueryTraceLevel(query); + + Visitor visitor = visitorFactory.createVisitor(query, getSearchClusterName(), route, schema, effectiveTraceLevel); + try { + visitor.doSearch(); + } catch (ParseException e) { + return new Result(query, ErrorMessage.createInvalidQueryParameter("Failed to parse document selection string: " + + e.getMessage())); + } catch (TokenMgrException e) { + return new Result(query, ErrorMessage.createInvalidQueryParameter("Failed to tokenize document selection string: " + + e.getMessage())); + } catch (TimeoutException e) { + double elapsedMillis = durationInMillisFromNanoTime(timeStartedNanos); + if ((effectiveTraceLevel > 0) && timeoutBadEnoughToBeReported(query, elapsedMillis)) { + tracingOptions.getTraceExporter().maybeExport(() -> new TraceDescription(visitor.getTrace(), + String.format("Trace of %s which timed out after %.3g seconds", + query, elapsedMillis / 1000.0))); + } + return new Result(query, ErrorMessage.createTimeout(e.getMessage())); + } catch (InterruptedException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError(e.getMessage())); + } + return buildResultFromCompletedVisitor(query, visitor); + } + + private void initializeMissingQueryFields(Query query) { + lazyTrace(query, 7, "Routing to storage cluster ", getStorageClusterRouteSpec()); + + if (route == null) { + route = Route.parse(getStorageClusterRouteSpec()); + } + lazyTrace(query, 8, "Route is ", route); + + lazyTrace(query, 7, "doSearch2(): query docsum class=", + query.getPresentation().getSummary(), ", default docsum class=", + getDefaultDocsumClass()); + + if (query.getPresentation().getSummary() == null) { + lazyTrace(query, 6, + "doSearch2(): No summary class specified in query, using default: ", + getDefaultDocsumClass()); + query.getPresentation().setSummary(getDefaultDocsumClass()); + } else { + lazyTrace(query, 6, + "doSearch2(): Summary class has been specified in query: ", + query.getPresentation().getSummary()); + } + + lazyTrace(query, 8, "doSearch2(): rank properties=", query.getRanking()); + lazyTrace(query, 8, "doSearch2(): sort specification=", query + .getRanking().getSorting() == null ? null : query.getRanking() + .getSorting().fieldOrders()); + } + + private Result buildResultFromCompletedVisitor(Query query, Visitor visitor) { + lazyTrace(query, 8, "offset=", query.getOffset(), ", hits=", query.getHits()); + + Result result = new Result(query); + List<SearchResult.Hit> hits = visitor.getHits(); // Sorted on rank + Map<String, DocumentSummary.Summary> summaryMap = visitor.getSummaryMap(); + + lazyTrace(query, 7, "total hit count = ", visitor.getTotalHitCount(), + ", returned hit count = ", hits.size(), ", summary count = ", + summaryMap.size()); + + VisitorStatistics stats = visitor.getStatistics(); + result.setTotalHitCount(visitor.getTotalHitCount()); + result.setCoverage(new Coverage(stats.getDocumentsVisited(), stats.getDocumentsVisited(), 1, 1)); + query.trace(visitor.getStatistics().toString(), false, 2); + query.getContext(true).setProperty(STREAMING_STATISTICS, stats); + + DocsumPacket[] summaryPackets = new DocsumPacket [hits.size()]; + + int index = 0; + boolean skippedEarlierResult = false; + for (SearchResult.Hit hit : hits) { + if (!verifyDocId(hit.getDocId(), query, skippedEarlierResult)) { + skippedEarlierResult = true; + continue; + } + FastHit fastHit = buildSummaryHit(query, hit); + result.hits().add(fastHit); + + DocumentSummary.Summary summary = summaryMap.get(hit.getDocId()); + if (summary != null) { + DocsumPacket dp = new DocsumPacket(summary.getSummary()); + summaryPackets[index] = dp; + } else { + return new Result(query, ErrorMessage.createBackendCommunicationError("Did not find summary for hit with document id " + + hit.getDocId())); + } + + index++; + } + if (result.isFilled(query.getPresentation().getSummary())) { + lazyTrace(query, 8, "Result is filled for summary class ", query.getPresentation().getSummary()); + } else { + lazyTrace(query, 8, "Result is not filled for summary class ", query.getPresentation().getSummary()); + } + + List<Grouping> groupingList = visitor.getGroupings(); + lazyTrace(query, 8, "Grouping list=", groupingList); + if ( ! groupingList.isEmpty() ) { + GroupingListHit groupHit = new GroupingListHit(groupingList, getDocumentDatabase(query), query); + result.hits().add(groupHit); + } + + FillHitsResult fillHitsResult = fillHits(result, summaryPackets, query.getPresentation().getSummary()); + int skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + return result; + } + + if (skippedHits == 0) { + query.trace("All hits have been filled",4); // TODO: cache results or result.analyzeHits(); ? + } else { + lazyTrace(query, 8, "Skipping some hits for query: ", result.getQuery()); + } + + lazyTrace(query, 8, "Returning result ", result); + + if (skippedHits > 0) { + log.info("skipping " + skippedHits + " hits for query: " + result.getQuery()); + result.hits().addError(ErrorMessage.createTimeout("Missing hit summary data for " + skippedHits + " hits")); + } + + return result; + } + + private FastHit buildSummaryHit(Query query, SearchResult.Hit hit) { + FastHit fastHit = new FastHit(); + fastHit.setQuery(query); + fastHit.setSource(getName()); + fastHit.setId(hit.getDocId()); + fastHit.setRelevance(new Relevance(hit.getRank())); + if (hit instanceof SearchResult.HitWithSortBlob sortedHit) { + fastHit.setSortData(sortedHit.getSortBlob(), query.getRanking().getSorting()); + } + if (hit.getMatchFeatures().isPresent()) { + fastHit.setField("matchfeatures", new FeatureData(hit.getMatchFeatures().get())); + } + + fastHit.setFillable(); + return fastHit; + } + + private static void lazyTrace(Query query, int level, Object... args) { + if (query.getTrace().isTraceable(level)) { + StringBuilder s = new StringBuilder(); + for (Object arg : args) { + s.append(arg); + } + query.trace(s.toString(), level); + } + } + + static boolean verifyDocId(String id, Query query, boolean skippedEarlierResult) { + String expectedUserId = query.properties().getString(streamingUserid); + String expectedGroupName = query.properties().getString(streamingGroupname); + + Level logLevel = Level.SEVERE; + if (skippedEarlierResult) { + logLevel = Level.FINE; + } + + DocumentId docId; + try { + docId = new DocumentId(id); + } catch (IllegalArgumentException iae) { + log.log(logLevel, "Bad result for " + query + ": " + iae.getMessage()); + return false; + } + + if (expectedUserId != null) { + long userId; + + if (docId.getScheme().hasNumber()) { + userId = docId.getScheme().getNumber(); + } else { + log.log(logLevel, "Got result with wrong scheme in document ID (" + id + ") for " + query); + return false; + } + if (new BigInteger(expectedUserId).longValue() != userId) { + log.log(logLevel, "Got result with wrong user ID (expected " + expectedUserId + ") in document ID (" + + id + ") for " + query); + return false; + } + } else if (expectedGroupName != null) { + String groupName; + + if (docId.getScheme().hasGroup()) { + groupName = docId.getScheme().getGroup(); + } else { + log.log(logLevel, "Got result with wrong scheme in document ID (" + id + ") for " + query); + return false; + } + if (!expectedGroupName.equals(groupName)) { + log.log(logLevel, "Got result with wrong group name (expected " + expectedGroupName + ") in document ID (" + + id + ") for " + query); + return false; + } + } + return true; + } + + public Pong ping(Ping ping, Execution execution) { + // TODO add a real pong + return new Pong(); + } + + private static class VespaVisitorFactory implements StreamingVisitor.VisitorSessionFactory, VisitorFactory { + + private final VespaDocumentAccess access; + + private VespaVisitorFactory(VespaDocumentAccess access) { + this.access = access; + } + + @Override + public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException { + return access.createVisitorSession(params); + } + + @Override + public Visitor createVisitor(Query query, String searchCluster, Route route, String schema, int traceLevelOverride) { + return new StreamingVisitor(query, searchCluster, route, schema, this, traceLevelOverride); + } + + } + +} |