summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-10 19:33:34 +0200
committerGitHub <noreply@github.com>2019-09-10 19:33:34 +0200
commit9b16a5917acf4fabc9c4f4ebbb30974262ccbc74 (patch)
tree8152262d8a977a6fe8001e918833efaf65f3e672
parent806be081fd15e9c5572e4618394de6bb34b3885f (diff)
parent1b0904eddb1e38b4eb46eea466dd928a95594d06 (diff)
Merge pull request #10591 from vespa-engine/balder/merge-topn-instead-of-append-sort-trim-rebased
Balder/merge topn instead of append sort trim rebased
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java69
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java3
4 files changed, 55 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
index a06b1518a12..da3c52969ca 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
@@ -158,6 +158,15 @@ public class FastHit extends Hit {
this.sortDataSorting = sorting;
}
+ @Override
+ public int compareTo(Hit other) {
+ int cmpRes = 0;
+ if ((sortData != null) && (other instanceof FastHit) && hasSortData(((FastHit) other).sortDataSorting)) {
+ cmpRes = SortDataHitSorter.getComparator(sortDataSorting, null).compare(this, other);
+ }
+ return (cmpRes != 0) ? cmpRes : super.compareTo(other);
+ }
+
public boolean hasSortData(Sorting sorting) {
return sortData != null && sortDataSorting != null && sortDataSorting.equals(sorting);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index bf0abb1132f..76157175c3d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -6,13 +6,16 @@ import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@@ -45,8 +48,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private long adaptiveTimeoutMax = 0;
private long deadline = 0;
- private Result result = null;
-
private long answeredDocs = 0;
private long answeredActiveDocs = 0;
private long answeredSoonActiveDocs = 0;
@@ -56,8 +57,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private boolean timedOut = false;
private boolean degradedByMatchPhase = false;
- private boolean trimResult = false;
-
public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster, Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
@@ -82,7 +81,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
int originalOffset = query.getOffset();
query.setHits(query.getHits() + query.getOffset());
query.setOffset(0);
- trimResult = originalHits != query.getHits() || originalOffset != query.getOffset();
for (SearchInvoker invoker : invokers) {
invoker.sendSearchRequest(query);
@@ -95,6 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
@Override
protected Result getSearchResult(Execution execution) throws IOException {
+ Result result = new Result(query);
+ List<Hit> merged = Collections.emptyList();
long nextTimeout = query.getTimeLeft();
try {
while (!invokers.isEmpty() && nextTimeout >= 0) {
@@ -103,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
break;
} else {
- mergeResult(invoker.getSearchResult(execution));
+ merged = mergeResult(result, invoker.getSearchResult(execution), merged);
ejectInvoker(invoker);
}
nextTimeout = nextTimeout();
@@ -112,24 +112,16 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
throw new RuntimeException("Interrupted while waiting for search results", e);
}
- if (result == null) {
- result = new Result(query);
- }
- insertNetworkErrors();
+ insertNetworkErrors(result);
result.setCoverage(createCoverage());
- trimResult(execution);
- Result ret = result;
- result = null;
- return ret;
- }
-
- private void trimResult(Execution execution) {
- if (trimResult || result.hits().size() > query.getHits()) {
- result.hits().trim(query.getOffset(), query.getHits());
+ int needed = query.getOffset() + query.getHits();
+ for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) {
+ result.hits().add(merged.get(index));
}
+ return result;
}
- private void insertNetworkErrors() {
+ private void insertNetworkErrors(Result result) {
// Network errors will be reported as errors only when all nodes fail, otherwise they are just traced
boolean asErrors = answeredNodes == 0;
@@ -195,15 +187,40 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
return nextAdaptive;
}
- private void mergeResult(Result partialResult) {
+ private List<Hit> mergeResult(Result result, Result partialResult, List<Hit> current) {
collectCoverage(partialResult.getCoverage(true));
- if (result == null) {
- result = partialResult;
- return;
- }
result.mergeWith(partialResult);
- result.hits().addAll(partialResult.hits().asUnorderedHits());
+ List<Hit> partial = partialResult.hits().asUnorderedHits();
+ if (current.isEmpty() ) {
+ return partial;
+ }
+ if (partial.isEmpty()) {
+ return current;
+ }
+
+ int needed = query.getOffset() + query.getHits();
+ List<Hit> merged = new ArrayList<>(needed);
+ int indexCurrent = 0;
+ int indexPartial = 0;
+ while (indexCurrent < current.size() && indexPartial < partial.size() && merged.size() < needed) {
+ int cmpRes = current.get(indexCurrent).compareTo(partial.get(indexPartial));
+ if (cmpRes < 0) {
+ merged.add(current.get(indexCurrent++));
+ } else if (cmpRes > 0) {
+ merged.add(partial.get(indexPartial++));
+ } else { // Duplicates
+ merged.add(current.get(indexCurrent++));
+ indexPartial++;
+ }
+ }
+ while ((indexCurrent < current.size()) && (merged.size() < needed)) {
+ merged.add(current.get(indexCurrent++));
+ }
+ while ((indexPartial < partial.size()) && (merged.size() < needed)) {
+ merged.add(partial.get(indexPartial++));
+ }
+ return merged;
}
private void collectCoverage(Coverage source) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
index 3db5e291a45..080e358d083 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
@@ -201,7 +201,7 @@ public class ProtobufSerialization {
int hitItems = protobuf.getHitsCount();
var haveGrouping = protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty();
- if(haveGrouping) {
+ if (haveGrouping) {
hitItems++;
}
result.hits().ensureCapacity(hitItems);
@@ -238,7 +238,7 @@ public class ProtobufSerialization {
result.hits().add(hit);
}
- if(sorting != null) {
+ if (sorting != null) {
result.hits().setSorted(true);
}
var slimeTrace = protobuf.getSlimeTrace();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index d118d1835a8..9f42e794f5e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -83,8 +83,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
ProtobufResponse protobufResponse = response.response().get();
CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression,
- protobufResponse.uncompressedSize());
+ byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key());
return result;