From f8f24d65c8f0de9f7b4e913285d6efbb96d34954 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 1 Mar 2022 14:37:04 +0100 Subject: Merge grouping results incrementally in search invoker --- .../search/dispatch/GroupingResultAggregator.java | 50 ++++++++++++++++++++++ .../search/dispatch/InterleavedSearchInvoker.java | 17 ++++++-- 2 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/GroupingResultAggregator.java (limited to 'container-search/src/main/java/com/yahoo/search/dispatch') diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/GroupingResultAggregator.java b/container-search/src/main/java/com/yahoo/search/dispatch/GroupingResultAggregator.java new file mode 100644 index 00000000000..5ce7accfdd4 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/GroupingResultAggregator.java @@ -0,0 +1,50 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; +import com.yahoo.prelude.fastsearch.GroupingListHit; +import com.yahoo.searchlib.aggregation.Grouping; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Incrementally merges underlying {@link Grouping} instances from {@link GroupingListHit} hits. + * + * @author bjorncs + */ +class GroupingResultAggregator { + private static final Logger log = Logger.getLogger(GroupingResultAggregator.class.getName()); + + private final Map groupings = new LinkedHashMap<>(); + private DocsumDefinitionSet documentDefinitions = null; + private int groupingHitsMerged = 0; + + void mergeWith(GroupingListHit result) { + if (groupingHitsMerged == 0) documentDefinitions = result.getDocsumDefinitionSet(); + ++groupingHitsMerged; + log.log(Level.FINE, () -> + String.format("Merging hit #%d having %d groupings", + groupingHitsMerged, result.getGroupingList().size())); + for (Grouping grouping : result.getGroupingList()) { + groupings.merge(grouping.getId(), grouping, (existingGrouping, newGrouping) -> { + existingGrouping.merge(newGrouping); + return existingGrouping; + }); + } + } + + Optional toAggregatedHit() { + if (groupingHitsMerged == 0) return Optional.empty(); + log.log(Level.FINE, () -> + String.format("Creating aggregated hit containing %d groupings from %d hits", + groupings.size(), groupingHitsMerged)); + groupings.values().forEach(Grouping::postMerge); + return Optional.of(new GroupingListHit(List.copyOf(groupings.values()), documentDefinitions)); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 4e658122cdf..d7c9f1dce53 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; @@ -44,6 +45,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private final Group group; private final LinkedBlockingQueue availableForProcessing; private final Set alreadyFailedNodes; + private final boolean mergeGroupingResult; private Query query; private boolean adaptiveTimeoutCalculated = false; @@ -71,6 +73,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM this.group = group; this.availableForProcessing = newQueue(); this.alreadyFailedNodes = alreadyFailedNodes; + this.mergeGroupingResult = searchCluster.dispatchConfig().mergeGroupingResultInSearchInvokerEnabled(); } /** @@ -115,6 +118,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM long nextTimeout = query.getTimeLeft(); boolean extraDebug = (query.getOffset() == 0) && (query.getHits() == 7) && log.isLoggable(java.util.logging.Level.FINE); List processed = new ArrayList<>(); + var groupingResultAggregator = new GroupingResultAggregator(); try { while (!invokers.isEmpty() && nextTimeout >= 0) { SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS); @@ -126,7 +130,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM if (extraDebug) { processed.add(toMerge); } - merged = mergeResult(result.getResult(), toMerge, merged); + merged = mergeResult(result.getResult(), toMerge, merged, groupingResultAggregator); ejectInvoker(invoker); } nextTimeout = nextTimeout(); @@ -134,6 +138,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting for search results", e); } + groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h)); insertNetworkErrors(result.getResult()); result.getResult().setCoverage(createCoverage()); @@ -238,14 +243,20 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM return nextAdaptive; } - private List mergeResult(Result result, InvokerResult partialResult, List current) { + private List mergeResult(Result result, InvokerResult partialResult, List current, + GroupingResultAggregator groupingResultAggregator) { collectCoverage(partialResult.getResult().getCoverage(true)); result.mergeWith(partialResult.getResult()); List partialNonLean = partialResult.getResult().hits().asUnorderedHits(); for(Hit hit : partialNonLean) { if (hit.isAuxiliary()) { - result.hits().add(hit); + if (hit instanceof GroupingListHit && mergeGroupingResult) { + var groupingHit = (GroupingListHit) hit; + groupingResultAggregator.mergeWith(groupingHit); + } else { + result.hits().add(hit); + } } } if (current.isEmpty() ) { -- cgit v1.2.3