From 91dd5bc9eb95701aeb3110fd402257084634aa73 Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Mon, 11 Mar 2019 14:46:03 +0100 Subject: Protobuf over jrt support in search --- container-search/abi-spec.json | 15 ++- container-search/pom.xml | 5 + .../java/com/yahoo/fs4/mplex/FS4Connection.java | 1 - .../prelude/fastsearch/FS4InvokerFactory.java | 46 +++----- .../yahoo/prelude/fastsearch/FS4SearchInvoker.java | 36 ++----- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 11 +- .../prelude/fastsearch/VespaBackEndSearcher.java | 2 +- .../java/com/yahoo/prelude/query/Highlight.java | 15 ++- .../src/main/java/com/yahoo/search/Query.java | 43 +++++++- .../src/main/java/com/yahoo/search/Result.java | 79 +++++++++++++- .../java/com/yahoo/search/dispatch/Client.java | 54 ++++++++++ .../java/com/yahoo/search/dispatch/Dispatcher.java | 54 ++++++---- .../com/yahoo/search/dispatch/InvokerFactory.java | 41 +++++++ .../java/com/yahoo/search/dispatch/RpcClient.java | 53 ++++++++- .../yahoo/search/dispatch/RpcInvokerFactory.java | 94 ++++++++++++++++ .../com/yahoo/search/dispatch/RpcResourcePool.java | 23 ---- .../yahoo/search/dispatch/RpcSearchInvoker.java | 119 +++++++++++++++++++++ .../com/yahoo/search/dispatch/SearchInvoker.java | 11 ++ .../yahoo/search/dispatch/searchcluster/Group.java | 6 ++ .../yahoo/search/dispatch/searchcluster/Node.java | 7 ++ .../main/java/com/yahoo/search/query/Model.java | 33 +++++- .../java/com/yahoo/search/query/Presentation.java | 11 +- .../main/java/com/yahoo/search/query/Ranking.java | 18 +++- .../main/java/com/yahoo/search/query/Sorting.java | 11 ++ .../yahoo/search/query/ranking/RankFeatures.java | 9 ++ .../yahoo/search/query/ranking/RankProperties.java | 12 +++ .../java/com/yahoo/search/result/Coverage.java | 16 +++ .../com/yahoo/search/dispatch/DispatcherTest.java | 34 ++++-- .../com/yahoo/search/dispatch/FillTestCase.java | 12 ++- .../java/com/yahoo/search/dispatch/MockClient.java | 20 ++++ .../search/dispatch/RpcSearchInvokerTest.java | 92 ++++++++++++++++ 31 files changed, 839 insertions(+), 144 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/RpcInvokerFactory.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/RpcSearchInvoker.java create mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/RpcSearchInvokerTest.java (limited to 'container-search') diff --git a/container-search/abi-spec.json b/container-search/abi-spec.json index 02f82762d57..5b98d789a94 100644 --- a/container-search/abi-spec.json +++ b/container-search/abi-spec.json @@ -555,7 +555,8 @@ "public com.yahoo.prelude.query.Highlight clone()", "public java.util.Map getHighlightTerms()", "public void prepare()", - "public bridge synthetic java.lang.Object clone()" + "public bridge synthetic java.lang.Object clone()", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)" ], "fields": [ "public static final java.lang.String HIGHLIGHTTERMS" @@ -1776,6 +1777,7 @@ "public bridge synthetic com.yahoo.processing.Request clone()", "public bridge synthetic com.yahoo.processing.request.Properties properties()", "public bridge synthetic com.yahoo.component.provider.FreezableClass clone()", + "public ai.vespa.searchlib.searchprotocol.protobuf.Search$Request toProtobuf(java.lang.String, boolean)", "public bridge synthetic java.lang.Object clone()" ], "fields": [ @@ -1834,6 +1836,8 @@ "public com.yahoo.collections.ListMap getHeaders(boolean)", "public bridge synthetic com.yahoo.component.provider.ListenableFreezableClass clone()", "public bridge synthetic com.yahoo.component.provider.FreezableClass clone()", + "public ai.vespa.searchlib.searchprotocol.protobuf.Search$Reply toProtobuf()", + "public static com.yahoo.search.Result fromProtobuf(com.yahoo.search.Query, ai.vespa.searchlib.searchprotocol.protobuf.Search$Reply, com.yahoo.prelude.fastsearch.DocumentDatabase)", "public bridge synthetic java.lang.Object clone()" ], "fields": [] @@ -4955,6 +4959,7 @@ "public com.yahoo.search.searchchain.Execution getExecution()", "public static com.yahoo.search.query.Model getFrom(com.yahoo.search.Query)", "public java.lang.String toString()", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public void prepare(com.yahoo.search.query.Ranking)" ], "fields": [ @@ -5013,6 +5018,7 @@ "public void setTiming(boolean)", "public java.util.Set getSummaryFields()", "public void prepare()", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public void setSummaryFields(java.lang.String)" ], "fields": [ @@ -5106,6 +5112,7 @@ "public void setSorting(com.yahoo.search.query.Sorting)", "public void setSorting(java.lang.String)", "public static com.yahoo.search.query.Ranking getFrom(com.yahoo.search.Query)", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public void prepare()" ], "fields": [ @@ -5335,6 +5342,7 @@ "public int hashCode()", "public boolean equals(java.lang.Object)", "public int encode(java.nio.ByteBuffer)", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public bridge synthetic java.lang.Object clone()" ], "fields": [ @@ -6434,6 +6442,7 @@ "public int hashCode()", "public com.yahoo.search.query.ranking.RankFeatures clone()", "public java.lang.String toString()", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public bridge synthetic java.lang.Object clone()" ], "fields": [] @@ -6459,6 +6468,7 @@ "public int hashCode()", "public com.yahoo.search.query.ranking.RankProperties clone()", "public java.lang.String toString()", + "public void addToProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Request$Builder, boolean)", "public bridge synthetic java.lang.Object clone()" ], "fields": [] @@ -6981,7 +6991,8 @@ "public com.yahoo.search.result.Coverage setSoonActive(long)", "public com.yahoo.search.result.Coverage setDegradedReason(int)", "public com.yahoo.search.result.Coverage setNodesTried(int)", - "public bridge synthetic com.yahoo.container.handler.Coverage setNodesTried(int)" + "public bridge synthetic com.yahoo.container.handler.Coverage setNodesTried(int)", + "public static com.yahoo.search.result.Coverage fromProtobuf(ai.vespa.searchlib.searchprotocol.protobuf.Search$Reply)" ], "fields": [] }, diff --git a/container-search/pom.xml b/container-search/pom.xml index 35b1477f478..1abaee91d20 100644 --- a/container-search/pom.xml +++ b/container-search/pom.xml @@ -138,6 +138,11 @@ + + com.google.protobuf + protobuf-java + provided + javax.xml.bind jaxb-api diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java index 11e7d4e3d35..69267f4a6b2 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -18,7 +18,6 @@ import com.yahoo.fs4.PacketListener; import com.yahoo.io.Connection; import com.yahoo.io.Listener; import com.yahoo.log.LogLevel; -import com.yahoo.search.Query; /** * diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index 5700e316493..66088faed79 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -8,12 +8,10 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.SearchErrorInvoker; +import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import java.util.ArrayList; @@ -33,15 +31,13 @@ import java.util.Set; * * @author ollivir */ -public class FS4InvokerFactory { +public class FS4InvokerFactory extends InvokerFactory { private final FS4ResourcePool fs4ResourcePool; - private final VespaBackEndSearcher searcher; private final SearchCluster searchCluster; private final ImmutableMap nodesByKey; - public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) { + public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster) { this.fs4ResourcePool = fs4ResourcePool; - this.searcher = searcher; this.searchCluster = searchCluster; ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -49,7 +45,7 @@ public class FS4InvokerFactory { this.nodesByKey = builder.build(); } - public SearchInvoker getSearchInvoker(Query query, Node node) { + public SearchInvoker createSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); return new FS4SearchInvoker(searcher, query, backend.openChannel(), Optional.of(node)); } @@ -57,6 +53,8 @@ public class FS4InvokerFactory { /** * Create a {@link SearchInvoker} for a list of content nodes. * + * @param searcher + * the searcher processing the query * @param query * the search query being processed * @param groupId @@ -70,7 +68,9 @@ public class FS4InvokerFactory { * @return Optional containing the SearchInvoker or empty if some node in the * list is invalid and the remaining coverage is not sufficient */ - public Optional getSearchInvoker(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage) { + @Override + public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List nodes, + boolean acceptIncompleteCoverage) { List invokers = new ArrayList<>(nodes.size()); Set failed = null; for (Node node : nodes) { @@ -114,44 +114,27 @@ public class FS4InvokerFactory { } } - private SearchInvoker createCoverageErrorInvoker(List nodes, Set failed) { - StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); - int count = 0; - for (Node node : nodes) { - if (failed.contains(node.key())) { - if (count > 0) { - down.append(", "); - } - count++; - down.append(node.key()); - } - } - Coverage coverage = new Coverage(0, 0, 0); - coverage.setNodesTried(count); - return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); - } - - public FillInvoker getFillInvoker(Query query, Node node) { - return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port()); + public FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result, Node node) { + return new FS4FillInvoker(searcher, result.getQuery(), fs4ResourcePool, node.hostname(), node.fs4port()); } /** * Create a {@link FillInvoker} for a the hits in a {@link Result}. * + * @param searcher the searcher processing the query * @param result the Result containing hits that need to be filled * @return Optional containing the FillInvoker or empty if some hit is from an unknown content node */ - public Optional getFillInvoker(Result result) { + public Optional createFillInvoker(VespaBackEndSearcher searcher, Result result) { Collection requiredNodes = requiredFillNodes(result); - Query query = result.getQuery(); Map invokers = new HashMap<>(); for (Integer distKey : requiredNodes) { Node node = nodesByKey.get(distKey); if (node == null) { return Optional.empty(); } - invokers.put(distKey, getFillInvoker(query, node)); + invokers.put(distKey, createFillInvoker(searcher, result, node)); } if (invokers.size() == 1) { @@ -172,4 +155,5 @@ public class FS4InvokerFactory { } return requiredNodes; } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index d2910ba3fbc..24653db5671 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -12,13 +12,11 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.ResponseMonitor; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; import java.util.Optional; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -27,6 +25,7 @@ import java.util.logging.Logger; * @author ollivir */ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor { + private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); private final VespaBackEndSearcher searcher; private FS4Channel channel; @@ -46,8 +45,7 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor "got packets " + basicPackets.length + " packets"); basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - if (isLoggingFine()) - getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); if (query.getPresentation().getSummary() == null) query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); @@ -114,14 +110,6 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor invoker = dispatcher.getSearchInvoker(query, fs4InvokerFactory); + Optional invoker = dispatcher.getSearchInvoker(query, this); if (invoker.isPresent()) { return invoker.get(); } Optional direct = getDirectNode(query); if(direct.isPresent()) { - return fs4InvokerFactory.getSearchInvoker(query, direct.get()); + return dispatcher.getFS4InvokerFactory().createSearchInvoker(this, query, direct.get()); } return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } @@ -228,14 +225,14 @@ public class FastSearcher extends VespaBackEndSearcher { */ private FillInvoker getFillInvoker(Result result) { Query query = result.getQuery(); - Optional invoker = dispatcher.getFillInvoker(result, this, getDocumentDatabase(query), fs4InvokerFactory); + Optional invoker = dispatcher.getFillInvoker(result, this); if (invoker.isPresent()) { return invoker.get(); } Optional direct = getDirectNode(query); if (direct.isPresent()) { - return fs4InvokerFactory.getFillInvoker(query, direct.get()); + return dispatcher.getFS4InvokerFactory().createFillInvoker(this, result, direct.get()); } return new FS4FillInvoker(this, query, dispatchBackend); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index bee1fab5686..dccda0bf733 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -122,7 +122,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { public String getServerId() { return serverId; } - protected DocumentDatabase getDocumentDatabase(Query query) { + public DocumentDatabase getDocumentDatabase(Query query) { if (query.getModel().getRestrict().size() == 1) { String docTypeName = (String)query.getModel().getRestrict().toArray()[0]; DocumentDatabase db = documentDbs.get(docTypeName); diff --git a/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java b/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java index 81c68ccc2b9..023923d34ca 100644 --- a/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java +++ b/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java @@ -1,7 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.query; -import java.util.*; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; +import com.yahoo.searchlib.protobuf.MapConverter; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import static com.yahoo.language.LinguisticsCase.toLowerCase; @@ -131,7 +138,11 @@ public class Highlight implements Cloneable { } } - + public void addToProtobuf(Search.Request.Builder builder, boolean encodeQueryData) { + if(encodeQueryData) { + MapConverter.convertStringMultiMap(highlightTerms, builder::addHighlightTerms); + } + } } diff --git a/container-search/src/main/java/com/yahoo/search/Query.java b/container-search/src/main/java/com/yahoo/search/Query.java index 97a8c35bfa3..b5871019bac 100644 --- a/container-search/src/main/java/com/yahoo/search/Query.java +++ b/container-search/src/main/java/com/yahoo/search/Query.java @@ -1,12 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; import com.yahoo.collections.Tuple2; import com.yahoo.component.Version; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.fs4.MapEncoder; +import com.yahoo.io.GrowableByteBuffer; import com.yahoo.log.LogLevel; import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.prelude.query.Highlight; @@ -14,6 +17,7 @@ import com.yahoo.prelude.query.QueryException; import com.yahoo.prelude.query.textualrepresentation.TextualQueryRepresentation; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.federation.FederationSearcher; +import com.yahoo.search.grouping.vespa.GroupingExecutor; import com.yahoo.search.query.Model; import com.yahoo.search.query.ParameterParser; import com.yahoo.search.query.Presentation; @@ -45,8 +49,10 @@ import com.yahoo.search.query.properties.RequestContextProperties; import com.yahoo.search.yql.NullItemException; import com.yahoo.search.yql.VespaSerializer; import com.yahoo.search.yql.YqlParser; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.vespa.objects.BufferSerializer; import com.yahoo.yolean.Exceptions; -import edu.umd.cs.findbugs.annotations.Nullable; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -1066,6 +1072,41 @@ public class Query extends com.yahoo.processing.Request implements Cloneable { return Collections.emptyMap(); } + public Search.Request toProtobuf(String serverId, boolean includeQueryData) { + var builder = Search.Request.newBuilder() + .setHits(hits) + .setOffset(offset) + .setTimeout((int) getTimeLeft()); + + ranking.addToProtobuf(builder, includeQueryData); + model.addToProtobuf(builder, includeQueryData); + + if(getGroupingSessionCache() || getRanking().getQueryCache()) { + // TODO verify that the session key is included whenever rank properties would have been + builder.setSessionKey(getSessionId(serverId).toString()); + } + if(properties().getBoolean(Model.ESTIMATE)) { + builder.setHits(0); + } + if(GroupingExecutor.hasGroupingList(this)) { + List groupingList = GroupingExecutor.getGroupingList(this); + BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); + gbuf.putInt(null, groupingList.size()); + for (Grouping g: groupingList){ + g.serialize(gbuf); + } + gbuf.getBuf().flip(); + builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer())); + } + + presentation.addToProtobuf(builder, includeQueryData); + if(getGroupingSessionCache()) { + builder.setCacheGrouping(true); + } + + return builder.build(); + } + private Map createModelMap() { Map m = new HashMap<>(); if (model.getSearchPath() != null) m.put("searchpath", model.getSearchPath()); diff --git a/container-search/src/main/java/com/yahoo/search/Result.java b/container-search/src/main/java/com/yahoo/search/Result.java index 364e60e6263..e130ec6d6d5 100644 --- a/container-search/src/main/java/com/yahoo/search/Result.java +++ b/container-search/src/main/java/com/yahoo/search/Result.java @@ -1,13 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; +import com.google.protobuf.ByteString; import com.yahoo.collections.ListMap; +import com.yahoo.document.GlobalId; +import com.yahoo.io.GrowableByteBuffer; import com.yahoo.net.URI; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.protect.Validator; import com.yahoo.search.query.context.QueryContext; -import com.yahoo.search.result.*; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.result.HitGroup; +import com.yahoo.search.result.HitOrderer; +import com.yahoo.search.result.HitSortOrderer; +import com.yahoo.search.result.Relevance; import com.yahoo.search.statistics.ElapsedTime; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.vespa.objects.BufferSerializer; +import java.util.ArrayList; import java.util.Iterator; /** @@ -326,4 +342,65 @@ public final class Result extends com.yahoo.processing.Response implements Clone return headers; } + public static Result fromProtobuf(Query query, Search.Reply protobuf, DocumentDatabase documentDatabase) { + var result = new Result(query); + + result.setTotalHitCount(protobuf.getTotalHitCount()); + result.setCoverage(Coverage.fromProtobuf(protobuf)); + + if(protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty()) { + ArrayList list = new ArrayList<>(); + BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(protobuf.getGroupingBlob().asReadOnlyByteBuffer())); + int cnt = buf.getInt(null); + for (int i = 0; i < cnt; i++) { + Grouping g = new Grouping(); + g.deserialize(buf); + list.add(g); + } + GroupingListHit hit = new GroupingListHit(list, documentDatabase.getDocsumDefinitionSet()); + hit.setQuery(query); + result.hits().add(hit); + } + + for (var replyHit : protobuf.getHitsList()) { + FastHit hit = new FastHit(); + hit.setQuery(query); + + hit.setRelevance(new Relevance(replyHit.getRelevance())); + hit.setGlobalId(new GlobalId(replyHit.getGlobalId().toByteArray())); + + hit.setFillable(); + hit.setCached(false); + + result.hits().add(hit); + } + + return result; + } + + public Search.Reply toProtobuf() { + var builder = Search.Reply.newBuilder(); + + var coverage = getCoverage(false); + if(coverage != null) { + builder.setCoverageDocs(coverage.getDocs()) + .setActiveDocs(coverage.getActive()) + .setSoonActiveDocs(coverage.getSoonActive()) + .setDegradedBySoftTimeout(coverage.isDegradedByTimeout()) + .setDegradedByMatchPhase(coverage.isDegradedByMatchPhase()); + } + + hits.iterator().forEachRemaining(hit -> { + var hitBuilder = Search.Hit.newBuilder(); + if (hit.getRelevance() != null) { + hitBuilder.setRelevance(hit.getRelevance().getScore()); + } + if (hit instanceof FastHit) { + FastHit fhit = (FastHit) hit; + hitBuilder.setGlobalId(ByteString.copyFrom(fhit.getGlobalId().getRawId())); + } + builder.addHits(hitBuilder); + }); + return builder.build(); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java index 431b36c2623..be625178d2f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java @@ -18,6 +18,10 @@ interface Client { int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); + void search(NodeConnection node, CompressionType compression, + int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver, + double timeoutSeconds); + /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); @@ -87,4 +91,54 @@ interface Client { } + class SearchResponseOrError { + // One of these will be non empty and the other not + private Optional response; + private Optional error; + + public static SearchResponseOrError fromResponse(SearchResponse response) { + return new SearchResponseOrError(Optional.of(response), Optional.empty()); + } + + public static SearchResponseOrError fromError(String error) { + return new SearchResponseOrError(Optional.empty(), Optional.of(error)); + } + + private SearchResponseOrError(Optional response, Optional error) { + this.response = response; + this.error = error; + } + + /** Returns the response, or empty if there is an error */ + public Optional response() { return response; } + + /** Returns the error or empty if there is a response */ + public Optional error() { return error; } + + } + + class SearchResponse { + private final byte compression; + private final int uncompressedSize; + private final byte[] compressedPayload; + + public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) { + this.compression = compression; + this.uncompressedSize = uncompressedSize; + this.compressedPayload = compressedPayload; + } + + public byte compression() { + return compression; + } + + public int uncompressedSize() { + return uncompressedSize; + } + + public byte[] compressedPayload() { + return compressedPayload; + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 146b132be22..a4c66671d43 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -3,7 +3,6 @@ package com.yahoo.search.dispatch; import com.yahoo.component.AbstractComponent; import com.yahoo.container.handler.VipStatus; -import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.prelude.fastsearch.FS4InvokerFactory; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -42,25 +41,38 @@ public class Dispatcher extends AbstractComponent { /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); + /** If enabled, search queries will use protobuf rpc */ + private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf"); + /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; private final LoadBalancer loadBalancer; - private final RpcResourcePool rpcResourcePool; private final boolean multilevelDispatch; private final boolean internalDispatchByDefault; + private final FS4InvokerFactory fs4InvokerFactory; + private final RpcInvokerFactory rpcInvokerFactory; + public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { - this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig); + this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig, + fs4ResourcePool, new RpcResourcePool(dispatchConfig)); + } + + public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool) { + this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster), + new RpcInvokerFactory(rpcResourcePool, searchCluster)); } - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig) { + public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory, RpcInvokerFactory rpcInvokerFactory) { this.searchCluster = searchCluster; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); - this.rpcResourcePool = new RpcResourcePool(dispatchConfig); this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); + + this.fs4InvokerFactory = fs4InvokerFactory; + this.rpcInvokerFactory = rpcInvokerFactory; } /** Returns the search cluster this dispatches to */ @@ -70,17 +82,16 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { - rpcResourcePool.release(); + rpcInvokerFactory.release(); } - public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, - FS4InvokerFactory fs4InvokerFactory) { - Optional rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); + public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher) { + Optional rpcInvoker = rpcInvokerFactory.createFillInvoker(searcher, result); if (rpcInvoker.isPresent()) { return rpcInvoker; } if (result.getQuery().properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { - Optional fs4Invoker = fs4InvokerFactory.getFillInvoker(result); + Optional fs4Invoker = fs4InvokerFactory.createFillInvoker(searcher, result); if (fs4Invoker.isPresent()) { return fs4Invoker; } @@ -88,15 +99,17 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } - public Optional getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) { + public Optional getSearchInvoker(Query query, VespaBackEndSearcher searcher) { if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { return Optional.empty(); } - Optional invoker = getSearchPathInvoker(query, fs4InvokerFactory::getSearchInvoker); + InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory; + + Optional invoker = getSearchPathInvoker(query, factory, searcher); if (!invoker.isPresent()) { - invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker); + invoker = getInternalInvoker(query, factory, searcher); } if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -105,13 +118,12 @@ public class Dispatcher extends AbstractComponent { return invoker; } - @FunctionalInterface - private interface SearchInvokerSupplier { - Optional supply(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage); + public FS4InvokerFactory getFS4InvokerFactory() { + return fs4InvokerFactory; } // build invoker based on searchpath - private Optional getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) { + private Optional getSearchPathInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { String searchPath = query.getModel().getSearchPath(); if(searchPath == null) { return Optional.empty(); @@ -122,19 +134,19 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } else { query.trace(false, 2, "Dispatching internally with search path ", searchPath); - return invokerFactory.supply(query, OptionalInt.empty(), nodes, true); + return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true); } } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage()))); } } - private Optional getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { + private Optional getInternalInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { Optional directNode = searchCluster.directDispatchTarget(); if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); - return invokerFactory.supply(query, OptionalInt.empty(), Arrays.asList(node), true); + return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true); } int covered = searchCluster.groupsWithSufficientCoverage(); @@ -149,7 +161,7 @@ public class Dispatcher extends AbstractComponent { } Group group = groupInCluster.get(); boolean acceptIncompleteCoverage = (i == max - 1); - Optional invoker = invokerFactory.supply(query, OptionalInt.of(group.id()), group.nodes(), + Optional invoker = invokerFactory.createSearchInvoker(searcher, query, OptionalInt.of(group.id()), group.nodes(), acceptIncompleteCoverage); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to search group ", group.id()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java new file mode 100644 index 00000000000..ca471fb2baa --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -0,0 +1,41 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; + +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; + +/** + * @author ollivir + */ +public abstract class InvokerFactory { + public abstract Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, + List nodes, boolean acceptIncompleteCoverage); + + public abstract Optional createFillInvoker(VespaBackEndSearcher searcher, Result result); + + protected static SearchInvoker createCoverageErrorInvoker(List nodes, Set failed) { + StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); + int count = 0; + for (Node node : nodes) { + if (failed.contains(node.key())) { + if (count > 0) { + down.append(", "); + } + count++; + down.append(node.key()); + } + } + Coverage coverage = new Coverage(0, 0, 0); + coverage.setNodesTried(count); + return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java index 2a4767bc389..d6fe7ab631b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java @@ -40,7 +40,19 @@ class RpcClient implements Client { request.setContext(hits); RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcResponseWaiter(rpcNode, responseReceiver)); + rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); + } + + @Override + public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + RpcSearchInvoker responseReceiver, double timeoutSeconds) { + Request request = new Request("proton.search"); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedPayload)); + + RpcNodeConnection rpcNode = ((RpcNodeConnection) node); + rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver)); } private static class RpcNodeConnection implements NodeConnection { @@ -83,7 +95,7 @@ class RpcClient implements Client { } - private static class RpcResponseWaiter implements RequestWaiter { + private static class RpcDocsumResponseWaiter implements RequestWaiter { /** The node to which we made the request we are waiting for - for error messages only */ private final RpcNodeConnection node; @@ -91,7 +103,7 @@ class RpcClient implements Client { /** The handler to which the response is forwarded */ private final RpcFillInvoker.GetDocsumsResponseReceiver handler; - public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { + public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { this.node = node; this.handler = handler; } @@ -124,4 +136,39 @@ class RpcClient implements Client { } + private static class RpcSearchResponseWaiter implements RequestWaiter { + + /** The node to which we made the request we are waiting for - for error messages only */ + private final RpcNodeConnection node; + + /** The handler to which the response is forwarded */ + private final RpcSearchInvoker handler; + + public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) { + this.node = node; + this.handler = handler; + } + + @Override + public void handleRequestDone(Request requestWithResponse) { + if (requestWithResponse.isError()) { + handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); + return; + } + + Values returnValues = requestWithResponse.returnValues(); + if (returnValues.size() < 3) { + handler.receive(SearchResponseOrError.fromError( + "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); + return; + } + + byte compression = returnValues.get(0).asInt8(); + int uncompressedSize = returnValues.get(1).asInt32(); + byte[] compressedPayload = returnValues.get(2).asData(); + handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload))); + } + + } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcInvokerFactory.java new file mode 100644 index 00000000000..62b4ec2c57b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcInvokerFactory.java @@ -0,0 +1,94 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; + +/** + * @author ollivir + */ +public class RpcInvokerFactory extends InvokerFactory { + /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ + private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); + + private final RpcResourcePool rpcResourcePool; + private final SearchCluster searchCluster; + + public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { + this.rpcResourcePool = rpcResourcePool; + this.searchCluster = searchCluster; + } + + @Override + public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List nodes, + boolean acceptIncompleteCoverage) { + List invokers = new ArrayList<>(nodes.size()); + Set failed = null; + for (Node node : nodes) { + if (node.isWorking()) { + invokers.add(new RpcSearchInvoker(searcher, node, rpcResourcePool)); + } else { + if (failed == null) { + failed = new HashSet<>(); + } + failed.add(node.key()); + } + } + + if (failed != null) { + List success = new ArrayList<>(nodes.size() - failed.size()); + for (Node node : nodes) { + if (!failed.contains(node.key())) { + success.add(node); + } + } + if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) { + if (acceptIncompleteCoverage) { + invokers.add(createCoverageErrorInvoker(nodes, failed)); + } else { + return Optional.empty(); + } + } + } + + if (invokers.size() == 1) { + return Optional.of(invokers.get(0)); + } else { + return Optional.of(new InterleavedSearchInvoker(invokers, searcher, searchCluster)); + } + } + + @Override + public Optional createFillInvoker(VespaBackEndSearcher searcher, Result result) { + Query query = result.getQuery(); + if (query.properties().getBoolean(dispatchSummaries, true) + && ! searcher.summaryNeedsQuery(query) + && query.getRanking().getLocation() == null) + { + return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); + } else { + return Optional.empty(); + } + } + + // for testing + public FillInvoker createFillInvoker(DocumentDatabase documentDb) { + return new RpcFillInvoker(rpcResourcePool, documentDb); + } + + public void release() { + rpcResourcePool.release(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java index 29641080ba6..e03e0875106 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java @@ -3,14 +3,10 @@ package com.yahoo.search.dispatch; import com.google.common.collect.ImmutableMap; import com.yahoo.compress.Compressor; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Map; -import java.util.Optional; /** * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains @@ -22,9 +18,6 @@ public class RpcResourcePool { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ - private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); - private final Compressor compressor = new Compressor(); private final Client client; @@ -47,22 +40,6 @@ public class RpcResourcePool { this.nodeConnections = nodeConnectionsBuilder.build(); } - public Optional getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) { - if (query.properties().getBoolean(dispatchSummaries, true) - && ! searcher.summaryNeedsQuery(query) - && query.getRanking().getLocation() == null) - { - return Optional.of(new RpcFillInvoker(this, documentDb)); - } else { - return Optional.empty(); - } - } - - // for testing - public FillInvoker getFillInvoker(DocumentDatabase documentDb) { - return new RpcFillInvoker(this, documentDb); - } - public Compressor compressor() { return compressor; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcSearchInvoker.java new file mode 100644 index 00000000000..4ceac71ef63 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcSearchInvoker.java @@ -0,0 +1,119 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import ai.vespa.searchlib.searchprotocol.protobuf.Search; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.Client.SearchResponse; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.searchchain.Execution; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * {@link SearchInvoker} implementation using RPC + * + * @author ollivir + */ +public class RpcSearchInvoker extends SearchInvoker { + private final VespaBackEndSearcher searcher; + private final Node node; + private final RpcResourcePool resourcePool; + private final BlockingQueue responses; + + private Query query; + + RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) { + super(Optional.of(node)); + this.searcher = searcher; + this.node = node; + this.resourcePool = resourcePool; + this.responses = new LinkedBlockingQueue<>(1); + } + + @Override + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + this.query = query; + + CompressionType compression = CompressionType + .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); + + Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); + if (nodeConnection == null) { + responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); + responseAvailable(); + return; + } + + var payload = query.toProtobuf(searcher.getServerId(), true).toByteArray(); + double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; + Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); + resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, + timeoutSeconds); + } + + @Override + protected Result getSearchResult(Execution execution) throws IOException { + long timeLeftMs = query.getTimeLeft(); + if (timeLeftMs <= 0) { + return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + } + Client.SearchResponseOrError response = null; + try { + response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // handled as timeout + } + if (response == null) { + return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + } + if (response.error().isPresent()) { + return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get())); + } + if (response.response().isEmpty()) { + return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); + } + + SearchResponse searchResponse = response.response().get(); + CompressionType compression = CompressionType.valueOf(searchResponse.compression()); + byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, + searchResponse.uncompressedSize()); + var protobuf = Search.Reply.parseFrom(payload); + Result result = Result.fromProtobuf(query, protobuf, searcher.getDocumentDatabase(query)); + result.hits().unorderedIterator().forEachRemaining(hit -> { + if(hit instanceof FastHit) { + FastHit fhit = (FastHit) hit; + fhit.setPartId(node.pathIndex()); + fhit.setDistributionKey(node.key()); + } + hit.setSource(getName()); + }); + + return result; + } + + @Override + protected void release() { + // nothing to release + } + + public void receive(Client.SearchResponseOrError response) { + responses.add(response); + responseAvailable(); + } + + private String getName() { + return searcher.getName(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 0d7ef53bb50..1650494db3a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -5,6 +5,8 @@ import com.yahoo.fs4.QueryPacket; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; @@ -53,4 +55,13 @@ public abstract class SearchInvoker extends CloseableInvoker { protected Optional distributionKey() { return node.map(Node::key); } + + protected Result errorResult(Query query, ErrorMessage errorMessage) { + Result error = new Result(query, errorMessage); + Coverage errorCoverage = new Coverage(0, 0, 0); + errorCoverage.setNodesTried(1); + error.setCoverage(errorCoverage); + return error; + } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index cb86f19e761..bafc72b9b43 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -25,6 +25,12 @@ public class Group { public Group(int id, List nodes) { this.id = id; this.nodes = ImmutableList.copyOf(nodes); + + int idx = 0; + for(var node: nodes) { + node.setPathIndex(idx); + idx++; + } } /** Returns the unique identity of this group */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 98deb9e3199..7e0e3117628 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicLong; public class Node { private final int key; + private int pathIndex; private final String hostname; private final int fs4port; final int group; @@ -31,6 +32,12 @@ public class Node { /** Returns the unique and stable distribution key of this node */ public int key() { return key; } + public int pathIndex() { return pathIndex; } + + void setPathIndex(int index) { + pathIndex = index; + } + public String hostname() { return hostname; } public int fs4port() { return fs4port; } diff --git a/container-search/src/main/java/com/yahoo/search/query/Model.java b/container-search/src/main/java/com/yahoo/search/query/Model.java index f01951047d0..4e05b44b087 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Model.java +++ b/container-search/src/main/java/com/yahoo/search/query/Model.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; +import com.google.protobuf.ByteString; import com.yahoo.language.Language; import com.yahoo.language.Linguistics; import com.yahoo.language.LocaleFactory; @@ -17,6 +19,7 @@ import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.QueryProfileType; import com.yahoo.search.searchchain.Execution; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -34,6 +37,7 @@ import static com.yahoo.text.Lowercase.toLowerCase; * @author bratseth */ public class Model implements Cloneable { + private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; /** The type representing the property arguments consumed by this */ private static final QueryProfileType argumentType; @@ -101,9 +105,9 @@ public class Model implements Cloneable { /** * Gets the language to use for parsing. If this is explicitly set in the model, that language is returned. - * Otherwise, if a query tree is already produced and any node in it specifies a language the first such - * node encountered in a depth first - * left to right search is returned. Otherwise the language is guessed from the query string. + * Otherwise, if a query tree is already produced and any node in it specifies a language the first such + * node encountered in a depth first + * left to right search is returned. Otherwise the language is guessed from the query string. * If this does not yield an actual language, English is returned as the default. * * @return the language determined, never null @@ -121,7 +125,7 @@ public class Model implements Cloneable { if (queryTree != null) language = languageBelow(queryTree); if (language != Language.UNKNOWN) return language; - + Linguistics linguistics = execution.context().getLinguistics(); if (linguistics != null) language = linguistics.getDetector().detect(languageDetectionText, null).getLanguage(); // TODO: Set language if detected @@ -129,7 +133,7 @@ public class Model implements Cloneable { return Language.ENGLISH; } - + private Language languageBelow(Item item) { if (item.getLanguage() != Language.UNKNOWN) return item.getLanguage(); if (item instanceof CompositeItem) { @@ -509,4 +513,23 @@ public class Model implements Cloneable { return false; } + public void addToProtobuf(Search.Request.Builder builder, boolean encodeQueryData) { + if (documentDbName != null) { + builder.setDocumentType(documentDbName); + } + int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE; + boolean success = false; + while(!success) { + try { + ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize); + getQueryTree().encode(treeBuffer); + treeBuffer.flip(); + builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer)); + success = true; + } catch(java.nio.BufferOverflowException e) { + bufferSize *= 2; + } + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/query/Presentation.java b/container-search/src/main/java/com/yahoo/search/query/Presentation.java index 6b10fd0847c..5359569f5f3 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Presentation.java +++ b/container-search/src/main/java/com/yahoo/search/query/Presentation.java @@ -1,11 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.google.common.base.Splitter; import com.yahoo.collections.LazySet; import com.yahoo.component.ComponentSpecification; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.prelude.query.*; +import com.yahoo.prelude.query.Highlight; +import com.yahoo.prelude.query.IndexedItem; import com.yahoo.search.Query; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.QueryProfileType; @@ -196,5 +197,11 @@ public class Presentation implements Cloneable { } + public void addToProtobuf(Search.Request.Builder builder, boolean encodeQueryData) { + if(highlight != null) { + highlight.addToProtobuf(builder, encodeQueryData); + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/query/Ranking.java b/container-search/src/main/java/com/yahoo/search/query/Ranking.java index 903eedfe870..0d3aa01a85b 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Ranking.java +++ b/container-search/src/main/java/com/yahoo/search/query/Ranking.java @@ -1,9 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query; -import com.yahoo.processing.request.CompoundName; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.yahoo.prelude.Freshness; import com.yahoo.prelude.Location; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.QueryProfileFieldType; @@ -268,4 +269,19 @@ public class Ranking implements Cloneable { } } + public void addToProtobuf(Search.Request.Builder builder, boolean includeQueryData) { + builder.setRankProfile(getProfile()); + if (queryCache) { + builder.setCacheQuery(true); + } + if(sorting != null) { + sorting.addToProtobuf(builder, includeQueryData); + } + if(location != null) { + builder.setGeoLocation(location.toString()); + } + rankFeatures.addToProtobuf(builder, includeQueryData); + rankProperties.addToProtobuf(builder, includeQueryData); + } + } diff --git a/container-search/src/main/java/com/yahoo/search/query/Sorting.java b/container-search/src/main/java/com/yahoo/search/query/Sorting.java index b9d157de531..3d575084214 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Sorting.java +++ b/container-search/src/main/java/com/yahoo/search/query/Sorting.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.ibm.icu.text.Collator; import com.ibm.icu.util.ULocale; import com.yahoo.text.Utf8; @@ -404,4 +405,14 @@ public class Sorting implements Cloneable { return usedBytes; } + public void addToProtobuf(Search.Request.Builder builder, boolean includeQueryData) { + for (var field : fieldOrders) { + var sorting = Search.SortField.newBuilder() + .setField(field.getSorter().getName()) + .setAscending(field.getSortOrder() == Order.ASCENDING) + .build(); + builder.addSorting(sorting); + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/query/ranking/RankFeatures.java b/container-search/src/main/java/com/yahoo/search/query/ranking/RankFeatures.java index 9786eba163a..9b0fd45c357 100644 --- a/container-search/src/main/java/com/yahoo/search/query/ranking/RankFeatures.java +++ b/container-search/src/main/java/com/yahoo/search/query/ranking/RankFeatures.java @@ -1,7 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query.ranking; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.yahoo.fs4.MapEncoder; +import com.yahoo.searchlib.protobuf.MapConverter; import com.yahoo.tensor.Tensor; import com.yahoo.text.JSON; @@ -127,4 +129,11 @@ public class RankFeatures implements Cloneable { return JSON.encode(features); } + public void addToProtobuf(Search.Request.Builder builder, boolean includeQueryData) { + if (includeQueryData) { + MapConverter.convertMapStrings(features, builder::addFeatureOverrides); + MapConverter.convertMapTensors(features, builder::addTensorFeatureOverrides); + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/query/ranking/RankProperties.java b/container-search/src/main/java/com/yahoo/search/query/ranking/RankProperties.java index 4158b0e7476..dc544386dc4 100644 --- a/container-search/src/main/java/com/yahoo/search/query/ranking/RankProperties.java +++ b/container-search/src/main/java/com/yahoo/search/query/ranking/RankProperties.java @@ -1,8 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query.ranking; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.yahoo.fs4.GetDocSumsPacket; import com.yahoo.fs4.MapEncoder; +import com.yahoo.searchlib.protobuf.MapConverter; import com.yahoo.text.JSON; import java.nio.ByteBuffer; @@ -111,4 +113,14 @@ public class RankProperties implements Cloneable { return JSON.encode(properties); } + public void addToProtobuf(Search.Request.Builder builder, boolean includeQueryData) { + if (includeQueryData) { + MapConverter.convertMultiMap(properties, propB -> { + if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) { + builder.addRankProperties(propB); + } + }, builder::addTensorRankProperties); + } + } + } diff --git a/container-search/src/main/java/com/yahoo/search/result/Coverage.java b/container-search/src/main/java/com/yahoo/search/result/Coverage.java index 25829b70b5e..30ea8f53fc0 100644 --- a/container-search/src/main/java/com/yahoo/search/result/Coverage.java +++ b/container-search/src/main/java/com/yahoo/search/result/Coverage.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.result; +import ai.vespa.searchlib.searchprotocol.protobuf.Search; import com.google.common.annotations.Beta; /** @@ -41,4 +42,19 @@ public class Coverage extends com.yahoo.container.handler.Coverage { public Coverage setDegradedReason(int degradedReason) { this.degradedReason = degradedReason; return this; } public Coverage setNodesTried(int nodesTried) { super.setNodesTried(nodesTried); return this; } + + public static Coverage fromProtobuf(Search.Reply protobuf) { + var coverage = new Coverage(protobuf.getCoverageDocs(), protobuf.getActiveDocs(), 1); + coverage.setNodesTried(1) + .setSoonActive(protobuf.getSoonActiveDocs()); + + int degradedReason = 0; + if (protobuf.getDegradedByMatchPhase()) + degradedReason |= Coverage.DEGRADED_BY_MATCH_PHASE; + if (protobuf.getDegradedBySoftTimeout()) + degradedReason |= Coverage.DEGRADED_BY_TIMEOUT; + coverage.setDegradedReason(degradedReason); + + return coverage; + } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 708caafa3f5..0dcbc917a69 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch; import com.yahoo.prelude.fastsearch.FS4InvokerFactory; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.dispatch.searchcluster.Node; @@ -36,7 +37,7 @@ public class DispatcherTest { builder.useMultilevelDispatch(true); DispatchConfig dc = new DispatchConfig(builder); - Dispatcher disp = new Dispatcher(cl, dc); + Dispatcher disp = new Dispatcher(cl, dc, new MockFS4InvokerFactory(cl), new MockRpcInvokerFactory()); assertThat(disp.getSearchInvoker(query(), null).isPresent(), is(false)); } @@ -45,13 +46,13 @@ public class DispatcherTest { SearchCluster cl = new MockSearchCluster("1", 2, 2); Query q = query(); q.getModel().setSearchPath("1/0"); // second node in first group - Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (nodes, a) -> { assertThat(nodes.size(), is(1)); assertThat(nodes.get(0).key(), is(2)); return true; }); - Optional invoker = disp.getSearchInvoker(q, invokerFactory); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); + Optional invoker = disp.getSearchInvoker(q, null); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -64,9 +65,9 @@ public class DispatcherTest { return Optional.of(new Node(1, "test", 123, 1)); } }; - Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, a) -> true); - Optional invoker = disp.getSearchInvoker(query(), invokerFactory); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); + Optional invoker = disp.getSearchInvoker(query(), null); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -75,7 +76,6 @@ public class DispatcherTest { public void requireThatInvokerConstructionIsRetriedAndLastAcceptsAnyCoverage() { SearchCluster cl = new MockSearchCluster("1", 2, 1); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, acceptIncompleteCoverage) -> { assertThat(acceptIncompleteCoverage, is(false)); return false; @@ -83,7 +83,8 @@ public class DispatcherTest { assertThat(acceptIncompleteCoverage, is(true)); return true; }); - Optional invoker = disp.getSearchInvoker(query(), invokerFactory); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); + Optional invoker = disp.getSearchInvoker(query(), null); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -92,9 +93,9 @@ public class DispatcherTest { public void requireThatInvokerConstructionDoesNotRepeatGroups() { SearchCluster cl = new MockSearchCluster("1", 2, 1); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, a) -> false, (n, a) -> false); - Optional invoker = disp.getSearchInvoker(query(), invokerFactory); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, null); + Optional invoker = disp.getSearchInvoker(query(), null); assertThat(invoker.isPresent(), is(false)); invokerFactory.verifyAllEventsProcessed(); } @@ -108,12 +109,13 @@ public class DispatcherTest { private int step = 0; public MockFS4InvokerFactory(SearchCluster cl, FactoryStep... events) { - super(null, cl, null); + super(null, cl); this.events = events; } @Override - public Optional getSearchInvoker(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage) { + public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, + List nodes, boolean acceptIncompleteCoverage) { if (step >= events.length) { throw new RuntimeException("Was not expecting more calls to getSearchInvoker"); } @@ -130,4 +132,14 @@ public class DispatcherTest { assertThat(step, is(events.length)); } } + + public class MockRpcInvokerFactory extends RpcInvokerFactory { + public MockRpcInvokerFactory() { + super(null, null); + } + + @Override + public void release() { + } + } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java index 173b2de494f..13178e88515 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java @@ -36,6 +36,7 @@ public class FillTestCase { nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); Query query = new Query(); Result result = new Result(query); @@ -51,7 +52,7 @@ public class FillTestCase { client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4)); - rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + factory.createFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); @@ -72,6 +73,7 @@ public class FillTestCase { nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); Query query = new Query(); Result result = new Result(query); @@ -87,7 +89,7 @@ public class FillTestCase { client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>()); - rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + factory.createFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); @@ -111,12 +113,13 @@ public class FillTestCase { Map nodes = new HashMap<>(); nodes.put(0, client.createConnection("host0", 123)); RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); Query query = new Query(); Result result = new Result(query); result.hits().add(createHit(0, 0)); - rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + factory.createFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage()); } @@ -128,6 +131,7 @@ public class FillTestCase { Map nodes = new HashMap<>(); nodes.put(0, client.createConnection("host0", 123)); RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); Query query = new Query(); Result result = new Result(query); @@ -135,7 +139,7 @@ public class FillTestCase { result.hits().add(createHit(1, 1)); - rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + factory.createFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage()); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java index a4cb8ae641c..b35efba5847 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java @@ -6,6 +6,7 @@ import com.yahoo.compress.Compressor; import com.yahoo.document.GlobalId; import com.yahoo.document.idstring.IdIdString; import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Result; import com.yahoo.slime.ArrayTraverser; import com.yahoo.slime.BinaryFormat; import com.yahoo.slime.Cursor; @@ -25,6 +26,7 @@ public class MockClient implements Client { private final Map> docsums = new HashMap<>(); private final Compressor compressor = new Compressor(); private boolean malfunctioning = false; + private Result searchResult; /** Set to true to cause this to produce an error instead of a regular response */ public void setMalfunctioning(boolean malfunctioning) { this.malfunctioning = malfunctioning; } @@ -72,6 +74,24 @@ public class MockClient implements Client { responseReceiver.receive(GetDocsumsResponseOrError.fromResponse(response)); } + @Override + public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + RpcSearchInvoker responseReceiver, double timeoutSeconds) { + if (malfunctioning) { + responseReceiver.receive(SearchResponseOrError.fromError("Malfunctioning")); + return; + } + + if(searchResult == null) { + responseReceiver.receive(SearchResponseOrError.fromError("No result defined")); + return; + } + var payload = searchResult.toProtobuf().toByteArray(); + var compressionResult = compressor.compress(compression, payload); + var response = new SearchResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); + responseReceiver.receive(SearchResponseOrError.fromResponse(response)); + } + public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map docsumValues) { docsums.put(new DocsumKey(nodeId, globalIdFrom(docId), docsumClass), docsumValues); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/RpcSearchInvokerTest.java new file mode 100644 index 00000000000..6b39f8b3f07 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/dispatch/RpcSearchInvokerTest.java @@ -0,0 +1,92 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.dispatch; + +import ai.vespa.searchlib.searchprotocol.protobuf.Search; +import com.google.common.collect.ImmutableMap; +import com.yahoo.compress.CompressionType; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.RpcFillInvoker.GetDocsumsResponseReceiver; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.searchchain.Execution; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * @author ollivir + */ +public class RpcSearchInvokerTest { + @Test + public void testProtobufSerialization() throws IOException { + var compressionTypeHolder = new AtomicReference(); + var payloadHolder = new AtomicReference(); + var lengthHolder = new AtomicInteger(); + var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); + var mockPool = new RpcResourcePool(mockClient, ImmutableMap.of(7, () -> {})); + @SuppressWarnings("resource") + var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 77, 1), mockPool); + + Query q = new Query("search/?query=test&hits=10&offset=3"); + invoker.sendSearchRequest(q, null); + + var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); + var request = Search.Request.newBuilder().mergeFrom(bytes).build(); + + assertThat(request.getHits(), equalTo(10)); + assertThat(request.getOffset(), equalTo(3)); + assertThat(request.getQueryTreeBlob().size(), greaterThan(0)); + } + + private Client parameterCollectorClient(AtomicReference compressionTypeHolder, AtomicReference payloadHolder, + AtomicInteger lengthHolder) { + return new Client() { + @Override + public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + RpcSearchInvoker responseReceiver, double timeoutSeconds) { + compressionTypeHolder.set(compression); + payloadHolder.set(compressedPayload); + lengthHolder.set(uncompressedLength); + } + + @Override + public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, + byte[] compressedSlime, GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + fail("Unexpected call"); + } + + @Override + public NodeConnection createConnection(String hostname, int port) { + fail("Unexpected call"); + return null; + } + }; + } + + private VespaBackEndSearcher mockSearcher() { + return new VespaBackEndSearcher() { + @Override + protected Result doSearch2(Query query, QueryPacket queryPacket, Execution execution) { + fail("Unexpected call"); + return null; + } + + @Override + protected void doPartialFill(Result result, String summaryClass) { + fail("Unexpected call"); + } + }; + } +} -- cgit v1.2.3