diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java | 211 |
1 files changed, 108 insertions, 103 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index f481d58d334..855a524473d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.cluster; +import com.yahoo.collections.TinyIdentitySet; import com.yahoo.component.annotation.Inject; import com.yahoo.component.ComponentId; import com.yahoo.component.chain.dependencies.After; @@ -10,9 +11,8 @@ import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; -import com.yahoo.prelude.fastsearch.FastSearcher; -import com.yahoo.prelude.fastsearch.SummaryParameters; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.prelude.fastsearch.IndexedBackend; +import com.yahoo.prelude.fastsearch.VespaBackend; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; @@ -24,14 +24,17 @@ import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.schema.Cluster; import com.yahoo.search.schema.SchemaInfo; import com.yahoo.search.searchchain.Execution; -import com.yahoo.vespa.streamingvisitors.StreamingSearcher; +import com.yahoo.vespa.streamingvisitors.StreamingBackend; import com.yahoo.yolean.Exceptions; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -40,8 +43,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; -import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; - /** * A searcher which forwards to a cluster of monitored native Vespa backends. * @@ -58,13 +59,12 @@ public class ClusterSearcher extends Searcher { private final String searchClusterName; // The set of document types contained in this search cluster - private final Set<String> schemas; + private final Map<String, VespaBackend> schema2Searcher; private final SchemaInfo schemaInfo; private final long maxQueryTimeout; // in milliseconds private final long maxQueryCacheTimeout; // in milliseconds - private final VespaBackEndSearcher server; private final Executor executor; private final GlobalPhaseRanker globalPhaseRanker; @@ -86,26 +86,28 @@ public class ClusterSearcher extends Searcher { searchClusterName = clusterConfig.clusterName(); QrSearchersConfig.Searchcluster searchClusterConfig = getSearchClusterConfigFromClusterName(qrsConfig, searchClusterName); this.globalPhaseRanker = globalPhaseRanker; - schemas = new LinkedHashSet<>(); + schema2Searcher = new LinkedHashMap<>(); maxQueryTimeout = ParameterParser.asMilliSeconds(clusterConfig.maxQueryTimeout(), DEFAULT_MAX_QUERY_TIMEOUT); maxQueryCacheTimeout = ParameterParser.asMilliSeconds(clusterConfig.maxQueryCacheTimeout(), DEFAULT_MAX_QUERY_CACHE_TIMEOUT); - SummaryParameters docSumParams = new SummaryParameters(qrsConfig - .com().yahoo().prelude().fastsearch().FastSearcher().docsum() - .defaultclass()); - - for (DocumentdbInfoConfig.Documentdb docDb : documentDbConfig.documentdb()) - schemas.add(docDb.name()); - - String uniqueServerId = UUID.randomUUID().toString(); - if (searchClusterConfig.indexingmode() == STREAMING) { - server = streamingCluster(uniqueServerId, searchClusterIndex, - searchClusterConfig, docSumParams, documentDbConfig, schemaInfo, access); - vipStatus.addToRotation(server.getName()); - } else { - server = searchDispatch(searchClusterIndex, searchClusterName, uniqueServerId, - docSumParams, documentDbConfig, schemaInfo, dispatchers); + VespaBackend streaming = null, indexed = null; + ClusterParams clusterParams = makeClusterParams(searchClusterIndex, qrsConfig + .com().yahoo().prelude().fastsearch().IndexedBackend().docsum() + .defaultclass(), documentDbConfig, schemaInfo); + for (DocumentdbInfoConfig.Documentdb docDb : documentDbConfig.documentdb()) { + if (docDb.mode() == DocumentdbInfoConfig.Documentdb.Mode.Enum.INDEX) { + if (indexed == null) { + indexed = searchDispatch(clusterParams, searchClusterName, dispatchers); + } + schema2Searcher.put(docDb.name(), indexed); + } else if (docDb.mode() == DocumentdbInfoConfig.Documentdb.Mode.Enum.STREAMING) { + if (streaming == null) { + streaming = streamingCluster(clusterParams, searchClusterConfig, access); + vipStatus.addToRotation(streaming.getName()); + } + schema2Searcher.put(docDb.name(), streaming); + } } } @@ -115,91 +117,88 @@ public class ClusterSearcher extends Searcher { return searchCluster; } } - return null; + throw new IllegalStateException("No configured search cluster '" + name + "' among : " + + config.searchcluster().stream().map(QrSearchersConfig.Searchcluster::name).toList()); } - private static ClusterParams makeClusterParams(int searchclusterIndex) { - return new ClusterParams("sc" + searchclusterIndex + ".num" + 0); + private static ClusterParams makeClusterParams(int searchclusterIndex, String defaultSummary, + DocumentdbInfoConfig documentDbConfig, SchemaInfo schemaInfo) + { + return new ClusterParams("sc" + searchclusterIndex + ".num" + 0, UUID.randomUUID().toString(), + defaultSummary, documentDbConfig, schemaInfo); } - private static FastSearcher searchDispatch(int searchclusterIndex, - String searchClusterName, - String serverId, - SummaryParameters docSumParams, - DocumentdbInfoConfig documentdbInfoConfig, - SchemaInfo schemaInfo, - ComponentRegistry<Dispatcher> dispatchers) { - ClusterParams clusterParams = makeClusterParams(searchclusterIndex); + private static IndexedBackend searchDispatch(ClusterParams clusterParams, + String searchClusterName, + ComponentRegistry<Dispatcher> dispatchers) + { ComponentId dispatcherComponentId = new ComponentId("dispatcher." + searchClusterName); Dispatcher dispatcher = dispatchers.getComponent(dispatcherComponentId); if (dispatcher == null) - throw new IllegalArgumentException("Configuration error: No dispatcher " + dispatcherComponentId + - " is configured"); - return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig, schemaInfo); + throw new IllegalArgumentException("Configuration error: No dispatcher " + dispatcherComponentId + " is configured"); + return new IndexedBackend(clusterParams, dispatcher); } - private static StreamingSearcher streamingCluster(String serverId, - int searchclusterIndex, - QrSearchersConfig.Searchcluster searchClusterConfig, - SummaryParameters docSumParams, - DocumentdbInfoConfig documentdbInfoConfig, - SchemaInfo schemaInfo, - VespaDocumentAccess access) { - if (searchClusterConfig.searchdef().size() != 1) - throw new IllegalArgumentException("Streaming search clusters can only contain a single schema but got " + - searchClusterConfig.searchdef()); - ClusterParams clusterParams = makeClusterParams(searchclusterIndex); - StreamingSearcher searcher = new StreamingSearcher(access); - searcher.setSearchClusterName(searchClusterConfig.rankprofiles_configid()); - searcher.setStorageClusterRouteSpec(searchClusterConfig.storagecluster().routespec()); - searcher.init(serverId, docSumParams, clusterParams, documentdbInfoConfig, schemaInfo); - return searcher; + private static StreamingBackend streamingCluster(ClusterParams clusterParams, + QrSearchersConfig.Searchcluster searchClusterConfig, + VespaDocumentAccess access) + { + return new StreamingBackend(clusterParams, searchClusterConfig.rankprofiles_configid(), + access, searchClusterConfig.storagecluster().routespec()); } /** Do not use, for internal testing purposes only. **/ - ClusterSearcher(SchemaInfo schemaInfo, Set<String> schemas, VespaBackEndSearcher searcher, Executor executor) { + ClusterSearcher(SchemaInfo schemaInfo, Map<String, VespaBackend> schema2Searcher, Executor executor) { this.schemaInfo = schemaInfo; searchClusterName = "testScenario"; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; - server = searcher; this.executor = executor; this.globalPhaseRanker = null; - this.schemas = schemas; + this.schema2Searcher = schema2Searcher; } /** Do not use, for internal testing purposes only. **/ - ClusterSearcher(SchemaInfo schemaInfo, Set<String> schemas) { - this(schemaInfo, schemas, null, null); + ClusterSearcher(SchemaInfo schemaInfo, Map<String, VespaBackend> schema2Searcher) { + this(schemaInfo, schema2Searcher, null); } @Override public Result search(Query query, Execution execution) { validateQueryTimeout(query); validateQueryCache(query); - Searcher searcher = server; - if (searcher == null) { + if (schema2Searcher.isEmpty()) { return new Result(query, ErrorMessage.createNoBackendsInService("Could not search")); } if (query.getTimeLeft() <= 0) { return new Result(query, ErrorMessage.createTimeout("No time left for searching")); } - return doSearch(searcher, query, execution); + return doSearch(query); } @Override - public void fill(com.yahoo.search.Result result, String summaryClass, Execution execution) { + public void fill(Result result, String summaryClass, Execution execution) { + fill(result, summaryClass); + } + private void fill(Result result, String summaryClass) { Query query = result.getQuery(); - - Searcher searcher = server; - if (searcher != null) { - if (query.getTimeLeft() > 0) { - searcher.fill(result, summaryClass, execution); - } else { - if (result.hits().getErrorHit() == null) { - result.hits().addError(ErrorMessage.createTimeout("No time left to get summaries, query timeout was " + - query.getTimeout() + " ms")); + var restrict = query.getModel().getRestrict(); + Collection<VespaBackend> servers = (restrict != null && ! restrict.isEmpty()) + ? query.getModel().getRestrict().stream() + .map(schema2Searcher::get) + .collect(Collectors.toCollection(TinyIdentitySet::new)) + : schema2Searcher.values().stream().collect(Collectors.toCollection(TinyIdentitySet::new)); + + if ( ! servers.isEmpty() ) { + for (var server : servers) { + if (query.getTimeLeft() > 0) { + server.fill(result, summaryClass); + } else { + if (result.hits().getErrorHit() == null) { + result.hits().addError(ErrorMessage.createTimeout("No time left to get summaries, query timeout was " + + query.getTimeout() + " ms")); + } } } } else { @@ -230,22 +229,21 @@ public class ClusterSearcher extends Searcher { query.getRanking().setQueryCache(false); } - private Result doSearch(Searcher searcher, Query query, Execution execution) { - if (schemas.size() > 1) { - return searchMultipleDocumentTypes(searcher, query, execution); + private Result doSearch(Query query) { + if (schema2Searcher.size() > 1) { + return searchMultipleDocumentTypes(query); } else { - String docType = schemas.iterator().next(); - query.getModel().setRestrict(docType); - return perSchemaSearch(searcher, query, execution); + String schema = schema2Searcher.keySet().iterator().next(); + query.getModel().setRestrict(schema); + return perSchemaSearch(schema, query); } } - private Result perSchemaSearch(Searcher searcher, Query query, Execution execution) { + private Result perSchemaSearch(String schema, Query query) { Set<String> restrict = query.getModel().getRestrict(); if (restrict.size() != 1) { throw new IllegalStateException("perSchemaSearch must always be called with 1 schema, got: " + restrict.size()); } - String schema = restrict.iterator().next(); int rerankCount = globalPhaseRanker != null ? globalPhaseRanker.getRerankCount(query, schema) : 0; boolean useGlobalPhase = rerankCount > 0; final int wantOffset = query.getOffset(); @@ -257,7 +255,7 @@ public class ClusterSearcher extends Searcher { query.setOffset(0); query.setHits(useHits); } - Result result = searcher.search(query, execution); + Result result = schema2Searcher.get(schema).search(schema, query); if (useGlobalPhase) { globalPhaseRanker.rerankHits(query, result, schema); result.hits().trim(wantOffset, wantHits); @@ -284,16 +282,17 @@ public class ClusterSearcher extends Searcher { } } - private Result searchMultipleDocumentTypes(Searcher searcher, Query query, Execution execution) { + private Result searchMultipleDocumentTypes(Query query) { Set<String> schemas = resolveSchemas(query); - List<Query> queries = createQueries(query, schemas); - if (queries.size() == 1) { - return perSchemaSearch(searcher, queries.get(0), execution); + Map<String, Query> schemaQueries = createQueries(query, schemas); + if (schemaQueries.size() == 1) { + var entry = schemaQueries.entrySet().iterator().next(); + return perSchemaSearch(entry.getKey(), entry.getValue()); } else { Result mergedResult = new Result(query); - List<FutureTask<Result>> pending = new ArrayList<>(queries.size()); - for (Query q : queries) { - FutureTask<Result> task = new FutureTask<>(() -> perSchemaSearch(searcher, q, execution)); + List<FutureTask<Result>> pending = new ArrayList<>(schemaQueries.size()); + for (var entry : schemaQueries.entrySet()) { + FutureTask<Result> task = new FutureTask<>(() -> perSchemaSearch(entry.getKey(), entry.getValue())); try { executor.execute(task); pending.add(task); @@ -309,7 +308,7 @@ public class ClusterSearcher extends Searcher { if (query.getOffset() > 0 || query.getHits() < mergedResult.hits().size()) { if (mergedResult.getHitOrderer() != null) { // Make sure we have the necessary data for sorting - searcher.fill(mergedResult, VespaBackEndSearcher.SORTABLE_ATTRIBUTES_SUMMARY_CLASS, execution); + fill(mergedResult, VespaBackend.SORTABLE_ATTRIBUTES_SUMMARY_CLASS); } mergedResult.hits().trim(query.getOffset(), query.getHits()); query.setOffset(0); // Needed when doing a trim @@ -326,7 +325,7 @@ public class ClusterSearcher extends Searcher { candidates.addAll(cluster.schemas()); } return (candidates.isEmpty() ? sources : candidates).stream() - .filter(schemas::contains).collect(Collectors.toUnmodifiableSet()); + .filter(schema2Searcher::containsKey).collect(Collectors.toUnmodifiableSet()); } Set<String> resolveSchemas(Query query) { @@ -334,7 +333,7 @@ public class ClusterSearcher extends Searcher { if (restrict == null || restrict.isEmpty()) { Set<String> sources = query.getModel().getSources(); return (sources == null || sources.isEmpty()) - ? schemas + ? schema2Searcher.keySet() : resolveSourceSubset(sources); } else { return filterValidDocumentTypes(restrict); @@ -344,34 +343,40 @@ public class ClusterSearcher extends Searcher { private Set<String> filterValidDocumentTypes(Collection<String> restrict) { Set<String> retval = new LinkedHashSet<>(); for (String docType : restrict) { - if (docType != null && schemas.contains(docType)) { + if (docType != null && schema2Searcher.containsKey(docType)) { retval.add(docType); } } return retval; } - private List<Query> createQueries(Query query, Set<String> docTypes) { + private Map<String, Query> createQueries(Query query, Set<String> schemas) { query.getModel().getQueryTree(); // performance: parse query before cloning such that it is only done once - List<Query> retval = new ArrayList<>(docTypes.size()); - if (docTypes.size() == 1) { - query.getModel().setRestrict(docTypes.iterator().next()); - retval.add(query); - } else if ( ! docTypes.isEmpty() ) { - for (String docType : docTypes) { + if (schemas.size() == 1) { + String schema = schemas.iterator().next(); + query.getModel().setRestrict(schema); + return Map.of(schema, query); + } else if ( ! schemas.isEmpty() ) { + var schemaQueries = new HashMap<String, Query>(); + for (String schema : schemas) { Query q = query.clone(); q.setOffset(0); q.setHits(query.getOffset() + query.getHits()); - q.getModel().setRestrict(docType); - retval.add(q); + q.getModel().setRestrict(schema); + schemaQueries.put(schema, q); } + return schemaQueries; } - return retval; + return Map.of(); } @Override public void deconstruct() { - if (server != null) { + Map<String, VespaBackend> servers = new HashMap<>(); + for (var server : schema2Searcher.values()) { + servers.put(server.getName(), server); + } + for (var server : servers.values()) { server.shutDown(); } } |