diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java | 136 |
1 files changed, 132 insertions, 4 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index 2353054103d..27c4c73dd65 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -1,21 +1,39 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; +import com.yahoo.data.access.simple.Value; +import com.yahoo.data.access.slime.SlimeAdapter; import com.yahoo.fs4.BasicPacket; import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.DocumentInfo; +import com.yahoo.fs4.FS4Properties; import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.fs4.QueryResultPacket; import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.ConfigurationException; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.ResponseMonitor; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.query.Sorting; +import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Relevance; import com.yahoo.search.searchchain.Execution; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.vespa.objects.BufferSerializer; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.logging.Logger; @@ -25,6 +43,9 @@ import java.util.logging.Logger; * @author ollivir */ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> { + static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); + static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); + private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); private final VespaBackEndSearcher searcher; @@ -48,7 +69,7 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<F log.finest("sending query packet"); this.query = query; - this.queryPacket = searcher.createQueryPacket(searcher.getServerId(), query); + createQueryPacket(searcher.getServerId(), query); try { boolean couldSend = channel.sendPacket(queryPacket); @@ -98,13 +119,120 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<F Result result = new Result(query); - searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); - - searcher.addUnfilledHits(result, resultPacket.getDocuments(), queryPacket.getQueryPacketData(), distributionKey()); + ensureResultHitCapacity(result, resultPacket); + addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); + addUnfilledHits(result, resultPacket.getDocuments(), queryPacket.getQueryPacketData()); return result; } + private QueryPacket createQueryPacket(String serverId, Query query) { + this.queryPacket = QueryPacket.create(serverId, query); + int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); + queryPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); + } + log.fine(() -> "made QueryPacket: " + queryPacket); + + return queryPacket; + } + + private void ensureResultHitCapacity(Result result, QueryResultPacket resultPacket) { + int hitCount = resultPacket.getDocumentCount(); + if (resultPacket.getGroupData() != null) { + hitCount++; + } + result.hits().ensureCapacity(hitCount); + } + + private void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) { + result.setTotalHitCount(resultPacket.getTotalDocumentCount()); + + addBackendTrace(query, resultPacket); + + // Grouping + if (resultPacket.getGroupData() != null) { + byte[] data = resultPacket.getGroupData(); + ArrayList<Grouping> list = new ArrayList<>(); + BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(ByteBuffer.wrap(data))); + int cnt = buf.getInt(null); + for (int i = 0; i < cnt; i++) { + Grouping g = new Grouping(); + g.deserialize(buf); + list.add(g); + } + GroupingListHit hit = new GroupingListHit(list, searcher.getDocsumDefinitionSet(query)); + hit.setQuery(result.getQuery()); + hit.setSource(getName()); + hit.setQueryPacketData(queryPacketData); + result.hits().add(hit); + } + + if (resultPacket.getCoverageFeature()) { + result.setCoverage(new Coverage(resultPacket.getCoverageDocs(), resultPacket.getActiveDocs(), resultPacket.getNodesReplied()) + .setSoonActive(resultPacket.getSoonActiveDocs()) + .setDegradedReason(resultPacket.getDegradedReason()) + .setNodesTried(resultPacket.getNodesQueried())); + } + } + + private void addBackendTrace(Query query, QueryResultPacket resultPacket) { + if (resultPacket.propsArray == null) return; + Value.ArrayValue traces = new Value.ArrayValue(); + for (FS4Properties properties : resultPacket.propsArray) { + if ( ! properties.getName().equals("trace")) continue; + for (FS4Properties.Entry entry : properties.getEntries()) { + traces.add(new SlimeAdapter(BinaryFormat.decode(entry.getValue()).get())); + } + } + query.trace(traces, query.getTraceLevel()); + } + + /** + * Creates unfilled hits from a List of DocumentInfo instances. + */ + private void addUnfilledHits(Result result, List<DocumentInfo> documents, QueryPacketData queryPacketData) { + Query myQuery = result.getQuery(); + Sorting sorting = myQuery.getRanking().getSorting(); + Optional<Integer> channelDistributionKey = distributionKey(); + + for (DocumentInfo document : documents) { + + try { + FastHit hit = new FastHit(); + hit.setQuery(myQuery); + if (queryPacketData != null) + hit.setQueryPacketData(queryPacketData); + + hit.setFillable(); + hit.setCached(false); + + extractDocumentInfo(hit, document, sorting); + channelDistributionKey.ifPresent(hit::setDistributionKey); + + result.hits().add(hit); + } catch (ConfigurationException e) { + log.log(LogLevel.WARNING, "Skipping hit", e); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Skipping malformed hit", e); + } + } + } + + private void extractDocumentInfo(FastHit hit, DocumentInfo document, Sorting sorting) { + hit.setSource(getName()); + + Number rank = document.getMetric(); + + hit.setRelevance(new Relevance(rank.doubleValue())); + + hit.setDistributionKey(document.getDistributionKey()); + hit.setGlobalId(document.getGlobalId()); + hit.setPartId(document.getPartId()); + hit.setSortData(document.getSortData(), sorting); + } + @Override public void release() { if (channel != null) { |