From 4c2f86281542b752785d75f2f5eb8000ea9abf0f Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 7 Oct 2021 11:01:01 +0200 Subject: Execute a query over multiple document types in the same content cluster in parallel towards the backend. --- .../com/yahoo/prelude/cluster/ClusterSearcher.java | 65 ++++++++++++++-------- .../prelude/cluster/ClusterSearcherTestCase.java | 7 ++- 2 files changed, 48 insertions(+), 24 deletions(-) (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 8685400b5c1..d312b3dd1bd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -9,7 +9,6 @@ import com.yahoo.container.QrConfig; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.handler.VipStatus; -import com.yahoo.documentapi.DocumentAccess; import com.yahoo.prelude.IndexFacts; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; @@ -35,6 +34,10 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -48,6 +51,9 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { + private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; + private final static long DEFAULT_MAX_QUERY_CACHE_TIMEOUT = 10000L; + private final String searchClusterName; // The set of document types contained in this search cluster @@ -57,15 +63,14 @@ public class ClusterSearcher extends Searcher { private final Map> rankProfiles = new HashMap<>(); private final long maxQueryTimeout; // in milliseconds - private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; - private final long maxQueryCacheTimeout; // in milliseconds - private final static long DEFAULT_MAX_QUERY_CACHE_TIMEOUT = 10000L; - private VespaBackEndSearcher server = null; + private final VespaBackEndSearcher server; + private final Executor executor; @Inject public ClusterSearcher(ComponentId id, + Executor executor, QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, @@ -74,7 +79,7 @@ public class ClusterSearcher extends Searcher { VipStatus vipStatus, VespaDocumentAccess access) { super(id); - + this.executor = executor; int searchClusterIndex = clusterConfig.clusterId(); searchClusterName = clusterConfig.clusterName(); QrSearchersConfig.Searchcluster searchClusterConfig = getSearchClusterConfigFromClusterName(qrsConfig, searchClusterName); @@ -97,18 +102,12 @@ public class ClusterSearcher extends Searcher { } if (searchClusterConfig.indexingmode() == STREAMING) { - VdsStreamingSearcher searcher = vdsCluster(qrConfig.discriminator(), searchClusterIndex, + server = vdsCluster(qrConfig.discriminator(), searchClusterIndex, searchClusterConfig, docSumParams, documentDbConfig, access); - addBackendSearcher(searcher); - vipStatus.addToRotation(searcher.getName()); + vipStatus.addToRotation(server.getName()); } else { - FastSearcher searcher = searchDispatch(searchClusterIndex, searchClusterName, qrConfig.discriminator(), + server = searchDispatch(searchClusterIndex, searchClusterName, qrConfig.discriminator(), docSumParams, documentDbConfig, dispatchers); - addBackendSearcher(searcher); - - } - if ( server == null ) { - throw new IllegalStateException("ClusterSearcher should have backend."); } } @@ -159,15 +158,17 @@ public class ClusterSearcher extends Searcher { } /** Do not use, for internal testing purposes only. **/ - ClusterSearcher(Set documentTypes) { + ClusterSearcher(Set documentTypes, VespaBackEndSearcher searcher, Executor executor) { this.documentTypes = documentTypes; searchClusterName = "testScenario"; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; - } - - void addBackendSearcher(VespaBackEndSearcher searcher) { server = searcher; + this.executor = executor; + } + /** Do not use, for internal testing purposes only. **/ + ClusterSearcher(Set documentTypes) { + this(documentTypes, null, null); } void addValidRankProfile(String profileName, String docTypeName) { @@ -305,6 +306,17 @@ public class ClusterSearcher extends Searcher { } } + private void processResult(Query query, FutureTask task, Result mergedResult) { + try { + Result result = task.get(); + mergedResult.mergeWith(result); + mergedResult.hits().addAll(result.hits().asUnorderedHits()); + } catch (ExecutionException | InterruptedException e) { + mergedResult.mergeWith(new Result(query, + ErrorMessage.createInternalServerError("Unable to query restrict='" + query.getModel().getRestrict() + "'\n" + e))); + } + } + private Result searchMultipleDocumentTypes(Searcher searcher, Query query, Execution execution) { Set docTypes = resolveDocumentTypes(query, execution.context().getIndexFacts()); @@ -316,10 +328,19 @@ public class ClusterSearcher extends Searcher { return searcher.search(queries.get(0), execution); } else { Result mergedResult = new Result(query); + List> pending = new ArrayList<>(queries.size()); for (Query q : queries) { - Result result = searcher.search(q, execution); - mergedResult.mergeWith(result); - mergedResult.hits().addAll(result.hits().asUnorderedHits()); + FutureTask task = new FutureTask<>(() -> searcher.search(q, execution)); + try { + executor.execute(task); + pending.add(task); + } catch (RejectedExecutionException rej) { + task.run(); + processResult(query, task, mergedResult); + } + } + for (FutureTask task : pending) { + processResult(query, task, mergedResult); } // Should we trim the merged result? if (query.getOffset() > 0 || query.getHits() < mergedResult.hits().size()) { diff --git a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java index 38c231cb1bb..361c1ed9692 100644 --- a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java @@ -4,6 +4,7 @@ package com.yahoo.prelude.cluster; import com.google.common.collect.ImmutableList; import com.yahoo.component.ComponentId; import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.concurrent.InThreadExecutorService; import com.yahoo.container.QrConfig; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.ClustersStatus; @@ -263,9 +264,10 @@ public class ClusterSearcherTestCase { private Execution createExecution(List docTypesList, boolean expectAttributePrefetch) { Set documentTypes = new LinkedHashSet<>(docTypesList); - ClusterSearcher cluster = new ClusterSearcher(documentTypes); + ClusterSearcher cluster = new ClusterSearcher(documentTypes, + new MyMockSearcher(expectAttributePrefetch), + new InThreadExecutorService()); try { - cluster.addBackendSearcher(new MyMockSearcher(expectAttributePrefetch)); cluster.setValidRankProfile("default", documentTypes); cluster.addValidRankProfile("testprofile", "type1"); return new Execution(cluster, Execution.Context.createContextStub()); @@ -523,6 +525,7 @@ public class ClusterSearcherTestCase { dispatchers.register(new ComponentId("dispatcher." + clusterName), dispatcher); return new ClusterSearcher(new ComponentId("test-id"), + new InThreadExecutorService(), qrSearchersConfig.build(), clusterConfig.build(), documentDbConfig.build(), -- cgit v1.2.3