diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-07 11:19:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-07 11:19:38 +0200 |
commit | 5f96fad42a48c72aab4270b11c7ddda07a7851b0 (patch) | |
tree | b53797c23e01d49fbd834a62d7d8acd657878e9a /container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java | |
parent | dd1f85033c5c28360e46fadebbd5eeac3855e2d2 (diff) | |
parent | 4c2f86281542b752785d75f2f5eb8000ea9abf0f (diff) |
Merge pull request #19451 from vespa-engine/balder/execute-queries-to-multiple-documenttypes-in-same-cluster-in-parallel
Execute a query over multiple document types in the same content clus…
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index fcae76fbdd4..b2f752182bb 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -9,7 +9,6 @@ import com.yahoo.container.QrConfig; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.handler.VipStatus; -import com.yahoo.documentapi.DocumentAccess; import com.yahoo.prelude.IndexFacts; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; @@ -35,6 +34,10 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -48,6 +51,9 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { + private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; + private final static long DEFAULT_MAX_QUERY_CACHE_TIMEOUT = 10000L; + private final String searchClusterName; // The set of document types contained in this search cluster @@ -57,15 +63,14 @@ public class ClusterSearcher extends Searcher { private final Map<String, Set<String>> rankProfiles = new HashMap<>(); private final long maxQueryTimeout; // in milliseconds - private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; - private final long maxQueryCacheTimeout; // in milliseconds - private final static long DEFAULT_MAX_QUERY_CACHE_TIMEOUT = 10000L; - private VespaBackEndSearcher server = null; + private final VespaBackEndSearcher server; + private final Executor executor; @Inject public ClusterSearcher(ComponentId id, + Executor executor, QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, @@ -74,7 +79,7 @@ public class ClusterSearcher extends Searcher { VipStatus vipStatus, VespaDocumentAccess access) { super(id); - + this.executor = executor; int searchClusterIndex = clusterConfig.clusterId(); searchClusterName = clusterConfig.clusterName(); QrSearchersConfig.Searchcluster searchClusterConfig = getSearchClusterConfigFromClusterName(qrsConfig, searchClusterName); @@ -97,18 +102,12 @@ public class ClusterSearcher extends Searcher { } if (searchClusterConfig.indexingmode() == STREAMING) { - VdsStreamingSearcher searcher = vdsCluster(qrConfig.discriminator(), searchClusterIndex, + server = vdsCluster(qrConfig.discriminator(), searchClusterIndex, searchClusterConfig, docSumParams, documentDbConfig, access); - addBackendSearcher(searcher); - vipStatus.addToRotation(searcher.getName()); + vipStatus.addToRotation(server.getName()); } else { - FastSearcher searcher = searchDispatch(searchClusterIndex, searchClusterName, qrConfig.discriminator(), + server = searchDispatch(searchClusterIndex, searchClusterName, qrConfig.discriminator(), docSumParams, documentDbConfig, dispatchers); - addBackendSearcher(searcher); - - } - if ( server == null ) { - throw new IllegalStateException("ClusterSearcher should have backend."); } } @@ -159,15 +158,17 @@ public class ClusterSearcher extends Searcher { } /** Do not use, for internal testing purposes only. **/ - ClusterSearcher(Set<String> documentTypes) { + ClusterSearcher(Set<String> documentTypes, VespaBackEndSearcher searcher, Executor executor) { this.documentTypes = documentTypes; searchClusterName = "testScenario"; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; - } - - void addBackendSearcher(VespaBackEndSearcher searcher) { server = searcher; + this.executor = executor; + } + /** Do not use, for internal testing purposes only. **/ + ClusterSearcher(Set<String> documentTypes) { + this(documentTypes, null, null); } void addValidRankProfile(String profileName, String docTypeName) { @@ -305,6 +306,17 @@ public class ClusterSearcher extends Searcher { } } + private void processResult(Query query, FutureTask<Result> task, Result mergedResult) { + try { + Result result = task.get(); + mergedResult.mergeWith(result); + mergedResult.hits().addAll(result.hits().asUnorderedHits()); + } catch (ExecutionException | InterruptedException e) { + mergedResult.mergeWith(new Result(query, + ErrorMessage.createInternalServerError("Unable to query restrict='" + query.getModel().getRestrict() + "'\n" + e))); + } + } + private Result searchMultipleDocumentTypes(Searcher searcher, Query query, Execution execution) { Set<String> docTypes = resolveDocumentTypes(query, execution.context().getIndexFacts()); @@ -316,10 +328,19 @@ public class ClusterSearcher extends Searcher { return searcher.search(queries.get(0), execution); } else { Result mergedResult = new Result(query); + List<FutureTask<Result>> pending = new ArrayList<>(queries.size()); for (Query q : queries) { - Result result = searcher.search(q, execution); - mergedResult.mergeWith(result); - mergedResult.hits().addAll(result.hits().asUnorderedHits()); + FutureTask<Result> task = new FutureTask<>(() -> searcher.search(q, execution)); + try { + executor.execute(task); + pending.add(task); + } catch (RejectedExecutionException rej) { + task.run(); + processResult(query, task, mergedResult); + } + } + for (FutureTask<Result> task : pending) { + processResult(query, task, mergedResult); } // Should we trim the merged result? if (query.getOffset() > 0 || query.getHits() < mergedResult.hits().size()) { |