diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-10 19:33:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-10 19:33:34 +0200 |
commit | 9b16a5917acf4fabc9c4f4ebbb30974262ccbc74 (patch) | |
tree | 8152262d8a977a6fe8001e918833efaf65f3e672 | |
parent | 806be081fd15e9c5572e4618394de6bb34b3885f (diff) | |
parent | 1b0904eddb1e38b4eb46eea466dd928a95594d06 (diff) |
Merge pull request #10591 from vespa-engine/balder/merge-topn-instead-of-append-sort-trim-rebased
Balder/merge topn instead of append sort trim rebased
4 files changed, 55 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java index a06b1518a12..da3c52969ca 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java @@ -158,6 +158,15 @@ public class FastHit extends Hit { this.sortDataSorting = sorting; } + @Override + public int compareTo(Hit other) { + int cmpRes = 0; + if ((sortData != null) && (other instanceof FastHit) && hasSortData(((FastHit) other).sortDataSorting)) { + cmpRes = SortDataHitSorter.getComparator(sortDataSorting, null).compare(this, other); + } + return (cmpRes != 0) ? cmpRes : super.compareTo(other); + } + public boolean hasSortData(Sorting sorting) { return sortData != null && sortDataSorting != null && sortDataSorting.equals(sorting); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index bf0abb1132f..76157175c3d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -6,13 +6,16 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; import com.yahoo.vespa.config.search.DispatchConfig; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -45,8 +48,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private long adaptiveTimeoutMax = 0; private long deadline = 0; - private Result result = null; - private long answeredDocs = 0; private long answeredActiveDocs = 0; private long answeredSoonActiveDocs = 0; @@ -56,8 +57,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private boolean timedOut = false; private boolean degradedByMatchPhase = false; - private boolean trimResult = false; - public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster, Set<Integer> alreadyFailedNodes) { super(Optional.empty()); this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); @@ -82,7 +81,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM int originalOffset = query.getOffset(); query.setHits(query.getHits() + query.getOffset()); query.setOffset(0); - trimResult = originalHits != query.getHits() || originalOffset != query.getOffset(); for (SearchInvoker invoker : invokers) { invoker.sendSearchRequest(query); @@ -95,6 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM @Override protected Result getSearchResult(Execution execution) throws IOException { + Result result = new Result(query); + List<Hit> merged = Collections.emptyList(); long nextTimeout = query.getTimeLeft(); try { while (!invokers.isEmpty() && nextTimeout >= 0) { @@ -103,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received"); break; } else { - mergeResult(invoker.getSearchResult(execution)); + merged = mergeResult(result, invoker.getSearchResult(execution), merged); ejectInvoker(invoker); } nextTimeout = nextTimeout(); @@ -112,24 +112,16 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM throw new RuntimeException("Interrupted while waiting for search results", e); } - if (result == null) { - result = new Result(query); - } - insertNetworkErrors(); + insertNetworkErrors(result); result.setCoverage(createCoverage()); - trimResult(execution); - Result ret = result; - result = null; - return ret; - } - - private void trimResult(Execution execution) { - if (trimResult || result.hits().size() > query.getHits()) { - result.hits().trim(query.getOffset(), query.getHits()); + int needed = query.getOffset() + query.getHits(); + for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) { + result.hits().add(merged.get(index)); } + return result; } - private void insertNetworkErrors() { + private void insertNetworkErrors(Result result) { // Network errors will be reported as errors only when all nodes fail, otherwise they are just traced boolean asErrors = answeredNodes == 0; @@ -195,15 +187,40 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM return nextAdaptive; } - private void mergeResult(Result partialResult) { + private List<Hit> mergeResult(Result result, Result partialResult, List<Hit> current) { collectCoverage(partialResult.getCoverage(true)); - if (result == null) { - result = partialResult; - return; - } result.mergeWith(partialResult); - result.hits().addAll(partialResult.hits().asUnorderedHits()); + List<Hit> partial = partialResult.hits().asUnorderedHits(); + if (current.isEmpty() ) { + return partial; + } + if (partial.isEmpty()) { + return current; + } + + int needed = query.getOffset() + query.getHits(); + List<Hit> merged = new ArrayList<>(needed); + int indexCurrent = 0; + int indexPartial = 0; + while (indexCurrent < current.size() && indexPartial < partial.size() && merged.size() < needed) { + int cmpRes = current.get(indexCurrent).compareTo(partial.get(indexPartial)); + if (cmpRes < 0) { + merged.add(current.get(indexCurrent++)); + } else if (cmpRes > 0) { + merged.add(partial.get(indexPartial++)); + } else { // Duplicates + merged.add(current.get(indexCurrent++)); + indexPartial++; + } + } + while ((indexCurrent < current.size()) && (merged.size() < needed)) { + merged.add(current.get(indexCurrent++)); + } + while ((indexPartial < partial.size()) && (merged.size() < needed)) { + merged.add(partial.get(indexPartial++)); + } + return merged; } private void collectCoverage(Coverage source) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java index 3db5e291a45..080e358d083 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java @@ -201,7 +201,7 @@ public class ProtobufSerialization { int hitItems = protobuf.getHitsCount(); var haveGrouping = protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty(); - if(haveGrouping) { + if (haveGrouping) { hitItems++; } result.hits().ensureCapacity(hitItems); @@ -238,7 +238,7 @@ public class ProtobufSerialization { result.hits().add(hit); } - if(sorting != null) { + if (sorting != null) { result.hits().setSorted(true); } var slimeTrace = protobuf.getSlimeTrace(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index d118d1835a8..9f42e794f5e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -83,8 +83,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe ProtobufResponse protobufResponse = response.response().get(); CompressionType compression = CompressionType.valueOf(protobufResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, - protobufResponse.uncompressedSize()); + byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize()); var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key()); return result; |