diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-19 04:59:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-19 04:59:54 +0200 |
commit | c7f577949b7d95ea029716a6422dc8ff251ca932 (patch) | |
tree | 36ade4b419be773ca8e658491b8f5d581a06f105 | |
parent | f20cdeb2c1e4268cf27930d3a1f8a03201c76356 (diff) |
Revert "Revert "Revert "Revert "Revert "Balder/no more fs4 dispatching from fastsearcher""""."
80 files changed, 7099 insertions, 127 deletions
diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def index 32a1d8fe2d1..9cef66f49e0 100644 --- a/configdefinitions/src/vespa/dispatch.def +++ b/configdefinitions/src/vespa/dispatch.def @@ -24,7 +24,6 @@ useFdispatchByDefault bool default=false dispatchWithProtobuf bool default=false # Is multi-level dispatch configured for this cluster -# Deprecated, will go away soon, NOOP useMultilevelDispatch bool default=false # Dispatch only to local nodes diff --git a/container-core/src/main/java/com/yahoo/container/handler/VipStatus.java b/container-core/src/main/java/com/yahoo/container/handler/VipStatus.java index b9ef1627ce7..6f827041ffb 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/VipStatus.java +++ b/container-core/src/main/java/com/yahoo/container/handler/VipStatus.java @@ -117,9 +117,7 @@ public class VipStatus { /** Returns whether this container should receive traffic at this time */ public boolean isInRotation() { - synchronized (mutex) { - return currentlyInRotation; - } + return currentlyInRotation; } } diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java new file mode 100644 index 00000000000..f87721dc503 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java @@ -0,0 +1,315 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.fastsearch.TimeoutException; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * Superclass of fs4 packets + * + * @author bratseth + */ +public abstract class BasicPacket { + + private final Compressor compressor = new Compressor(); + + private static Logger log = Logger.getLogger(QueryResultPacket.class.getName()); + private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); + public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. + + private byte[] encodedBody; + + /** The length of this packet in bytes or -1 if not known */ + protected int length = -1; + + /** + * A timestamp which can be set or inspected by clients of this class + * but which is never updated by the class itself. This is mostly + * a convenience for when you need to queue packets or retain them + * in some structure where their validity is limited by a timeout + * or similar. + */ + private long timeStamp = -1; + + private int compressionLimit = 0; + + private CompressionType compressionType; + + /** + * Sets the number of bytes the package must be before activating compression. + * A value of 0 means no compression. + * + * @param limit smallest package size that triggers compression. + */ + public void setCompressionLimit(int limit) { compressionLimit = limit; } + + public void setCompressionType(String type) { + compressionType = CompressionType.valueOf(type); + } + + /** + * Fills this package from a byte buffer positioned at the first byte of the package + * + * @return this for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public BasicPacket decode(ByteBuffer buffer) { + length = buffer.getInt()+4; // Streamed packet length is the length-4 + int code = buffer.getInt(); + + decodeAndDecompressBody(buffer, code, length - 2*4); + return this; + } + + protected void decodeAndDecompressBody(ByteBuffer buffer, int code, int packetLength) { + byte compressionType = (byte)((code & ~CODE_MASK) >> 24); + boolean isCompressed = compressionType != 0; + codeDecodedHook(code & CODE_MASK); + if (isCompressed) { + int uncompressedSize = buffer.getInt(); + int compressedSize = packetLength - 4; + int offset = 0; + byte[] compressedData; + if (buffer.hasArray()) { + compressedData = buffer.array(); + offset = buffer.arrayOffset() + buffer.position(); + buffer.position(buffer.position() + compressedSize); + } else { + compressedData = new byte[compressedSize]; + buffer.get(compressedData); + } + byte[] body = compressor.decompress(CompressionType.valueOf(compressionType), compressedData, offset, + uncompressedSize, Optional.of(compressedSize)); + ByteBuffer bodyBuffer = ByteBuffer.wrap(body); + length += uncompressedSize - (compressedSize + 4); + decodeBody(bodyBuffer); + } else { + decodeBody(buffer); + } + } + + /** + * Decodes the body of this package from a byte buffer + * positioned at the first byte of the package. + * + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public void decodeBody(ByteBuffer buffer) { + throw new UnsupportedOperationException("Decoding of " + this + " is not implemented"); + } + + /** + * Called when the packet code is decoded. + * This default implementation just throws an exception if the code + * is not the code of this packet. Packets which has several possible codes + * will use this method to store the code. + */ + protected void codeDecodedHook(int code) { + if (code != getCode()) + throw new RuntimeException("Can not decode " + code + " into " + this); + } + + /** + * <p>Encodes this package onto the given buffer at the current position. + * The position of the buffer after encoding is the byte following + * the last encoded byte.</p> + * + * <p>This method will ensure that everything is written provided + * sufficient capacity regardless of the buffer limit. + * When returning, the limit is at the end of the package (qual to the + * position).</p> + * + * @return this for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public BasicPacket encode(ByteBuffer buffer) throws BufferTooSmallException { + int oldLimit = buffer.limit(); + int startPosition = buffer.position(); + + buffer.limit(buffer.capacity()); + try { + buffer.putInt(4); // Real length written later, when we know it + buffer.putInt(getCode()); + + encodeAndCompressBody(buffer, startPosition); + } + catch (java.nio.BufferOverflowException e) { + // reset buffer to expected state + buffer.position(startPosition); + buffer.limit(oldLimit); + throw new BufferTooSmallException("Destination buffer too small while encoding packet"); + } + + return this; + } + + protected void encodeAndCompressBody(ByteBuffer buffer, int startPosition) { + int startOfBody = buffer.position(); + encodeBody(buffer); + setEncodedBody(buffer, startOfBody, buffer.position() - startOfBody); + length = buffer.position() - startPosition; + + if (compressionLimit != 0 && length-4 > compressionLimit) { + byte[] compressedBody; + compressionType = CompressionType.LZ4; + LZ4Factory factory = LZ4Factory.fastestInstance(); + LZ4Compressor compressor = factory.fastCompressor(); + compressedBody = compressor.compress(encodedBody); + + log.log(LogLevel.DEBUG, "Uncompressed size: " + encodedBody.length + ", Compressed size: " + compressedBody.length); + if (compressedBody.length + 4 < encodedBody.length) { + buffer.position(startPosition); + buffer.putInt(compressedBody.length + startOfBody - startPosition + 4 - 4); // +4 for compressed size + buffer.putInt(getCompressedCode(compressionType)); + buffer.position(startOfBody); + buffer.putInt(encodedBody.length); + buffer.put(compressedBody); + buffer.limit(buffer.position()); + return; + } + } + buffer.putInt(startPosition, length - 4); // Encoded length 4 less than actual length + buffer.limit(buffer.position()); + } + + private int getCompressedCode(CompressionType compression) { + int code = compression.getCode(); + return getCode() | (code << 24); + } + + /** + * Encodes the body of this package onto the given buffer at the current position. + * The position of the buffer after encoding is the byte following + * the last encoded byte. + * + * @throws UnsupportedOperationException if not implemented in the subclass + */ + protected void encodeBody(ByteBuffer buffer) { + throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); + } + + private void setEncodedBody(ByteBuffer b, int start, int length) { + encodedBody = new byte[length]; + b.position(start); + b.get(encodedBody); + } + + public boolean isEncoded() { + return encodedBody != null; + } + + /** + * Just a place holder to make the APIs simpler. + */ + public Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { + throw new UnsupportedOperationException("This class does not support a channel ID"); + } + + /** + * Allocate the needed buffers and encode the packet using the given + * channel ID (if pertinent). + * + * If this packet does not use a channel ID, the ID will be ignored. + */ + private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { + while (true) { + try { + if (hasChannelId()) { + encode(buffer, channelId); + } else { + encode(buffer); + } + buffer.flip(); + break; + } + catch (BufferTooSmallException e) { + buffer = ByteBuffer.allocate(buffer.capacity()*2); + } + } + return buffer; + } + + /** + * Return buffer containing the encoded form of this package and + * remove internal reference to it. + */ + public final ByteBuffer grantEncodingBuffer(int channelId) { + return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); + } + + public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { + return allocateAndEncode(channelId, buffer); + } + + /** Returns the code of this package */ + public abstract int getCode(); + + /** + * Returns the length of this body (including header (8 bytes) and body), + * or -1 if not known. + * Note that the streamed packet format length is 4 bytes less than this length, + * for unknown reasons. + * The length is always known when decodeBody is called. + */ + public int getLength() { + return length; + } + + /** + * Set the timestamp field of the packet. + * + * A timestamp which can be set or inspected by clients of this class + * but which is never updated by the class itself. This is mostly + * a convenience for when you need to queue packets or retain them + * in some structure where their validity is limited by a timeout + * or similar. + */ + public void setTimestamp (long timeStamp) { + this.timeStamp = timeStamp; + } + + /** + * Get the timestamp field of this packet. Note that this is + * <b>not</b> part of the FS4 protocol. @see #setTimestamp for + * more information + * + */ + public long getTimestamp () { + return timeStamp; + } + + public String toString() { + return "packet with code " + getCode(); + } + + /** Whether this is a packets which can encode a channel ID. */ + public boolean hasChannelId() { + return false; + } + + /** + * Throws an IOException if the packet is not of the expected type + */ + public void ensureInstanceOf(Class<? extends BasicPacket> type, String name) throws IOException { + if ((type.isAssignableFrom(getClass()))) return; + + if (this instanceof ErrorPacket) { + ErrorPacket errorPacket = (ErrorPacket) this; + if (errorPacket.getErrorCode() == 8) + throw new TimeoutException("Query timed out in " + name); + else + throw new IOException("Received error from backend in " + name + ": " + this); + } else { + throw new IOException("Received " + this + " when expecting " + type); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java b/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java new file mode 100644 index 00000000000..38f312e6c8d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java @@ -0,0 +1,17 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- +// +// + +package com.yahoo.fs4; +/** + * Signal that the buffer used to hold a packet is too small + * + * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> + */ +@SuppressWarnings("serial") +public class BufferTooSmallException extends Exception { + public BufferTooSmallException (String message) { + super(message); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java b/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java new file mode 100644 index 00000000000..fe86e7273c0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java @@ -0,0 +1,23 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- +// + +package com.yahoo.fs4; + + +/** + * Signal that a timeout occurred in the Channel2 communiction + * + * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> + * + */ +@SuppressWarnings("serial") +public class ChannelTimeoutException extends Exception +{ + public ChannelTimeoutException (String msg) { + super(msg); + } + + public ChannelTimeoutException () { + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java index 3105b645cd0..ad679e0c53c 100644 --- a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java @@ -1,6 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; +import com.yahoo.document.GlobalId; + +import java.nio.ByteBuffer; + /** * An "extended query result" packet. This is the query result * packets used today, they allow more flexible sets of parameters @@ -8,10 +12,15 @@ package com.yahoo.fs4; * * @author bratseth */ -public class DocsumPacket { +public class DocsumPacket extends Packet { + + private GlobalId globalId = new GlobalId(new byte[GlobalId.LENGTH]); private byte[] data; + private DocsumPacket() { + } + /** * Constructor used by streaming search */ @@ -19,11 +28,31 @@ public class DocsumPacket { data = buffer.clone(); } + public static DocsumPacket create() { + return new DocsumPacket(); + } + + public int getCode() { return 205; } + + /** + * Fills this packet from a byte buffer positioned at the + * first byte of the packet + */ + public void decodeBody(ByteBuffer buffer) { + byte[] rawGid = new byte[GlobalId.LENGTH]; + buffer.get(rawGid); + globalId = new GlobalId(rawGid); + data=new byte[getLength()-12-GlobalId.LENGTH]; + buffer.get(data); + } + + public GlobalId getGlobalId() { return globalId; } public byte[] getData() { return data; } public String toString() { - return "docsum packet size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; + return "docsum packet [globalId: " + globalId.toString() + + ", size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; } } diff --git a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java new file mode 100644 index 00000000000..8294ae5796d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java @@ -0,0 +1,74 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import com.yahoo.document.GlobalId; + +import java.nio.ByteBuffer; + +/** + * Meta attributes on documents (not the document summaries themselves). + * Used in query results and get docusum packages + * + * @author bratseth + */ +public class DocumentInfo implements Cloneable { + + private final byte [] globalId; + private final double metric; + private final int partId; + private final int distributionKey; + private final byte[] sortData; + + DocumentInfo(ByteBuffer buffer, QueryResultPacket owner, byte[] sortData) { + globalId = new byte[GlobalId.LENGTH]; + buffer.get(globalId); + metric = decodeMetric(buffer); + partId = owner.getMldFeature() ? buffer.getInt() : 0; + distributionKey = owner.getMldFeature() ? buffer.getInt() : 0; + this.sortData = sortData; + } + + public DocumentInfo(GlobalId globalId, int metric, int partId, int distributionKey) { + this.globalId = globalId.getRawId(); + this.metric = metric; + this.partId = partId; + this.distributionKey = distributionKey; + this.sortData = null; + } + + private double decodeMetric(ByteBuffer buffer) { + return buffer.getDouble(); + } + + public GlobalId getGlobalId() { return new GlobalId(globalId); } + public byte [] getRawGlobalId() { return globalId; } + + /** Raw rank score */ + public double getMetric() { return metric; } + + /** Partition this document resides on */ + public int getPartId() { return partId; } + + /** Unique key for the node this document resides on */ + public int getDistributionKey() { return distributionKey; } + + public byte[] getSortData() { + return sortData; + } + + public String toString() { + return "document info [globalId=" + new GlobalId(globalId).toString() + ", metric=" + metric + "]"; + } + + /** + * Implements the Cloneable interface + */ + public Object clone() { + try { + return super.clone(); + } + catch (CloneNotSupportedException e) { + throw new RuntimeException("Someone inserted a nonclonable superclass"); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/EolPacket.java b/container-search/src/main/java/com/yahoo/fs4/EolPacket.java new file mode 100644 index 00000000000..1e907f67696 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/EolPacket.java @@ -0,0 +1,35 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +/** + * A EOL packet signaling end of transmission. + * This package has no body. + * + * @author bratseth + */ +public class EolPacket extends Packet { + + private EolPacket() { + } + + public static EolPacket create() { + return new EolPacket(); + } + + public int getCode() { return 200; } + + public void decodeBody(ByteBuffer buffer) { + // No body + } + + public void encodeBody(ByteBuffer buffer) { + // No body + } + + public String toString() { + return "EOL packet"; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java b/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java new file mode 100644 index 00000000000..f21663272d4 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java @@ -0,0 +1,48 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +import com.yahoo.text.Utf8; + +/** + * + * An error packet signaling that an error occurred. + * + * @author Bjørn Borud + */ +public class ErrorPacket extends Packet { + private int errorCode; + private int errmsgLen; + private String message; + + private ErrorPacket() { + } + + public static ErrorPacket create() { + return new ErrorPacket(); + } + + public int getCode() { return 203; } + + public void decodeBody(ByteBuffer buffer) { + errorCode = buffer.getInt(); + errmsgLen = buffer.getInt(); + + byte[] tmp = new byte[errmsgLen]; + buffer.get(tmp); + + message = Utf8.toString(tmp); + } + + public int getErrorCode () { return errorCode; } + + public void encodeBody(ByteBuffer buffer) { + // No body + } + + public String toString() { + return (message + " (" + errorCode + ")"); + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java b/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java new file mode 100644 index 00000000000..f5f1fca0801 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java @@ -0,0 +1,60 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import com.yahoo.text.Utf8; + +import java.nio.ByteBuffer; + +public class FS4Properties { + private String name; + + static public class Entry { + public final String key; + private final byte [] val; + public Entry(byte[] k, byte[] v) { + key = Utf8.toString(k); + val = v; + } + public final byte [] getValue() { return val; } + }; + + private Entry[] entries; + + void decode(ByteBuffer buffer) { + int nameLen = buffer.getInt(); + byte[] utf8name = new byte[nameLen]; + buffer.get(utf8name); + this.setName(Utf8.toString(utf8name)); + + int n = buffer.getInt(); + setEntries(new Entry[n]); + for (int j = 0; j < n; j++) { + int keyLen = buffer.getInt(); + byte[] key = new byte[keyLen]; + buffer.get(key); + + int valLen = buffer.getInt(); + byte[] value = new byte[valLen]; + buffer.get(value); + + getEntries()[j] = new Entry(key, value); + } + } + + public Entry[] getEntries() { + return entries; + } + + public void setEntries(Entry[] entries) { + this.entries = entries; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java index 6a808e17b5c..7353d0730a4 100644 --- a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java @@ -1,15 +1,215 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; +import com.yahoo.document.GlobalId; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.query.Item; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.result.Hit; +import com.yahoo.text.Utf8; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.logging.Logger; + /** * <p>A packet for requesting a list of document summaries. * This packet can be encoded only.</p> * * @author bratseth */ -public class GetDocSumsPacket { +public class GetDocSumsPacket extends Packet { /** Session id key. Yep, putting this here is ugly as hell */ public static final String sessionIdKey = "sessionId"; + private static final Logger log = Logger.getLogger(GetDocSumsPacket.class.getName()); + private final Result result; + private final Query query; + private final String summaryClass; + private QueryPacketData queryPacketData = null; + private int flags = 0; + + /** + * True if we should send the query with this docsum, false otherwise. + * Sending the query is necessary if we need to return summary features or generate a dynamic summary + */ + private final boolean sendQuery; + + private GetDocSumsPacket(Result result, String summaryClass, boolean sendQuery) { + this.result = result; + this.query = result.getQuery(); + this.summaryClass = summaryClass; + this.sendQuery = sendQuery; + } + + /** + * Creates a get docsums packet for a certain result + */ + public static GetDocSumsPacket create(Result result, String summaryClass, boolean sendQuery) { + return new GetDocSumsPacket(result, summaryClass, sendQuery); + } + + /** + * features bits, as given in searchlib/src/searchlib/common/packets.h + * definition of enum getdocsums_features + */ + public static final int GDF_MLD = 0x00000001; + public static final int GDF_QUERYSTACK = 0x00000004; + public static final int GDF_RANKP_QFLAGS = 0x00000010; + public static final int GDF_LOCATION = 0x00000080; + public static final int GDF_RESCLASSNAME = 0x00000800; + public static final int GDF_PROPERTIES = 0x00001000; + public static final int GDF_FLAGS = 0x00002000; + + public void encodeBody(ByteBuffer buffer) { + setFieldsFromHits(); + + boolean useQueryCache = query.getRanking().getQueryCache(); + // If feature cache is used we need to include the sessionId as key. + if (useQueryCache) { // TODO: Move this decision (and the key) to ranking + query.getRanking().getProperties().put(sessionIdKey, query.getSessionId().toString()); + } + + // set the default features + long features = GDF_MLD; + if (sendQuery) + features |= GDF_QUERYSTACK; + features |= GDF_RANKP_QFLAGS; + + // do we want a specific result class? + if (summaryClass != null) + features |= GDF_RESCLASSNAME; + if (query.getRanking().getLocation() != null) + features |= GDF_LOCATION; + if (query.hasEncodableProperties()) + features |= GDF_PROPERTIES; + if (flags != 0) { + features |= GDF_FLAGS; + } + buffer.putInt((int)features); + buffer.putInt(0); //Unused, was docstamp + long timeLeft = query.getTimeLeft(); + buffer.putInt(Math.max(1, (int)timeLeft)); // Safety to avoid sending down 0 or negative number + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Timeout from query(" + query.getTimeout() + "), sent to backend: " + timeLeft); + } + + if (queryPacketData != null) + encodeQueryFromPacketData(buffer, useQueryCache); + else + encodeQuery(buffer); + + if (flags != 0) + buffer.putInt(flags); + encodeDocIds(buffer); + } + + private void setFieldsFromHits() { + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { + Hit h = i.next(); + if (h instanceof FastHit) { + FastHit hit = (FastHit)h; + QueryPacketData tag = hit.getQueryPacketData(); + if (tag != null) { + this.queryPacketData = tag; + break; + } + } + } + } + + private void encodeQueryFromPacketData(ByteBuffer buffer, boolean reencodePropertyMaps) { + queryPacketData.encodeRankProfile(buffer); + queryPacketData.encodeQueryFlags(buffer); + + encodeSummaryClass(buffer); + + if (reencodePropertyMaps || ! sendQuery) // re-encode when we're not sending query, to avoid resending all encoded properties + query.encodeAsProperties(buffer, sendQuery); + else + queryPacketData.encodePropertyMaps(buffer); + + if (sendQuery) + queryPacketData.encodeQueryStack(buffer); + queryPacketData.encodeLocation(buffer); + } + + private void encodeSummaryClass(ByteBuffer buffer) { + if (summaryClass != null) { + byte[] tmp = Utf8.toBytes(summaryClass); + buffer.putInt(tmp.length); + buffer.put(tmp); + } + } + + private void encodeQuery(ByteBuffer buffer) { + Item.putString(query.getRanking().getProfile(), buffer); + buffer.putInt(QueryPacket.getQueryFlags(query)); + + encodeSummaryClass(buffer); + + query.encodeAsProperties(buffer, sendQuery); + + if (sendQuery) { + // The stack must be resubmitted to generate dynamic docsums + int itemCountPosition = buffer.position(); + buffer.putInt(0); + int dumpLengthPosition = buffer.position(); + buffer.putInt(0); + int count = query.encode(buffer); + buffer.putInt(itemCountPosition, count); + buffer.putInt(dumpLengthPosition, buffer.position() - dumpLengthPosition - 4); + } + + if (query.getRanking().getLocation() != null) { + int locationLengthPosition = buffer.position(); + buffer.putInt(0); + int locationLength = query.getRanking().getLocation().encode(buffer); + buffer.putInt(locationLengthPosition, locationLength); + } + } + + private void encodeDocIds(ByteBuffer buffer) { + byte[] emptyGid = new byte[GlobalId.LENGTH]; + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { + Hit hit = i.next(); + if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { + FastHit fastHit = (FastHit)hit; + buffer.put(fastHit.getGlobalId() != null ? fastHit.getRawGlobalId() : emptyGid); + buffer.putInt(fastHit.getPartId()); + buffer.putInt(0); //Unused, was docstamp + } + } + } + + public int getCode() { + return 219; + } + + public String toString() { + return "Get docsums x packet fetching " + getNumDocsums() + " docsums and packet length of " + getLength() + " bytes."; + } + + public int getNumDocsums() { + int num = 0; + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { + Hit hit = i.next(); + if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { + num++; + } + } + return num; + } + + /** + * Return the document summary class we want the fdispatch + * to use when replying to us + */ + @SuppressWarnings("UnusedDeclaration") + public String getSummaryClass() { + return summaryClass; + } } diff --git a/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java b/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java new file mode 100644 index 00000000000..78ba857c475 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +/** + * Provides sequential access to each byte of a buffer + * as a hexadecimal string of length 2. + * + * @author Tony Vaagenes + */ +public final class HexByteIterator implements Iterator<String> { + private final ByteBuffer buffer; + + private String hexByte(byte b) { + final int unsignedValue = ((int)b) & 0xff; + String s = Integer.toHexString(unsignedValue).toUpperCase(); + + boolean singleChar = unsignedValue < 0x10; + if (singleChar) + return '0' + s; + else + return s; + } + + public boolean hasNext() { + return buffer.hasRemaining(); + } + + public String next() { + return hexByte(buffer.get()); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + public HexByteIterator(ByteBuffer buffer) { + this.buffer = buffer.slice(); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java new file mode 100644 index 00000000000..1e9deede59d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/Packet.java @@ -0,0 +1,114 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +/** + * Superclass of fs4 packets containing channel/query ID + * + * @author bratseth + */ +public abstract class Packet extends BasicPacket { + + private static Logger log = Logger.getLogger(Packet.class.getName()); + + /** + * The channel at which this packet will be sent or was received, + * or -1 when this is not known + */ + protected int channel = -1; + + /** + * Fills this package from a byte buffer positioned at the first + * byte of the package + * + * @return this Packet (as a BasicPacket) for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public BasicPacket decode(ByteBuffer buffer) { + int originalPos = buffer.position(); + length = buffer.getInt()+4; // Streamed packet length is the length-4 + int packetLength = length; + try { + int code = buffer.getInt(); + channel = buffer.getInt(); + + decodeAndDecompressBody(buffer, code, length - 3*4); + } + finally { + int targetPosition = (originalPos + packetLength); + if (buffer.position() != targetPosition) { + log.warning("Position in buffer is " + buffer.position() + " should be " + targetPosition); + buffer.position(targetPosition); + } + } + + return this; + } + + /** + * <p>Encodes this package onto the given buffer at the current + * position. The position of the buffer after encoding is the + * byte following the last encoded byte.</p> + * + * <p>This method will ensure that everything is written provided + * sufficient capacity regardless of the buffer limit. + * When returning, the limit is at the end of the package (qual to the + * position).</p> + * + * @return this for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public final Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { + this.channel = channel; + int oldLimit = buffer.limit(); + int startPosition = buffer.position(); + + buffer.limit(buffer.capacity()); + try { + buffer.putInt(8); // Real length written later, when we know it + buffer.putInt(getCode()); + buffer.putInt(channel); + + encodeAndCompressBody(buffer, startPosition); + } + catch (java.nio.BufferOverflowException e) { + // reset buffer to expected state + buffer.position(startPosition); + buffer.limit(oldLimit); + throw new BufferTooSmallException("Destination buffer too small while encoding packet"); + } + return this; + } + + /** + * Get the channel id of the packet. In the FS4 transport protocol, + * there is the concept of a channel. This must <b>not</b> be confused + * with all the other channels we have floating around this code (aargh!). + * <P> + * The channel can be thought of as a way to pair up requests and + * responses in the FS4 protocol: A response always belongs to + * to a channel and it is the clients responsibility to not re-use + * channel ids within the same connection. + * <p> + * Summary: This "channel" means "session id" + * + * @return FS4 channel id + * + */ + public int getChannel() { return channel; } + + public void setChannel(int channel) { this.channel=channel; } + + + /** Informs that this packets needs a channel ID. */ + public boolean hasChannelId() { + return true; + } + + public String toString() { + return "packet with code " + getCode() + ", channelId=" + getChannel(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java b/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java new file mode 100644 index 00000000000..3e673717d02 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java @@ -0,0 +1,200 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +/** + * Returns the correct package for a package byte stream + * + * @author bratseth + * @author <a href="mailto:borud@yahoo-inc.com">Bj\u00f8rn Borud</a> + */ +public class PacketDecoder { + + /** Represents a packet and the data used to construct it */ + public static class DecodedPacket { + public BasicPacket packet; + public ByteBuffer consumedBytes; + + DecodedPacket(BasicPacket packet, ByteBuffer consumedBytes) { + this.packet = packet; + this.consumedBytes = consumedBytes; + } + } + + private PacketDecoder() {} + + /** + * Returns the package starting at the current position in the buffer + * + * @throws IllegalArgumentException if an unknown package code is + * encountered + * @throws java.nio.BufferUnderflowException if the buffer contains too little + * data to decode the pcode. + */ + public static BasicPacket decode(ByteBuffer buffer) { + int packetCode = buffer.getInt(buffer.position()+4); + packetCode &= BasicPacket.CODE_MASK; + + switch (packetCode) { + case 200: + return EolPacket.create().decode(buffer); + + case 203: + return ErrorPacket.create().decode(buffer); + + case 205: + return DocsumPacket.create().decode(buffer); + + case 217: + return QueryResultPacket.create().decode(buffer); + + case 221: + return PongPacket.create().decode(buffer); + + default: + throw new IllegalArgumentException("No support for packet " + packetCode); + } + } + + /** Gives the packet along with the bytes consumed to construct it. */ + public static DecodedPacket decodePacket(ByteBuffer buffer) { + ByteBuffer dataUsed = buffer.slice(); + int start = buffer.position(); + + BasicPacket packet = decode(buffer); + dataUsed.limit(buffer.position() - start); + return new DecodedPacket(packet, dataUsed); + } + + /** Sniff channel ID for query result packets */ + public static int sniffChannel(ByteBuffer buffer) { + int remaining = buffer.remaining(); + if (remaining < 12) { + return 0; + } + int packetCode = buffer.getInt(buffer.position()+4); + packetCode &= BasicPacket.CODE_MASK; + switch (packetCode) { + case 202: + case 208: + case 214: + case 217: + return buffer.getInt(buffer.position()+8); + default: + return 0; + } + } + + /** + * Test whether the buffer contains (the start of) a pong packet. + * + * Returns false if there is not enough data to determine the + * answer. + */ + public static boolean isPongPacket(ByteBuffer buffer) { + + int remaining = buffer.remaining(); + if (remaining < 8) + return false; + int packetCode = buffer.getInt(buffer.position()+4); + packetCode &= BasicPacket.CODE_MASK; + if (packetCode == 221) + return true; + else + return false; + } + + /** + * Note that it assumes that the position of the ByteBuffer is at the + * start of a packet and that we have enough data to actually read + * an integer out of the buffer. + * + * @return Return the length of the fs4 packet. Returns -1 if length + * could not be determined because we had too little + * data in the buffer. + * + */ + public static int packetLength(ByteBuffer buffer) + { + if (buffer.remaining() < 4) { + return -1; + } + return (buffer.getInt(buffer.position()) + 4); + } + + /** + * Takes a buffer possibly containing a packet. + * + * <P> + * If we return a packet when we return: + * <UL> + * <LI> the buffer is positioned at the beginning of the next + * packet when we return. + * <LI> limit is unchanged + * </UL> + * + * <P> + * If we return <code>null</code> there were no more packets + * there to decode and the following is true of the buffer + * <UL> + * <LI> the buffer is compacted, ie. partial packet is + * moved to the start, or if no more data is available + * the buffer is cleared. + * <LI> the position is set to the next byte after the valid + * data so the buffer is ready for reading. + * </UL> + * + * If there are no packets to be returned the buffer is compacted + * (ie. content is moved to the start and read-pointer is positioned + * + * @return Returns the next available packet from the buffer or + * <code>null</code> if there are no more <b>complete</b> + * packets in the buffer at this time. + */ + public static DecodedPacket extractPacket(ByteBuffer buffer) + throws BufferTooSmallException + { + int remaining = buffer.remaining(); + + // if we are empty we can reset the buffer + if (remaining == 0) { + buffer.clear(); + return null; + } + + // if we can't figure out the size because we have less than + // 4 bytes we prepare the buffer for more data reading. + if (remaining < 4) { + buffer.compact(); + return null; + } + + int plen = packetLength(buffer); + + // -1 means that we do not have enough data to read the packet + // size yet + if (plen == -1) { + buffer.compact(); + return null; + } + + // if we haven't read an entire packet yet, we compact and return + // (same as above but separate code for clarity). note that this + // also occurs when there is no physical room for the packet, so + // clients of this API need to be aware of this and check for it + if (remaining < plen) { + + // if the read buffer is too small we must take drastic action + if (buffer.capacity() < plen) { + throw new BufferTooSmallException("Buffer too small to hold packet"); + } + + buffer.compact(); + return null; + } + + return PacketDecoder.decodePacket(buffer); + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java b/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java new file mode 100644 index 00000000000..6b2c792837a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java @@ -0,0 +1,135 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.log.LogLevel; +import com.yahoo.search.Query; + +/** + * Responsible for dumping query & query result packets + * + * @author Tony Vaagenes + */ +public class PacketDumper implements PacketListener { + /** High level representation of packet types (e.g. query, result, ...) */ + public static enum PacketType { + query(QueryPacket.class), + result(QueryResultPacket.class); + + Class<? extends BasicPacket> implementationType; + + PacketType(Class<? extends BasicPacket> implementationType) { + this.implementationType = implementationType; + } + } + + private static Logger log = Logger.getLogger(PacketDumper.class.getSimpleName()); + + private volatile boolean disabled = true; + private final File logDirectory; + private final Map<Class<? extends BasicPacket>, DataOutputStream> dumpFiles = + new HashMap<>(); + private final String fileNamePattern; + + private void handlePacket(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm, String direction) { + //minimize overhead when disabled: + if (disabled) + return; + + try { + DataOutputStream stream = getOutputStream(packet); + if (stream != null) { + synchronized (stream) { + stream.writeChars(packet.getTimestamp() + " " + direction + " packet on channel " + channel.getChannelId()); + String indent = " "; + Query query = channel.getQuery(); + if (query != null) + stream.writeChars('\n' + indent + "Query: '" + query.getModel().getQueryString()); + hexDump(indent, stream, serializedForm); + + stream.writeChar('\n'); + stream.flush(); + } + } + } catch (IOException e) { + log.log(LogLevel.WARNING, "Could not log packet.", e); + } + } + + private void hexDump(String indent, DataOutputStream stream, ByteBuffer serializedForm) throws IOException { + HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); + + long count = 0; + final int maxNumCharacters = 80; + while (hexByteIterator.hasNext()) { + if (count++ % maxNumCharacters == 0) + stream.writeChar('\n'); + stream.writeChars(hexByteIterator.next()); + } + } + + private synchronized DataOutputStream getOutputStream(BasicPacket packet) { + return dumpFiles.get(packet.getClass()); + } + + public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + handlePacket(channel, packet, serializedForm, "Sent"); + } + + public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + handlePacket(channel, packet, serializedForm, "Received"); + } + + public synchronized void dumpPackets(PacketType packetType, boolean on) throws IOException { + OutputStream stream = dumpFiles.get(packetType.implementationType); + if (!on && stream != null) + closeFile(stream, packetType); + else if (on && stream == null) + openFile(packetType); + } + + private void openFile(PacketType packetType) throws FileNotFoundException { + if (!logDirectory.exists() || + logDirectory.mkdirs()) { + + throw new RuntimeException("PacketDumper: Could not create log directory " + logDirectory); + } + String fileName = fileNamePattern.replace("%s", packetType.toString()); + boolean append = true; + DataOutputStream outputStream = + new DataOutputStream( + new BufferedOutputStream( + new FileOutputStream(new File(logDirectory, fileName), append))); + dumpFiles.put(packetType.implementationType, outputStream); + + disabled = dumpFiles.isEmpty(); + } + + private void closeFile(OutputStream stream, PacketType packetType) throws IOException { + try { + synchronized (stream) { + stream.close(); + } + } finally { + dumpFiles.remove(packetType.implementationType); + disabled = dumpFiles.isEmpty(); + } + } + + public PacketDumper(File logDirectory, String fileNamePattern) { + this.logDirectory = logDirectory; + this.fileNamePattern = fileNamePattern; + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketListener.java b/container-search/src/main/java/com/yahoo/fs4/PacketListener.java new file mode 100644 index 00000000000..113da03b420 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PacketListener.java @@ -0,0 +1,16 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +import com.yahoo.fs4.mplex.FS4Channel; + +/** + * Interface for recieving notifications of packets sent or recieved. + * + * @author Tony Vaagenes + */ +public interface PacketListener { + void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); + void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java b/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java new file mode 100644 index 00000000000..1be79031d1b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java @@ -0,0 +1,34 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +import com.yahoo.fs4.mplex.FS4Channel; + +/** + * Broadcasts packet notifications to a list of listeners. + * + * @author Tony Vaagenes + */ +public class PacketNotificationsBroadcaster implements PacketListener { + + private final PacketListener[] listeners; + + public PacketNotificationsBroadcaster(PacketListener... listeners) { + this.listeners = listeners; + } + + @Override + public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + if (channel == null) return; + for (PacketListener listener : listeners) + listener.packetSent(channel, packet, serializedForm); + } + + @Override + public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + if (channel == null) return; + for (PacketListener listener : listeners) + listener.packetReceived(channel, packet, serializedForm); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java b/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java new file mode 100644 index 00000000000..b577ef31ad8 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java @@ -0,0 +1,53 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.search.Query; + +/** + * Adds packets to the query context + * + * @author Tony Vaagenes + */ +public class PacketQueryTracer implements PacketListener { + + private final static int traceLevel = 10; + + private void addTrace(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + Query query = channel.getQuery(); + if (query != null && query.getTraceLevel() >= traceLevel) { + StringBuilder traceString = new StringBuilder(); + traceString.append(packet.getClass().getSimpleName()).append(": "); + hexDump(serializedForm, traceString); + + final boolean includeQuery = true; + query.trace(traceString.toString(), includeQuery, traceLevel); + } + } + + private void hexDump(ByteBuffer serializedForm, StringBuilder traceString) { + HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); + + long count = 0; + final int maxNumCharacters = 80; + while (hexByteIterator.hasNext()) { + if (++count % maxNumCharacters == 0) + traceString.append('\n'); + traceString.append(hexByteIterator.next()); + } + } + + @Override + public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + addTrace(channel, packet, serializedForm); + } + + @Override + public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { + addTrace(channel, packet, serializedForm); + } + +} + diff --git a/container-search/src/main/java/com/yahoo/fs4/PingPacket.java b/container-search/src/main/java/com/yahoo/fs4/PingPacket.java new file mode 100644 index 00000000000..7c1f8df1101 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PingPacket.java @@ -0,0 +1,26 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +/** + * A ping packet for FS4. This packet has no data. It maps to + * PCODE_MONITORQUERY the C++ implementation of the protocol. + * + * @author Steinar Knutsen + */ +public class PingPacket extends BasicPacket { + + public int getCode() { return 220; } + + public void encodeBody(ByteBuffer buffer) { + buffer.putInt(MQF_QFLAGS); + buffer.putInt(MQFLAG_REPORT_ACTIVEDOCS); + } + + /** feature bits, taken from searchlib/common/transport.h */ + static final int MQF_QFLAGS = 0x00000002; + + /** flag bits, taken from searchlib/common/transport.h */ + static final int MQFLAG_REPORT_ACTIVEDOCS = 0x00000020; +} diff --git a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java new file mode 100644 index 00000000000..37aaf7067a9 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java @@ -0,0 +1,92 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; +import java.util.Optional; + +/** + * A pong packet for FS4. It maps to PCODE_MONITORRESULTX + * in the C++ implementation of the protocol. + * + * @author Steinar Knutsen + */ +public class PongPacket extends BasicPacket { + + private int dispatchTimestamp; + + @SuppressWarnings("unused") + private int totalNodes; // configured nodes + private Optional<Integer> activeNodes = Optional.empty(); // number of nodes that are up + @SuppressWarnings("unused") + private int totalPartitions; // configured partitions + private Optional<Integer> activePartitions = Optional.empty(); // number of partitions that are up + + private Optional<Long> activeDocuments = Optional.empty(); // how many documents are searchable (sum) + + public PongPacket() { + } + + /** For testing */ + public PongPacket(long activeDocuments) { + this.activeDocuments = Optional.of(activeDocuments); + } + + private int code; + protected void codeDecodedHook(int code) { this.code = code; } + public int getCode() { return code; } + + public void decodeBody(ByteBuffer buffer) { + int features = buffer.getInt(); + buffer.getInt(); // Unused lowPartitionId + dispatchTimestamp = buffer.getInt(); + if ((features & MRF_MLD) != 0) { + totalNodes = buffer.getInt(); + activeNodes = Optional.of(buffer.getInt()); + totalPartitions = buffer.getInt(); + activePartitions = Optional.of(buffer.getInt()); + } + if ((features & MRF_RFLAGS) != 0) { + buffer.getInt(); // ignore rflags (historical field) + } + if ((features & MRF_ACTIVEDOCS) != 0) { + activeDocuments = Optional.of(Long.valueOf(buffer.getLong())); + } + } + + public static PongPacket create() { + return new PongPacket(); + } + + /** + * Return current docstamp for backend to make cache invalidation + * possible. + * */ + public int getDocstamp() { + return dispatchTimestamp; + } + + /** + * retrieve the reported number of active (searchable) documents + * in the monitored backend. + **/ + public Optional<Long> getActiveDocuments() { + return activeDocuments; + } + + public Optional<Integer> getActiveNodes() { + return activeNodes; + } + + public Optional<Integer> getActivePartitions() { + return activePartitions; + } + + /** feature bits, taken from searchlib/common/transport.h */ + static final int MRF_MLD = 0x00000001; + static final int MRF_RFLAGS = 0x00000008; + static final int MRF_ACTIVEDOCS = 0x00000010; + + /** packet codes, taken from searchlib/common/transport.h */ + static final int PCODE_MONITORRESULTX = 221; + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java new file mode 100644 index 00000000000..d1d8b9a0f77 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java @@ -0,0 +1,215 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import com.yahoo.compress.IntegerCompressor; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.prelude.query.Item; +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.Dispatcher; +import com.yahoo.search.grouping.vespa.GroupingExecutor; +import com.yahoo.search.query.Ranking; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.text.Utf8String; +import com.yahoo.vespa.objects.BufferSerializer; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An "extended query" packet. This is the query packets used today, + * they allow more flexible sets of parameters to be shipped with queries. + * This packet can be encoded only. + * + * @author bratseth + * @author Bjørn Borud + */ +public class QueryPacket extends Packet { + + private final String serverId; + private final Query query; + + private QueryPacketData queryPacketData; + + private QueryPacket(String serverId, Query query) { + this.serverId = serverId; + this.query = query; + } + + /** Returns the query from which this packet is populated */ + public Query getQuery() { + return query; + } + + /** + * Creates and returns a query packet + * + * @param query the query to convert to a packet + */ + public static QueryPacket create(String serverId, Query query) { + return new QueryPacket(serverId, query); + } + + + /** Returns the first offset requested */ + public int getOffset() { + return query.getOffset(); + } + + /** + * Returns the <i>last</i> offset requested (inclusively), that is + * getOffset() + getHits() + */ + public int getLastOffset() { + return getOffset() + getHits(); + } + + /** Returns the number of hits requested */ + public int getHits() { + return query.getHits(); + } + + public void encodeBody(ByteBuffer buffer) { + queryPacketData = new QueryPacketData(); + + boolean sendSessionKey = query.getGroupingSessionCache() || query.getRanking().getQueryCache(); + int featureFlag = getFeatureInt(sendSessionKey); + buffer.putInt(featureFlag); + + IntegerCompressor.putCompressedPositiveNumber(getOffset(), buffer); + IntegerCompressor.putCompressedPositiveNumber(getHits(), buffer); + buffer.putInt(Math.max(1, (int)query.getTimeLeft())); // Safety to avoid sending down 0 or negative number + buffer.putInt(getFlagInt()); + int startOfFieldToSave = buffer.position(); + Item.putString(query.getRanking().getProfile(), buffer); + queryPacketData.setRankProfile(buffer, startOfFieldToSave); + + if ( (featureFlag & QF_PROPERTIES) != 0) { + startOfFieldToSave = buffer.position(); + query.encodeAsProperties(buffer, true); + queryPacketData.setPropertyMaps(buffer, startOfFieldToSave); + } + + // Language not needed when sending query stacks + + if ((featureFlag & QF_SORTSPEC) != 0) { + int sortSpecLengthPosition=buffer.position(); + buffer.putInt(0); + int sortSpecLength = query.getRanking().getSorting().encode(buffer); + buffer.putInt(sortSpecLengthPosition, sortSpecLength); + } + + if ( (featureFlag & QF_GROUPSPEC) != 0) { + List<Grouping> groupingList = GroupingExecutor.getGroupingList(query); + BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); + gbuf.putInt(null, groupingList.size()); + for (Grouping g: groupingList){ + g.serialize(gbuf); + } + gbuf.getBuf().flip(); + byte[] blob = new byte [gbuf.getBuf().limit()]; + gbuf.getBuf().get(blob); + buffer.putInt(blob.length); + buffer.put(blob); + } + + if (sendSessionKey) { + Utf8String key = query.getSessionId(serverId).asUtf8String(); + buffer.putInt(key.getByteLength()); + buffer.put(key.getBytes()); + } + + if ((featureFlag & QF_LOCATION) != 0) { + startOfFieldToSave = buffer.position(); + int locationLengthPosition=buffer.position(); + buffer.putInt(0); + int locationLength= query.getRanking().getLocation().encode(buffer); + buffer.putInt(locationLengthPosition, locationLength); + queryPacketData.setLocation(buffer, startOfFieldToSave); + } + + startOfFieldToSave = buffer.position(); + int stackItemPosition=buffer.position(); + buffer.putInt(0); // Number of stack items written below + int stackLengthPosition = buffer.position(); + buffer.putInt(0); + int stackPosition = buffer.position(); + int stackItemCount=query.encode(buffer); + int stackLength = buffer.position() - stackPosition; + buffer.putInt(stackItemPosition,stackItemCount); + buffer.putInt(stackLengthPosition, stackLength); + queryPacketData.setQueryStack(buffer, startOfFieldToSave); + } + + /** + * feature bits, taken from searchlib/common/transport.h + */ + private static final int QF_PARSEDQUERY = 0x00000002; + private static final int QF_RANKP = 0x00000004; + private static final int QF_SORTSPEC = 0x00000080; + private static final int QF_LOCATION = 0x00000800; + private static final int QF_PROPERTIES = 0x00100000; + private static final int QF_GROUPSPEC = 0x00400000; + private static final int QF_SESSIONID = 0x00800000; + + private int getFeatureInt(boolean sendSessionId) { + int features = QF_PARSEDQUERY | QF_RANKP; // this bitmask means "parsed query" in query packet. + // And rank properties. Both are always present + + features |= (query.getRanking().getSorting() != null) ? QF_SORTSPEC : 0; + features |= (query.getRanking().getLocation() != null) ? QF_LOCATION : 0; + features |= (query.hasEncodableProperties()) ? QF_PROPERTIES : 0; + features |= GroupingExecutor.hasGroupingList(query) ? QF_GROUPSPEC : 0; + features |= (sendSessionId) ? QF_SESSIONID : 0; + + return features; + } + + /** + * query flag bits, taken from searchlib/common/transport.h + */ + private static final int QFLAG_EXTENDED_COVERAGE = 0x00000001; + private static final int QFLAG_COVERAGE_NODES = 0x00000002; + private static final int QFLAG_ESTIMATE = 0x00000080; + private static final int QFLAG_DROP_SORTDATA = 0x00004000; + private static final int QFLAG_NO_RESULTCACHE = 0x00010000; + private static final int QFLAG_DUMP_FEATURES = 0x00040000; + + private int getFlagInt() { + int flags = getQueryFlags(query); + queryPacketData.setQueryFlags(flags); + flags |= query.properties().getBoolean(Dispatcher.dispatchInternal, false) ? 0 : QFLAG_DROP_SORTDATA; + return flags; + } + + + public int getCode() { + return 218; + } + + public String toString() { + return "Query x packet [query: " + query + "]"; + } + + static int getQueryFlags(Query query) { + int flags = QFLAG_EXTENDED_COVERAGE | QFLAG_COVERAGE_NODES; + + flags |= query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE) ? QFLAG_ESTIMATE : 0; + flags |= query.getNoCache() ? QFLAG_NO_RESULTCACHE : 0; + flags |= query.properties().getBoolean(Ranking.RANKFEATURES, false) ? QFLAG_DUMP_FEATURES : 0; + return flags; + } + + /** + * Fetch a binary wrapper containing data from encoding process for use in + * creating a summary request. + * + * @return wrapper object suitable for creating a summary fetch packet + * @throws IllegalStateException if no wrapper has been generated + */ + public QueryPacketData getQueryPacketData() { + if (queryPacketData == null) + throw new IllegalStateException("Trying to fetch a hit tag without having encoded the packet first."); + return queryPacketData; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java new file mode 100644 index 00000000000..673b9cc0c47 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java @@ -0,0 +1,91 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; + +/** + * Class for storing data which has to be constant between query and summary + * fetch for a Vespa hit. Used to avoid to tagging Vespa summary hits with + * the entire query as an immutable. + * + * @author Steinar Knutsen + */ +public final class QueryPacketData { + + private byte[] rankProfile = null; + private int queryFlags = 0; + private byte[] queryStack = null; + private byte[] location = null; + private byte[] propertyMaps = null; + + /** + * Given src.position() bigger than startOfField, allocate a fresh byte + * array, and copy the data from startOfField to src.position() into it. + * + * @param src + * the ByteBuffer to copy from + * @param startOfField + * the position of the buffer at which the field starts + * @return a copy of the data between startOfField and the buffer position + * before invokation + * @throws IllegalArgumentException + * if startOfField is somewhere after src.position() + */ + private byte[] copyField(final ByteBuffer src, final int startOfField) { + if (startOfField > src.position()) { + throw new IllegalArgumentException("startOfField after src.position()"); + } + final byte[] dst = new byte[src.position() - startOfField]; + + src.position(startOfField); + src.get(dst); + return dst; + } + + ByteBuffer encodeRankProfile(final ByteBuffer buffer) { + return buffer.put(rankProfile); + } + + void setRankProfile(final ByteBuffer src, final int startOfField) { + rankProfile = copyField(src, startOfField); + } + + ByteBuffer encodeQueryFlags(final ByteBuffer buffer) { + return buffer.putInt(queryFlags); + } + + void setQueryFlags(final int queryFlags) { + this.queryFlags = queryFlags; + } + + ByteBuffer encodeQueryStack(final ByteBuffer buffer) { + return buffer.put(queryStack); + } + + void setQueryStack(final ByteBuffer src, final int startOfField) { + queryStack = copyField(src, startOfField); + } + + ByteBuffer encodePropertyMaps(final ByteBuffer buffer) { + if (propertyMaps != null) { + buffer.put(propertyMaps); + } + return buffer; + } + + void setPropertyMaps(final ByteBuffer src, final int startOfField) { + propertyMaps = copyField(src, startOfField); + } + + void setLocation(final ByteBuffer src, final int startOfField) { + this.location = copyField(src, startOfField); + } + + ByteBuffer encodeLocation(final ByteBuffer buffer) { + if (location != null) { + buffer.put(location); + } + return buffer; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java new file mode 100644 index 00000000000..6a27beefb5e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java @@ -0,0 +1,228 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A query result packet (code 217). This packet can be decoded only. + * + * @author bratseth + */ +public class QueryResultPacket extends Packet { + + /** The code of this type of package */ + private static final int code = 217; + + /** Whether mld data is included in this result */ + private boolean mldFeature = false; + + /** Whether sort data is included in this result */ + private boolean sortData = false; + + /** Whether coverage information is included in this result */ + private boolean coverageNodes = false; + private long coverageDocs = 0; + private long activeDocs = 0; + private long soonActiveDocs = 0; + private int degradedReason = 0; + private short nodesQueried = 0; + private short nodesReplied = 0; + + /** Whether the result contains grouping results **/ + private boolean groupDataFeature = false; + + /** Whether the result contains properties **/ + private boolean propsFeature = false; + + private long totalDocumentCount; + + private Number maxRank; + + private int docstamp; + + private byte[] groupData = null; + + private List<DocumentInfo> documents=new ArrayList<>(10); + + public FS4Properties[] propsArray; + + private int offset; + + private QueryResultPacket() { } + + public static QueryResultPacket create() { + return new QueryResultPacket(); + } + + public void setDocstamp(int docstamp){ this.docstamp=docstamp; } + + public int getDocstamp() { return docstamp; } + + /** Returns whether this has the mysterious mld feature */ + public boolean getMldFeature() { return mldFeature; } + + public boolean getCoverageFeature() { return true; } + + public long getCoverageDocs() { return coverageDocs; } + + public long getActiveDocs() { return activeDocs; } + + public long getSoonActiveDocs() { return soonActiveDocs; } + + public int getDegradedReason() { return degradedReason; } + + public boolean getCoverageFull() { + return coverageDocs == activeDocs; + } + + + /** @return offset returned by backend */ + public int getOffset() { return offset; } + + /** Only for testing. */ + public void setOffset(int offset) { + this.offset = offset; + } + + @Override + public void decodeBody(ByteBuffer buffer) { + IntBuffer ints = buffer.asIntBuffer(); + decodeFeatures(ints); + offset = ints.get(); + int documentCount = ints.get(); + buffer.position(buffer.position() + ints.position() * 4); + totalDocumentCount = buffer.getLong(); + maxRank = decodeMaxRank(buffer); + ints = buffer.asIntBuffer(); + docstamp = ints.get(); + buffer.position(buffer.position() + ints.position() * 4); + // do not access "ints" below here! + + if (coverageNodes) { + nodesQueried = buffer.getShort(); + nodesReplied = buffer.getShort(); + } + + byte[][] documentSortData = null; + if (sortData && documentCount > 0) { + documentSortData = decodeSortData(buffer, documentCount); + } + + if (groupDataFeature) { + int len = buffer.getInt(); + groupData = new byte[len]; + buffer.get(groupData); + } + + coverageDocs = buffer.getLong(); + activeDocs = buffer.getLong(); + soonActiveDocs = buffer.getLong(); + degradedReason = buffer.getInt(); + + decodeDocuments(buffer, documentCount, documentSortData); + if (propsFeature) { + int numMaps = buffer.getInt(); + propsArray = new FS4Properties[numMaps]; + for (int i = 0; i < numMaps; i++) { + propsArray[i] = new FS4Properties(); + propsArray[i].decode(buffer); + } + } + } + + private byte[][] decodeSortData(ByteBuffer buffer, int documentCount) { + int[] indexes = new int[documentCount]; + indexes[0] = 0; + for (int i = 1; i < documentCount; i++) { + indexes[i] = buffer.getInt(); + } + + int sortDataLengthInBytes = buffer.getInt(); + byte[][] ret = new byte[indexes.length][]; + + for (int i = 0; i < indexes.length; i++) { + int end = i + 1 >= indexes.length ? sortDataLengthInBytes : indexes[i + 1]; + int len = end - indexes[i]; + ret[i] = new byte[len]; + buffer.get(ret[i], 0, len); + } + return ret; + } + + private Number decodeMaxRank(ByteBuffer buffer) { + return Double.valueOf(buffer.getDouble()); + } + + /** + * feature bits + */ + private static final int QRF_MLD = 0x00000001; + private static final int QRF_COVERAGE_NODES = 0x00000002; + private static final int QRF_SORTDATA = 0x00000010; + private static final int QRF_UNUSED_1 = 0x00000020; + private static final int QRF_UNUSED_2 = 0x00000040; + private static final int QRF_GROUPDATA = 0x00000200; + private static final int QRF_PROPERTIES = 0x00000400; + + /** Decodes the feature int of this package data into boolean feature fields */ + private void decodeFeatures(IntBuffer buffer) { + int features = buffer.get(); + mldFeature = (QRF_MLD & features) != 0; + sortData = (QRF_SORTDATA & features) != 0; + coverageNodes = (QRF_COVERAGE_NODES & features) != 0; + groupDataFeature = (QRF_GROUPDATA & features) != 0; + propsFeature = (QRF_PROPERTIES & features) != 0; + } + + private void decodeDocuments(ByteBuffer buffer, int documentCount, byte[][] documentSortData) { + for (int i = 0; i < documentCount; i++) { + byte[] sort = documentSortData == null ? null : documentSortData[i]; + documents.add(new DocumentInfo(buffer, this, sort)); + } + } + + public int getCode() { return code; } + + protected void codeDecodedHook(int code) { + if ( code != QueryResultPacket.code) + throw new RuntimeException("Programming error, packet " + getCode() + "Not expected."); + } + + public int getDocumentCount() { return documents.size(); } + + public String toString() { + return "Query result x packet [" + getDocumentCount() + " documents]"; + } + + /** Returns the opaque grouping results **/ + public byte[] getGroupData() { return groupData; } + + + /** Returns the total number of documents avalable for this query */ + public long getTotalDocumentCount() { return totalDocumentCount; } + + /** Only for testing. */ + public void setTotalDocumentCount(long totalDocumentCount) { + this.totalDocumentCount = totalDocumentCount; + } + + /** Returns a read-only list containing the DocumentInfo objects of this result */ + public List<DocumentInfo> getDocuments() { + return Collections.unmodifiableList(documents); + } + + public void addDocument(DocumentInfo document) { + documents.add(document); + } + + // TODO: Handle new maxRank intelligently + public int getMaxRank() { return maxRank.intValue(); } + + public short getNodesQueried() { return nodesQueried; } + public short getNodesReplied() { return nodesReplied; } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java new file mode 100644 index 00000000000..12f8e9e387d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -0,0 +1,449 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.PacketDumper; +import com.yahoo.fs4.PacketListener; +import com.yahoo.fs4.PacketNotificationsBroadcaster; +import com.yahoo.fs4.PacketQueryTracer; +import com.yahoo.io.Connection; +import com.yahoo.io.ConnectionFactory; +import com.yahoo.io.Listener; +import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.concurrent.ConcurrentResourcePool; +import com.yahoo.yolean.concurrent.ResourceFactory; +import com.yahoo.yolean.concurrent.ResourcePool; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author Bjorn Borud + */ +public class Backend implements ConnectionFactory { + + private static int DEFAULT_BUFFER_SIZE = 0x8000; + + public static final class BackendStatistics { + + public final int activeConnections; + public final int passiveConnections; + + public BackendStatistics(int activeConnections, int passiveConnections) { + this.activeConnections = activeConnections; + this.passiveConnections = passiveConnections; + } + + @Override + public String toString() { + return activeConnections + "/" + totalConnections(); + } + + public int totalConnections() { + return activeConnections + passiveConnections; + } + } + + private static final Logger log = Logger.getLogger(Backend.class.getName()); + + private final ListenerPool listeners; + private final InetSocketAddress address; + private final String host; + private final int port; + private final Map<Integer, FS4Channel> activeChannels = new HashMap<>(); + private int channelId = 0; + private boolean shutdownInitiated = false; + + /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */ + private boolean areInSocketNotConnectableState = false; + + private final LinkedList<FS4Channel> pingChannels = new LinkedList<>(); + private final PacketListener packetListener; + private final ConnectionPool connectionPool; + private final PacketDumper packetDumper; + private final AtomicInteger connectionCount = new AtomicInteger(0); + private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { + @Override + public ByteBuffer create() { + return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + } + }); + + /** + * For unit testing. do not use + */ + protected Backend() { + listeners = null; + host = null; + port = 0; + packetListener = null; + packetDumper = null; + address = null; + connectionPool = new ConnectionPool(); + } + + public Backend(String host, + int port, + String serverDiscriminator, + ListenerPool listenerPool, + ConnectionPool connectionPool) { + String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump"; + packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")), + fileNamePattern); + packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer()); + this.listeners = listenerPool; + this.host = host; + this.port = port; + address = new InetSocketAddress(host, port); + this.connectionPool = connectionPool; + } + + private void logWarning(String attemptDescription, Exception e) { + log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); + } + + private void logInfo(String attemptDescription, Exception e) { + log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); + } + + // ============================================================ + // ==== connection pool stuff + // ============================================================ + + /** + * Fetch a connection from the connection pool. If the pool + * is empty we create a connection. + */ + private FS4Connection getConnection() throws IOException { + FS4Connection connection = connectionPool.getConnection(); + if (connection == null) { + // if pool was empty create one: + connection = createConnection(); + } + return connection; + } + + ConcurrentResourcePool<ByteBuffer> getBufferPool() { + return byteBufferResourcePool; + } + + /** + * Return a connection to the connection pool. If the + * connection is not valid anymore we drop it, ie. do not + * put it into the pool. + */ + public void returnConnection(FS4Connection connection) { + connectionPool.releaseConnection(connection); + } + + /** + * Create a new connection to the target for this backend. + */ + private FS4Connection createConnection() throws IOException { + SocketChannel socket = SocketChannel.open(); + try { + connectSocket(socket); + } catch (Exception e) { + // was warning, see VESPA-1922 + if ( ! areInSocketNotConnectableState) { + logInfo("connecting to", e); + } + areInSocketNotConnectableState = true; + socket.close(); + return null; + } + areInSocketNotConnectableState = false; + int listenerId = connectionCount.getAndIncrement()%listeners.size(); + Listener listener = listeners.get(listenerId); + FS4Connection connection = new FS4Connection(socket, listener, this, packetListener); + listener.registerConnection(connection); + + log.fine("Created new connection to " + host + ":" + port); + connectionPool.createdConnection(); + return connection; + } + + private void connectSocket(SocketChannel socket) throws IOException { + socket.configureBlocking(false); + + boolean connected = socket.connect(address); + + // wait for connection + if (!connected) { + long timeBarrier = System.currentTimeMillis() + 20L; + while (true) { + try { + Thread.sleep(5L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e); + } + // don't care whether it's spurious wakeup + connected = socket.finishConnect(); + if (connected || System.currentTimeMillis() > timeBarrier) { + break; + } + } + } + + // did we get a connection? + if ( !connected) { + throw new IllegalArgumentException("Could not create connection to dispatcher on " + + address.getHostName() + ":" + address.getPort()); + } + socket.socket(). + setTcpNoDelay(true); + } + + + //============================================================ + //==== channel management + //============================================================ + + /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */ + public FS4Channel openChannel() { + int cachedChannelId; + synchronized (this) { + if (channelId >= ((1 << 31) - 2)) { + channelId = 0; + } + cachedChannelId = channelId; + channelId += 2; + } + Integer id = cachedChannelId; + FS4Channel chan = new FS4Channel(this, id); + synchronized (activeChannels) { + activeChannels.put(id, chan); + } + return chan; + } + + public FS4Channel openPingChannel() { + FS4Channel chan = FS4Channel.createPingChannel(this); + synchronized (pingChannels) { + pingChannels.add(chan); + } + return chan; + } + + /** + * Get the remote address for this Backend. This method + * has package access only, because it is really only of + * importance to FS4Channel for writing slightly more sensible + * log messages. + * @return Returns the address (host, port) for this Backend. + */ + InetSocketAddress getAddress() { + return address; + } + + /** + * Get an active channel by id. + * + * @param id the (fs4) channel id + * @return returns the (fs4) channel associated with this id + * or <code>null</code> if the channel is not in the + * set of active channels. + */ + public FS4Channel getChannel(Integer id) { + synchronized (activeChannels) { + return activeChannels.get(id); + } + } + + /** + * Return the first channel in the queue waiting for pings or + * <code>null</code> if none. + */ + public FS4Channel getPingChannel() { + synchronized (pingChannels) { + return (pingChannels.isEmpty()) ? null : pingChannels.getFirst(); + } + } + + /** + * Get an active channel by id. This is a wrapper for the method + * that takes the id as an Integer. + * + * @param id The (fs4) channel id + * @return Returns the (fs4) channel associated with this id + * or <code>null</code> if the channel is not in the + * set of active channels. + */ + public FS4Channel getChannel(int id) { + return getChannel(Integer.valueOf(id)); + } + + /** + * Remove a channel. We do not want this method to be called + * directly by the client -- removal of channels should be done + * by calling the close() method of the channel. + * + * @param id The (fs4) channel id + * @return Removes and returns the (fs4) channel associated + * with this id or <code>null</code> if the channel is + * not in the set of active channels. + */ + protected FS4Channel removeChannel(Integer id) { + synchronized (activeChannels) { + return activeChannels.remove(id); + } + } + + /** + * Remove a ping channel. We do not want this method to be called + * directly by the client -- removal of channels should be done + * by calling the close() method of the channel. + * + * @return Removes and returns the (fs4) channel first in + * the queue of ping channels or <code>null</code> + * if there are no active ping channels. + */ + protected FS4Channel removePingChannel() { + synchronized (pingChannels) { + if (pingChannels.isEmpty()) + return null; + return pingChannels.removeFirst(); + } + } + //============================================================ + //==== packet sending and reception + //============================================================ + + protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException { + if (shutdownInitiated) { + log.fine("Tried to send packet after shutdown initiated. Ignored."); + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + if (connection == null) { + return false; + } + connection.sendPacket(packet, channelId); + } + finally { + if (connection != null) { + returnConnection(connection); + } + } + + return true; + } + + /** + * When a connection receives a packet, it uses this method to + * dispatch the packet to the right FS4Channel. If the corresponding + * FS4Channel does not exist the packet is dropped and a message is + * logged saying so. + */ + protected void receivePacket(BasicPacket packet) { + FS4Channel fs4; + if (packet.hasChannelId()) + fs4 = getChannel(((Packet)packet).getChannel()); + else + fs4 = getPingChannel(); + + // channel does not exist + if (fs4 == null) { + return; + } + try { + fs4.addPacket(packet); + } + catch (InterruptedException e) { + log.info("Interrupted during packet adding. Packet = " + packet.toString()); + Thread.currentThread().interrupt(); + } + catch (InvalidChannelException e) { + log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString() + + " Backend probably sent data pertaining an old request," + + " system may be overloaded."); + } + } + + /** + * Attempt to establish a connection without sending messages and then + * return it to the pool. The assumption is that if the probing is + * successful, the connection will be used soon after. There should be + * minimal overhead since the connection is cached. + */ + public boolean probeConnection() { + if (shutdownInitiated) { + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + } catch (IOException ignored) { + // connection is null + } finally { + if (connection != null) { + returnConnection(connection); + } + } + + return connection != null; + } + + /** + * This method should be used to ensure graceful shutdown of the backend. + */ + public void shutdown() { + log.fine("shutting down"); + if (shutdownInitiated) { + throw new IllegalStateException("Shutdown already in progress"); + } + shutdownInitiated = true; + } + + public void close() { + for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) { + try { + c.close(); + } catch (IOException e) { + logWarning("closing", e); + } + } + } + + /** + * Connection factory used by the Listener class. + */ + public Connection newConnection(SocketChannel channel, Listener listener) { + return new FS4Connection(channel, listener, this, packetListener); + } + + public String toString () { + return("Backend/" + host + ":" + port); + } + + public BackendStatistics getStatistics() { + synchronized (connectionPool) { //ensure consistent values + return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections()); + } + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java new file mode 100644 index 00000000000..e84adfbef2c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java @@ -0,0 +1,90 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; + +import com.yahoo.log.LogLevel; +/** + * Pool of FS4 connections. + * + * @author Tony Vaagenes + */ +public class ConnectionPool { + + private final static int CLEANINGPERIOD = 1000; // Execute every second + private final Queue<FS4Connection> connections = new ConcurrentLinkedQueue<>(); + private final AtomicInteger activeConnections = new AtomicInteger(0); + private final AtomicInteger passiveConnections = new AtomicInteger(0); + private static final Logger log = Logger.getLogger(ConnectionPool.class.getName()); + + class PoolCleanerTask extends TimerTask { + private final ConnectionPool connectionPool; + public PoolCleanerTask(ConnectionPool connectionPool) { + this.connectionPool = connectionPool; + } + + public void run() { + try { + connectionPool.dropInvalidConnections(); + } catch (Exception e) { + log.log(LogLevel.WARNING, + "Caught exception in connection pool cleaner, ignoring.", + e); + } + } + } + + public ConnectionPool() { + } + + public ConnectionPool(Timer timer) { + timer.schedule(new PoolCleanerTask(this), CLEANINGPERIOD, CLEANINGPERIOD); + } + + private void dropInvalidConnections() { + for (Iterator<FS4Connection> i = connections.iterator(); i.hasNext();) { + FS4Connection connection = i.next(); + if (!connection.isValid()) { + i.remove(); + } + } + } + + private FS4Connection registerAsActiveIfNonZero(FS4Connection connection) { + activeConnections.incrementAndGet(); + passiveConnections.decrementAndGet(); + return connection; + } + + public FS4Connection getConnection() { + return registerAsActiveIfNonZero(connections.poll()); + } + + void releaseConnection(FS4Connection connection) { + assert(connection != null); + activeConnections.decrementAndGet(); + if (connection.isValid()) { + passiveConnections.incrementAndGet(); + connections.add(connection); + } + } + + void createdConnection() { + activeConnections.incrementAndGet(); + } + + int activeConnections() { + return activeConnections.get(); + } + + //unused connections in the pool + int passiveConnections() { + return passiveConnections.get(); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java new file mode 100644 index 00000000000..adfc63d02f7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java @@ -0,0 +1,255 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.Packet; +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.ResponseMonitor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * This class is used to represent a "channel" in the FS4 protocol. + * A channel represents a session between a client and the fdispatch. + * Internally this class has a response queue used by the backend + * for queueing up FS4 packets that belong to this channel (or + * <em>session</em>, which might be a more appropriate name for it). + * Outbound packets are handed off to the FS4Connection. + * + * @author Bjorn Borud + */ +public class FS4Channel { + + private static Logger log = Logger.getLogger(FS4Channel.class.getName()); + + private Integer channelId; + private Backend backend; + volatile private BlockingQueue<BasicPacket> responseQueue; + private Query query; + private boolean isPingChannel = false; + private ResponseMonitor<FS4Channel> monitor; + + /** for unit testing. do not use */ + protected FS4Channel () { + } + + protected FS4Channel(Backend backend, Integer channelId) { + this.channelId = channelId; + this.backend = backend; + this.responseQueue = new LinkedBlockingQueue<>(); + } + + static public FS4Channel createPingChannel(Backend backend) { + FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0)); + pingChannel.isPingChannel = true; + return pingChannel; + } + + /** Set the query currently associated with this channel */ + public void setQuery(Query query) { + this.query = query; + } + + /** Get the query currently associated with this channel */ + public Query getQuery() { + return query; + } + + /** Returns the (fs4) channel id */ + public Integer getChannelId () { + return channelId; + } + + /** + * Closes the channel + */ + public void close () { + BlockingQueue<BasicPacket> q = responseQueue; + responseQueue = null; + query = null; + if (isPingChannel) { + backend.removePingChannel(); + } else { + backend.removeChannel(channelId); + } + if (q != null) { + q.clear(); + } + } + + /** + * Legacy interface. + */ + public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { + ensureValid(); + return backend.sendPacket(packet, channelId); + } + + /** + * Receives the given number of packets and returns them, OR + * <ul> + * <li>Returns a smaller number of packets if an error or eol packet is received + * <li>Throws a ChannelTimeoutException if timeout occurs before all packets + * are received. Packets received with the wrong channel id are ignored. + * </ul> + * + * @param timeout the number of ms to attempt to get packets before throwing an exception + * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error + */ + public BasicPacket[] receivePackets(long timeout, int packetCount) + throws InvalidChannelException, ChannelTimeoutException { + ensureValid(); + + List<BasicPacket> packets = new ArrayList<>(12); + long startTime = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeout; + + try { + while (timeLeft >= 0) { + BasicPacket p = nextPacket(timeLeft); + if (p == null) throw new ChannelTimeoutException("Timed out"); + + if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) { + log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId()); + continue; + } + + packets.add(p); + if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) { + BasicPacket[] packetArray = new BasicPacket[packets.size()]; + packets.toArray(packetArray); + return packetArray; + } + + // doing this last might save us one system call for the last + // packet. + timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime); + } + } + catch (InvalidChannelException e) { + // nop. if we get this we want to return the default + // zero length packet array indicating that we have no + // valid response + log.info("FS4Channel was invalid. timeLeft=" + + timeLeft + ", timeout=" + timeout); + } + catch (InterruptedException e) { + log.info("FS4Channel was interrupted. timeLeft=" + + timeLeft + ", timeout=" + timeout); + Thread.currentThread().interrupt(); + } + + // default return, we only hit this if we timed out and + // did not get the end of the packet stream + throw new ChannelTimeoutException(); + } + + private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) { + if (packetCount<0) return false; + return packets.size()>=packetCount; + } + + /** + * Returns true if we will definitely receive more packets on this stream + * + * Shouldn't that be "_not_ receive more packets"? + */ + private static boolean isLastPacket (BasicPacket packet) { + if (packet instanceof com.yahoo.fs4.ErrorPacket) return true; + if (packet instanceof com.yahoo.fs4.EolPacket) return true; + if (packet instanceof com.yahoo.fs4.PongPacket) return true; + return false; + } + + /** + * Return the next available packet from the response queue. If there + * are no packets available we wait a maximum of <code>timeout</code> + * milliseconds before returning a <code>null</code> + * + * @param timeout Number of milliseconds to wait for a packet + * to become available. + * + * @return Returns the next available <code>BasicPacket</code> or + * <code>null</code> if we timed out. + */ + public BasicPacket nextPacket(long timeout) + throws InterruptedException, InvalidChannelException + { + return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Add incoming packet to the response queue. This is to be used + * by the listener for placing incoming packets in the response + * queue. + * + * @param packet BasicPacket to be placed in the response queue. + * + */ + protected void addPacket (BasicPacket packet) + throws InterruptedException, InvalidChannelException + { + ensureValidQ().put(packet); + notifyMonitor(); + } + + /** + * A valid FS4Channel is one that has not yet been closed. + * + * @return Returns <code>true</code> if the FS4Channel is valid. + */ + public boolean isValid () { + return responseQueue != null; + } + + /** + * This method is called whenever we want to perform an operation + * which assumes that the FS4Channel object is valid. An exception + * is thrown if the opposite turns out to be the case. + * + * @throws InvalidChannelException if the channel is no longer valid. + */ + private void ensureValid () throws InvalidChannelException { + if (isValid()) { + return; + } + throw new InvalidChannelException("Channel is no longer valid"); + } + + /** + * This method is called whenever we want to perform an operation + * which assumes that the FS4Channel object is valid. An exception + * is thrown if the opposite turns out to be the case. + * + * @throws InvalidChannelException if the channel is no longer valid. + */ + private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException { + BlockingQueue<BasicPacket> q = responseQueue; + if (q != null) { + return q; + } + throw new InvalidChannelException("Channel is no longer valid"); + } + + public String toString() { + return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); + } + + public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) { + this.monitor = monitor; + } + + protected void notifyMonitor() { + if(monitor != null) { + monitor.responseAvailable(this); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java new file mode 100644 index 00000000000..7dcbefde9fa --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -0,0 +1,371 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.fs4.mplex; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.PacketDecoder; +import com.yahoo.fs4.PacketListener; +import com.yahoo.io.Connection; +import com.yahoo.io.Listener; +import com.yahoo.log.LogLevel; + +/** + * + * This class is used to represent a connection to an fdispatch + * + * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> + */ +public class FS4Connection implements Connection +{ + private static Logger log = Logger.getLogger(FS4Connection.class.getName()); + private Backend backend; + private Listener listener; + private SocketChannel channel; + + private boolean shouldWrite = false; + + private static int idCounter = 1; + private int idNumber; + + // outbound data + private ByteBuffer writeBuffer; + private LinkedList<ByteBuffer> writeBufferList = new LinkedList<>(); + + // inbound data + private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024); + private ByteBuffer readBuffer = fixedReadBuffer; + + private volatile boolean valid = true; + + private final PacketListener packetListener; + + + /** + * Create an FS4 Connection. + */ + public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) { + this.backend = backend; + this.listener = listener; + this.channel = channel; + this.idNumber = idCounter++; + this.packetListener = packetListener; + + log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress()); + } + + + /** + * Packet sending interface. + */ + public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { + ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); + ByteBuffer viewForPacketListener = buffer.slice(); + synchronized (this) { + if (!(valid && channel.isOpen())) { + throw new IllegalStateException("Connection is not valid. " + + "Address = " + backend.getAddress() + + ", valid = " + valid + + ", isOpen = " + channel.isOpen()); + } + + if (writeBuffer == null) { + writeBuffer = buffer; + } else { + writeBufferList.addLast(buffer); + enableWrite(); + } + write(); + } + + if (packetListener != null) + packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener); + } + + + /** + * The write event handler. This can be called both from the client + * thread and from the IO thread, so it needs to be synchronized. It + * assumes that IO is nonblocking, and will attempt to keep writing + * data until the system won't accept more data. + * + */ + public synchronized void write () throws IOException { + if (! channel.isOpen()) { + throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress()); + } + + try { + int bytesWritten = 0; + boolean isFinished = false; + do { + // if writeBuffer is not set we need to fetch the next buffer + if (writeBuffer == null) { + + // if the list is empty, signal the selector we do not need + // to do any writing for a while yet and bail + if (writeBufferList.isEmpty()) { + disableWrite(); + isFinished = true; + break; + } + writeBuffer = writeBufferList.removeFirst(); + } + + // invariants: we have a writeBuffer + bytesWritten = channel.write(writeBuffer); + + // buffer drained so we forget it and see what happens when we + // go around. if indeed we go around + if (!writeBuffer.hasRemaining()) { + writeBuffer.clear(); + backend.getBufferPool().free(writeBuffer); + writeBuffer = null; + } + } while (bytesWritten > 0); + if (!isFinished) { + enableWrite(); + } + } catch (IOException e) { + log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() + + ". Closing channel", e); + try { + close(); + } catch (IOException ignored) {} + + throw e; + } + } + + + private void disableWrite() { + if (shouldWrite) { + listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false); + shouldWrite = false; + } + } + + + private void enableWrite() { + if (!shouldWrite) { + listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true); + shouldWrite = true; + } + } + + + + public void read () throws IOException { + if (! channel.isOpen()) { + throw new IOException("Channel not open in read(), address=" + backend.getAddress()); + } + + int bytesRead = 0; + + do { + try { + if (readBuffer == fixedReadBuffer) { + bytesRead = channel.read(readBuffer); + } else { + fixedReadBuffer.clear(); + if (readBuffer.remaining() < fixedReadBuffer.capacity()) { + fixedReadBuffer.limit(readBuffer.remaining()); + } + bytesRead = channel.read(fixedReadBuffer); + fixedReadBuffer.flip(); + readBuffer.put(fixedReadBuffer); + fixedReadBuffer.clear(); + } + } + catch (IOException e) { + // this is the "normal" way that connection closes. + log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+ + e.getClass().getName()+" / ", e); + bytesRead = -1; + } + + // end of file + if (bytesRead == -1) { + log.log(LogLevel.DEBUG, "Dispatch closed connection" + + " (id="+idNumber+", address=" + backend.getAddress() + ")"); + try { + close(); + } catch (Exception e) { + log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e); + } + } + + // no more read + if (bytesRead == 0) { + // buffer too small? + if (! readBuffer.hasRemaining()) { + log.fine("Buffer possibly too small, extending"); + readBuffer.flip(); + extendReadBuffer(readBuffer.capacity() * 2); + } + } + + } while (bytesRead > 0); + + readBuffer.flip(); + + // hand off packet extraction + extractPackets(readBuffer); + } + + private void extractPackets(ByteBuffer readBuffer) { + for (;;) { + PacketDecoder.DecodedPacket packet = null; + try { + FS4Channel receiver = null; + int queryId = PacketDecoder.sniffChannel(readBuffer); + if (queryId == 0) { + if (PacketDecoder.isPongPacket(readBuffer)) + receiver = backend.getPingChannel(); + } + else { + receiver = backend.getChannel(Integer.valueOf(queryId)); + } + packet = PacketDecoder.extractPacket(readBuffer); + + if (packet != null) + packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes); + } + catch (BufferTooSmallException e) { + log.fine("Unable to decode, extending readBuffer"); + extendReadBuffer(PacketDecoder.packetLength(readBuffer)); + return; + } + + // break out of loop if we did not get a packet out of the + // buffer so we can select and read some more + if (packet == null) { + + // if the buffer has been cleared, we can do a reset + // of the readBuffer + if ((readBuffer.position() == 0) + && (readBuffer.limit() == readBuffer.capacity())) + { + resetReadBuffer(); + } + break; + } + + backend.receivePacket(packet.packet); + } + } + + /** + * This is called when we close the connection to do any + * pending cleanup work. Closing a connection marks it as + * not valid. + */ + public void close () throws IOException { + valid = false; + channel.close(); + log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress()); + } + + /** + * Upon asynchronous connect completion this method is called by + * the Listener. + */ + public void connect() throws IOException { + throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". " + + "asynchronous connect in NIO is flawed!"); + } + + /** + * Since we are performing an asynchronous connect we are initially + * only interested in the <code>OP_CONNECT</code> event. + */ + public int selectOps () { + return SelectionKey.OP_READ; + } + + /** + * Return the underlying SocketChannel used by this connection. + */ + public SocketChannel socketChannel() { + return channel; + } + + + public String toString () { + return FS4Connection.class.getName() + "/" + channel; + } + + + //============================================================ + //==== readbuffer management + //============================================================ + + + /** + * Extend the readBuffer. Make a new buffer of the requested size + * copy the contents of the readBuffer into it and assign reference + * to readBuffer instance variable. + * + * <P> + * <b>The readBuffer needs to be in "readable" (flipped) state before + * this is called and it will be in the "writeable" state when it + * returns.</b> + */ + private void extendReadBuffer (int size) { + // we specifically check this because packetLength() can return -1 + // and someone might alter the code so that we do in fact get -1 + // ...which never happens as the code is now + // + if (size == -1) { + throw new RuntimeException("Invalid buffer size requested: -1"); + } + + // if we get a size that is smaller than the current + // readBuffer capacity we just double it. not sure how wise this + // might be. + // + if (size < readBuffer.capacity()) { + size = readBuffer.capacity() * 2; + } + + ByteBuffer tmp = ByteBuffer.allocate(size); + tmp.put(readBuffer); + log.fine("Extended readBuffer to " + size + " bytes" + + "from " + readBuffer.capacity() + " bytes"); + readBuffer = tmp; + } + + /** + * Clear the readBuffer, and if temporarily allocated bigger + * buffer is in use: ditch it and reset the reference to the + * fixed readBuffer. + */ + private void resetReadBuffer () { + fixedReadBuffer.clear(); + if (readBuffer == fixedReadBuffer) { + return; + } + log.fine("Resetting readbuffer"); + readBuffer = fixedReadBuffer; + } + + /** + * This method is used to determine whether the connection is still + * viable or not. All connections are initially valid, but they + * become invalid if we close the connection or something bad happens + * and the connection needs to be ditched. + */ + public boolean isValid() { + return valid; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java new file mode 100644 index 00000000000..6176d069645 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java @@ -0,0 +1,15 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- + +package com.yahoo.fs4.mplex; + +/** + * @author <a href="mailto:borud@yahoo-inc.com">Bj\u00f8rn Borud</a> + */ +@SuppressWarnings("serial") +public class InvalidChannelException extends Exception +{ + public InvalidChannelException (String message) { + super(message); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java new file mode 100644 index 00000000000..d76d270dcb7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java @@ -0,0 +1,56 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.io.FatalErrorHandler; +import com.yahoo.io.Listener; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Pool of com.yahoo.io.Listener instances for shared use by Vespa backend + * searchers. + * + * @author baldersheim + * @since 5.3.0 + */ +public final class ListenerPool { + private final static Logger logger = Logger.getLogger(ListenerPool.class.getName()); + private final List<Listener> listeners; + + public ListenerPool(String name, int numListeners) { + listeners = new ArrayList<>(numListeners); + FatalErrorHandler fatalErrorHandler = new FatalErrorHandler(); + for (int i = 0; i < numListeners; i++) { + Listener listener = new Listener(name + "-" + i); + listener.setFatalErrorHandler(fatalErrorHandler); + listener.start(); + listeners.add(listener); + } + } + + public Listener get(int index) { + return listeners.get(index); + } + + public int size() { + return listeners.size(); + } + + public void close() { + for (Listener listener : listeners) { + listener.interrupt(); + } + try { + for (Listener listener : listeners) { + listener.join(); + } + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Got interrupted", e); + Thread.currentThread().interrupt(); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java index a60fba9a4f7..a6bc3e7975d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/Pong.java +++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude; +import com.yahoo.fs4.PongPacket; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.statistics.ElapsedTime; @@ -18,19 +19,28 @@ public class Pong { private String pingInfo=""; private final List<ErrorMessage> errors = new ArrayList<>(1); + private final Optional<PongPacket> pongPacket; private ElapsedTime elapsed = new ElapsedTime(); private final Optional<Long> activeDocuments; public Pong() { + this.pongPacket = Optional.empty(); this.activeDocuments = Optional.empty(); } public Pong(ErrorMessage error) { errors.add(error); + this.pongPacket = Optional.empty(); + this.activeDocuments = Optional.empty(); + } + + public Pong(PongPacket pongPacket) { + this.pongPacket = Optional.of(pongPacket); this.activeDocuments = Optional.empty(); } public Pong(long activeDocuments) { + this.pongPacket = Optional.empty(); this.activeDocuments = Optional.of(activeDocuments); } @@ -42,14 +52,21 @@ public class Pong { return errors.get(i); } + public int getErrorSize() { + return errors.size(); + } + /** Returns the number of active documents in the backend responding in this Pong, if available */ public Optional<Long> activeDocuments() { - return activeDocuments; + if (activeDocuments.isPresent()) return activeDocuments; + if ( ! pongPacket.isPresent()) return Optional.empty(); + return pongPacket.get().getActiveDocuments(); } /** Returns the number of nodes which responded to this Pong, if available */ public Optional<Integer> activeNodes() { - return Optional.empty(); + if ( ! pongPacket.isPresent()) return Optional.empty(); + return pongPacket.get().getActiveNodes(); } public List<ErrorMessage> getErrors() { @@ -61,6 +78,16 @@ public class Pong { return ! errors.isEmpty(); } + /** Sets information about the ping used to produce this. This is included when returning the tostring of this. */ + public void setPingInfo(String pingInfo) { + if (pingInfo==null) + pingInfo=""; + this.pingInfo=pingInfo; + } + + /** Returns information about the ping use, or "" (never null) if none */ + public String getPingInfo() { return pingInfo; } + public ElapsedTime getElapsedTime() { return elapsed; } diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java new file mode 100644 index 00000000000..c075a0f842b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java @@ -0,0 +1,161 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.cluster; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.component.provider.Freezable; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.result.ErrorMessage; + +/** + * Monitors of a cluster of remote nodes. The monitor uses an internal thread + * for node monitoring. + * + * @author bratseth + * @author Steinar Knutsen + */ +public class ClusterMonitor implements Runnable, Freezable { + + // The ping thread wil start using the system, but we cannot be guaranteed that all components + // in the system is up. As a workaround for not being able to find out when the system + // is ready to be used, we wait some time before starting the ping thread + private static final int pingThreadInitialDelayMs = 3000; + + private final MonitorConfiguration configuration; + + private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName()); + + private final ClusterSearcher nodeManager; + + private final Optional<VipStatus> vipStatus; + + /** A map from Node to corresponding MonitoredNode */ + private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>(); + + private ScheduledFuture<?> future; + + private boolean isFrozen = false; + + ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) { + configuration = new MonitorConfiguration(monitorConfig); + nodeManager = manager; + this.vipStatus = vipStatus; + log.fine("checkInterval is " + configuration.getCheckInterval() + " ms"); + } + + /** Returns the configuration of this cluster monitor */ + MonitorConfiguration getConfiguration() { + return configuration; + } + + void startPingThread() { + if ( ! isFrozen()) + throw new IllegalStateException("Do not start the monitoring thread before the set of " + + "nodes to monitor is complete/the ClusterMonitor is frozen."); + future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS); + } + + /** + * Adds a new node for monitoring. + */ + void add(VespaBackEndSearcher node) { + if (isFrozen()) + throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen."); + nodeMonitors.put(node, new NodeMonitor(node)); + updateVipStatus(); + } + + /** Called from ClusterSearcher/NodeManager when a node failed */ + void failed(VespaBackEndSearcher node, ErrorMessage error) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasWorking = monitor.isWorking(); + monitor.failed(error); + if (wasWorking && !monitor.isWorking()) { + log.info("Failed monitoring node '" + node + "' due to '" + error); + nodeManager.failed(node); + } + updateVipStatus(); + } + + /** Called when a node responded */ + void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasFailing = !monitor.isWorking(); + monitor.responded(hasSearchNodesOnline); + if (wasFailing && monitor.isWorking()) { + log.info("Failed node '" + node + "' started working again."); + nodeManager.working(node); + } + updateVipStatus(); + } + + private void updateVipStatus() { + if ( ! vipStatus.isPresent()) return; + if ( ! hasInformationAboutAllNodes()) return; + + if (hasWorkingNodesWithDocumentsOnline()) { + vipStatus.get().addToRotation(nodeManager.getId().stringValue()); + } else { + vipStatus.get().removeFromRotation(nodeManager.getId().stringValue()); + } + } + + private boolean hasInformationAboutAllNodes() { + for (NodeMonitor monitor : nodeMonitors.values()) { + if ( ! monitor.statusIsKnown()) + return false; + } + return true; + } + + private boolean hasWorkingNodesWithDocumentsOnline() { + for (NodeMonitor node : nodeMonitors.values()) { + if (node.isWorking() && node.searchNodesOnline()) + return true; + } + return false; + } + + /** + * Ping all nodes which needs pinging to discover state changes + */ + private void ping() throws InterruptedException { + for (NodeMonitor monitor : nodeMonitors.values()) { + nodeManager.ping(monitor.getNode()); + } + } + + @Override + public void run() { + log.finest("Activating ping"); + try { + ping(); + } catch (Exception e) { + log.log(Level.WARNING, "Error in monitor thread", e); + } + } + + public void shutdown() { + if (future != null) { + future.cancel(true); + } + } + + @Override + public void freeze() { + isFrozen = true; + + } + + @Override + public boolean isFrozen() { + return isFrozen; + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index b0d9c2b0002..4ffcc0a4330 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -2,13 +2,20 @@ package com.yahoo.prelude.cluster; import com.yahoo.cloud.config.ClusterInfoConfig; +import com.yahoo.collections.Tuple2; import com.yahoo.component.ComponentId; +import com.yahoo.component.chain.Chain; import com.yahoo.component.chain.dependencies.After; +import com.yahoo.concurrent.Receiver; +import com.yahoo.concurrent.Receiver.MessageState; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; import com.yahoo.prelude.IndexFacts; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.prelude.fastsearch.FS4ResourcePool; @@ -39,7 +46,11 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -53,6 +64,10 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { + private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName()); + + private final ClusterMonitor monitor; + private final Value cacheHitRatio; private final String clusterModelName; @@ -63,6 +78,8 @@ public class ClusterSearcher extends Searcher { // Mapping from rank profile names to document types containing them private final Map<String, Set<String>> rankProfiles = new HashMap<>(); + private final FS4ResourcePool fs4ResourcePool; + private final long maxQueryTimeout; // in milliseconds private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; @@ -71,6 +88,7 @@ public class ClusterSearcher extends Searcher { private VespaBackEndSearcher server = null; + /** * Creates a new ClusterSearcher. */ @@ -78,6 +96,7 @@ public class ClusterSearcher extends Searcher { QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, + QrMonitorConfig monitorConfig, DispatchConfig dispatchConfig, ClusterInfoConfig clusterInfoConfig, Statistics manager, @@ -85,8 +104,13 @@ public class ClusterSearcher extends Searcher { FS4ResourcePool fs4ResourcePool, VipStatus vipStatus) { super(id); + this.fs4ResourcePool = fs4ResourcePool; + + Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric); - Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric); + monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead + ? new ClusterMonitor(this, monitorConfig, Optional.empty()) + : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus)); int searchClusterIndex = clusterConfig.clusterId(); clusterModelName = clusterConfig.clusterName(); @@ -124,8 +148,9 @@ public class ClusterSearcher extends Searcher { for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) { try { if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) { - FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams, - documentDbConfig, dispatcher, dispatcherIndex); + Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex)); + FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams, + documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex); addBackendSearcher(searcher); } } catch (UnknownHostException e) { @@ -137,6 +162,8 @@ public class ClusterSearcher extends Searcher { if ( server == null ) { throw new IllegalStateException("ClusterSearcher should have a top level dispatch."); } + monitor.freeze(); + monitor.startPingThread(); } private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) { @@ -162,14 +189,15 @@ public class ClusterSearcher extends Searcher { } private static FastSearcher searchDispatch(int searchclusterIndex, - String serverId, + FS4ResourcePool fs4ResourcePool, SummaryParameters docSumParams, DocumentdbInfoConfig documentdbInfoConfig, + Backend backend, Dispatcher dispatcher, int dispatcherIndex) { ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex); - return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); + return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); } private static VdsStreamingSearcher vdsCluster(String serverId, @@ -182,7 +210,8 @@ public class ClusterSearcher extends Searcher { throw new IllegalArgumentException("Search clusters in streaming search shall only contain a single searchdefinition : " + searchClusterConfig.searchdef()); } ClusterParams clusterParams = makeClusterParams(searchclusterIndex, 0); - VdsStreamingSearcher searcher = new VdsStreamingSearcher(); + VdsStreamingSearcher searcher = (VdsStreamingSearcher) VespaBackEndSearcher + .getSearcher("com.yahoo.vespa.streamingvisitors.VdsStreamingSearcher"); searcher.setSearchClusterConfigId(searchClusterConfig.rankprofiles().configid()); searcher.setDocumentType(searchClusterConfig.searchdef(0)); searcher.setStorageClusterRouteSpec(searchClusterConfig.storagecluster().routespec()); @@ -193,14 +222,25 @@ public class ClusterSearcher extends Searcher { /** Do not use, for internal testing purposes only. **/ ClusterSearcher(Set<String> documentTypes) { this.documentTypes = documentTypes; + monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus())); cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy", Statistics.nullImplementation, new Value.Parameters()); clusterModelName = "testScenario"; + fs4ResourcePool = null; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; } + private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) { + return fs4ResourcePool.getBackend(disp.host(), disp.port()); + } + + ClusterMonitor getMonitor() { + return monitor; + } + void addBackendSearcher(VespaBackEndSearcher searcher) { + monitor.add(searcher); server = searcher; } @@ -439,6 +479,77 @@ public class ClusterSearcher extends Searcher { cacheHitRatio.put(0.0); } + /** NodeManager method, called from ClusterMonitor. */ + void working(VespaBackEndSearcher node) { + server = node; + } + + /** Called from ClusterMonitor. */ + void failed(VespaBackEndSearcher node) { + server = null; + } + + /** + * Pinging a node, called from ClusterMonitor. + */ + void ping(VespaBackEndSearcher node) throws InterruptedException { + log.fine("Sending ping to: " + node); + Pinger pinger = new Pinger(node); + + getExecutor().execute(pinger); + Pong pong = pinger.getPong(); // handles timeout + if (pong == null) { + monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); + } else if (pong.badResponse()) { + monitor.failed(node, pong.getError(0)); + } else { + monitor.responded(node, backendCanServeDocuments(pong)); + } + } + + private boolean backendCanServeDocuments(Pong pong) { + if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true + return pong.activeNodes().get() > 0; + } + @Override - public void deconstruct() { } + public void deconstruct() { + monitor.shutdown(); + } + + ExecutorService getExecutor() { + return fs4ResourcePool.getExecutor(); + } + + ScheduledExecutorService getScheduledExecutor() { + return fs4ResourcePool.getScheduledExecutor(); + } + + private class Pinger implements Runnable { + + private final Searcher searcher; + private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout()); + private final Receiver<Pong> pong = new Receiver<>(); + + Pinger(final Searcher searcher) { + this.searcher = searcher; + } + + @Override + public void run() { + pong.put(createExecution().ping(pingChallenge)); + } + + private Execution createExecution() { + return new Execution(new Chain<>(searcher), + new Execution.Context(null, null, null, null, null)); + } + + public Pong getPong() throws InterruptedException { + Tuple2<MessageState, Pong> reply = pong.get(pingChallenge.getTimeout() + 150); + return (reply.first != MessageState.VALID) ? null : reply.second; + } + + } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java index de07839e3e3..524e842eacd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java @@ -10,6 +10,8 @@ package com.yahoo.prelude.fastsearch; +import java.nio.ByteBuffer; + import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java index 329a9caaf91..4f52ef91725 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java @@ -6,6 +6,7 @@ import com.yahoo.log.LogLevel; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java index f6f8006d2d2..6b1445229ec 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java new file mode 100644 index 00000000000..59bc781c8b2 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java @@ -0,0 +1,184 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.GetDocSumsPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; + +import java.io.IOException; +import java.util.Iterator; + +import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; + +/** + * {@link FillInvoker} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4FillInvoker extends FillInvoker { + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + + private int expectedFillResults = 0; + + // fdispatch code path + FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { + this.searcher = searcher; + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + + if (countUnfilledFastHits(result, summaryClass) > 0) { + try { + expectedFillResults = requestSummaries(result, summaryClass); + } catch (InvalidChannelException e) { + result.hits() + .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); + } + } else { + expectedFillResults = 0; + } + } + + + @Override + protected void getFillResults(Result result, String summaryClass) { + if (expectedFillResults == 0) { + return; + } + + Packet[] receivedPackets; + try { + receivedPackets = getSummaryResponses(result); + } catch (InvalidChannelException e1) { + result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + return; + } catch (ChannelTimeoutException e1) { + result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); + return; + } + + if (receivedPackets.length == 0) { + result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); + return; + } + + int skippedHits; + try { + FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass); + skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + return; + } + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); + return; + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); + return; + } + + if (skippedHits > 0) + result.hits().addError( + ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); + result.analyzeHits(); + + if (channel.getQuery().getTraceLevel() >= 3) { + int hitNumber = 0; + for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { + com.yahoo.search.result.Hit hit = i.next(); + if (!(hit instanceof FastHit)) + continue; + FastHit fastHit = (FastHit) hit; + + String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend"); + if (!fastHit.isFilled(summaryClass)) + traceMsg += ". Error, hit, not filled"; + channel.getQuery().trace(traceMsg, false, 3); + } + } + } + + @Override + public void release() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private int countUnfilledFastHits(Result result, String summaryClass) { + int count = 0; + for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { + Hit hit = i.next(); + if (hit instanceof FastHit && !hit.isFilled(summaryClass)) + count++; + } + return count; + } + + private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException { + + boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery()); + if (result.getQuery().getTraceLevel() >= 3) + result.getQuery().trace((summaryNeedsQuery ? "FS4: Resending " : "Not resending ") + "query during document summary fetching", 3); + + GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); + int compressionLimit = result.getQuery().properties().getInteger(FS4SearchInvoker.PACKET_COMPRESSION_LIMIT, 0); + docsumsPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + docsumsPacket.setCompressionType(result.getQuery().properties().getString(FS4SearchInvoker.PACKET_COMPRESSION_TYPE, "lz4")); + } + + boolean couldSend = channel.sendPacket(docsumsPacket); + if (!couldSend) + throw new IOException("Could not successfully send GetDocSumsPacket."); + + return docsumsPacket.getNumDocsums() + 1; + } + + private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException { + if(expectedFillResults == 0) { + return new Packet[0]; + } + BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults); + + return convertBasicPackets(receivedPackets); + } + + private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { + // trying to cast a BasicPacket[] to Packet[] will compile, + // but lead to a runtime error. At least that's what I got + // from testing and reading the specification. I'm just happy + // if someone tells me what's the proper Java way of doing + // this. -SK + Packet[] packets = new Packet[basicPackets.length]; + + for (int i = 0; i < basicPackets.length; i++) { + packets[i] = (Packet) basicPackets[i]; + } + return packets; + } + + private String getName() { + return searcher.getName(); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java new file mode 100644 index 00000000000..2abaf341c58 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java @@ -0,0 +1,29 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.prelude.Pong; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.PingFactory; +import com.yahoo.search.dispatch.searchcluster.Pinger; + +import java.util.concurrent.Callable; + +/** + * FS4PingFactory constructs {@link Pinger} objects that communicate with + * content nodes or dispatchers over the fnet/FS4 protocol + * + * @author ollivir + */ +public class FS4PingFactory implements PingFactory { + private final FS4ResourcePool fs4ResourcePool; + + public FS4PingFactory(FS4ResourcePool fs4ResourcePool) { + this.fs4ResourcePool = fs4ResourcePool; + } + + @Override + public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) { + return new Pinger(node, monitor, fs4ResourcePool); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index ed9eb72d7dd..f85a4019b78 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -5,6 +5,14 @@ import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.QrConfig; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.ConnectionPool; +import com.yahoo.fs4.mplex.ListenerPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -14,7 +22,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * All users will get the same pool instance. + * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance. * * @author baldersheim */ @@ -24,18 +32,22 @@ public class FS4ResourcePool extends AbstractComponent { private static final AtomicInteger instanceCounter = new AtomicInteger(0); private final String serverId; private final int instanceId; + private final ListenerPool listeners; + private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections + private final Map<String, Backend> connectionPoolMap = new HashMap<>(); private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; @Inject - public FS4ResourcePool(QrConfig config) { - this(config.discriminator()); + public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) { + this(config.discriminator(), fs4Config.numlistenerthreads()); } - public FS4ResourcePool(String serverId) { + public FS4ResourcePool(String serverId, int listenerThreads) { this.serverId = serverId; instanceId = instanceCounter.getAndIncrement(); String name = "FS4-" + instanceId; + listeners = new ListenerPool(name, listenerThreads); executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name)); scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled")); } @@ -45,10 +57,28 @@ public class FS4ResourcePool extends AbstractComponent { public ExecutorService getExecutor() { return executor; } public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } + public Backend getBackend(String host, int port) { + String key = host + ":" + port; + synchronized (connectionPoolMap) { + Backend pool = connectionPoolMap.get(key); + if (pool == null) { + pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer)); + connectionPoolMap.put(key, pool); + } + return pool; + } + } + @Override public void deconstruct() { logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'."); super.deconstruct(); + listeners.close(); + timer.cancel(); + for (Backend backend : connectionPoolMap.values()) { + backend.shutdown(); + backend.close(); + } executor.shutdown(); scheduledExecutor.shutdown(); try { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java new file mode 100644 index 00000000000..f3867288b29 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -0,0 +1,220 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.data.access.simple.Value; +import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.DocumentInfo; +import com.yahoo.fs4.FS4Properties; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryPacketData; +import com.yahoo.fs4.QueryResultPacket; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.ConfigurationException; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.InvokerResult; +import com.yahoo.search.dispatch.LeanHit; +import com.yahoo.search.dispatch.ResponseMonitor; +import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.query.Sorting; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Relevance; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.vespa.objects.BufferSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * {@link SearchInvoker} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> { + static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); + static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); + + private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); + + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + + private ErrorMessage pendingSearchError = null; + private Query query = null; + private QueryPacket queryPacket = null; + + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Optional<Node> node) { + super(node); + this.searcher = searcher; + this.channel = channel; + + channel.setQuery(query); + channel.setResponseMonitor(this); + } + + @Override + protected void sendSearchRequest(Query query) throws IOException { + log.finest("sending query packet"); + + this.query = query; + createQueryPacket(searcher.getServerId(), query); + + try { + boolean couldSend = channel.sendPacket(queryPacket); + if (!couldSend) { + setPendingError("Could not reach '" + getName() + "'"); + } + } catch (InvalidChannelException e) { + setPendingError("Invalid channel " + getName()); + } catch (IllegalStateException e) { + setPendingError("Illegal state in FS4: " + e.getMessage()); + } + } + + private void setPendingError(String message) { + pendingSearchError = ErrorMessage.createBackendCommunicationError(message); + responseAvailable(); + } + + @Override + protected InvokerResult getSearchResult(Execution execution) throws IOException { + if (pendingSearchError != null) { + return errorResult(query, pendingSearchError); + } + BasicPacket[] basicPackets; + + try { + basicPackets = channel.receivePackets(query.getTimeLeft(), 1); + } catch (ChannelTimeoutException e) { + return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + } catch (InvalidChannelException e) { + return errorResult(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); + } + + if (basicPackets.length == 0) { + return errorResult(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); + } + + log.finest(() -> "got packets " + basicPackets.length + " packets"); + + basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); + QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; + + log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + + if (query.getPresentation().getSummary() == null) + query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); + + InvokerResult result = new InvokerResult(query, resultPacket.getDocumentCount()); + + addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result.getResult()); + addUnfilledHits(result.getLeanHits(), resultPacket.getDocuments(), queryPacket.getQueryPacketData()); + + return result; + } + + private QueryPacket createQueryPacket(String serverId, Query query) { + this.queryPacket = QueryPacket.create(serverId, query); + int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); + queryPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); + } + log.fine(() -> "made QueryPacket: " + queryPacket); + + return queryPacket; + } + + private void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) { + result.setTotalHitCount(resultPacket.getTotalDocumentCount()); + + addBackendTrace(query, resultPacket); + + // Grouping + if (resultPacket.getGroupData() != null) { + byte[] data = resultPacket.getGroupData(); + ArrayList<Grouping> list = new ArrayList<>(); + BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(ByteBuffer.wrap(data))); + int cnt = buf.getInt(null); + for (int i = 0; i < cnt; i++) { + Grouping g = new Grouping(); + g.deserialize(buf); + list.add(g); + } + GroupingListHit hit = new GroupingListHit(list, searcher.getDocsumDefinitionSet(query)); + hit.setQuery(result.getQuery()); + hit.setSource(getName()); + hit.setQueryPacketData(queryPacketData); + result.hits().add(hit); + } + + if (resultPacket.getCoverageFeature()) { + result.setCoverage(new Coverage(resultPacket.getCoverageDocs(), resultPacket.getActiveDocs(), resultPacket.getNodesReplied()) + .setSoonActive(resultPacket.getSoonActiveDocs()) + .setDegradedReason(resultPacket.getDegradedReason()) + .setNodesTried(resultPacket.getNodesQueried())); + } + } + + private void addBackendTrace(Query query, QueryResultPacket resultPacket) { + if (resultPacket.propsArray == null) return; + Value.ArrayValue traces = new Value.ArrayValue(); + for (FS4Properties properties : resultPacket.propsArray) { + if ( ! properties.getName().equals("trace")) continue; + for (FS4Properties.Entry entry : properties.getEntries()) { + traces.add(new SlimeAdapter(BinaryFormat.decode(entry.getValue()).get())); + } + } + query.trace(traces, query.getTraceLevel()); + } + + /** + * Creates unfilled hits from a List of DocumentInfo instances. + */ + private void addUnfilledHits(List<LeanHit> result, List<DocumentInfo> documents, QueryPacketData queryPacketData) { + Optional<Integer> channelDistributionKey = distributionKey(); + + for (DocumentInfo document : documents) { + byte [] sortData = document.getSortData(); + LeanHit hit = (sortData == null) + ? new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getMetric()) + : new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getSortData()); + if (queryPacketData != null) { + hit.setQueryPacketData(queryPacketData); + } + result.add(hit); + } + } + + @Override + public void release() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private String getName() { + return searcher.getName(); + } + + @Override + public void responseAvailable(FS4Channel from) { + responseAvailable(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java index 244fad4efde..f7f2d08d713 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.ObjectTraverser; import com.yahoo.document.GlobalId; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.net.URI; import com.yahoo.search.query.Sorting; import com.yahoo.search.result.Hit; @@ -39,6 +40,9 @@ public class FastHit extends Hit { /** The global id of this document in the backend node which produced it */ private byte [] globalId; + //TODO Remove with fs4 + private transient QueryPacketData queryPacketData = null; + private transient byte[] sortData = null; // TODO I supect this one can be dropped. private transient Sorting sortDataSorting = null; @@ -143,6 +147,27 @@ public class FastHit extends Hit { /** Sets the index of the node this hit originated at */ public void setDistributionKey(int distributionKey) { this.distributionKey = distributionKey; } + /** + * Add the binary data common for the query packet to a Vespa backend and a + * summary fetch packet to a Vespa backend. This method can only be called + * once for a single hit. + * + * @param queryPacketData binary data from a query packet resulting in this hit + * @throws IllegalStateException if the method is called more than once + * @throws NullPointerException if trying to set query packet data to null + */ + public void setQueryPacketData(QueryPacketData queryPacketData) { + if (this.queryPacketData != null) + throw new IllegalStateException("Query packet data already set to " + + this.queryPacketData + ", tried to set it to " + queryPacketData); + if (queryPacketData == null) + throw new NullPointerException("Query packet data reference can not be set to null."); + this.queryPacketData = queryPacketData; + } + + /** Returns a serial encoding of the query which produced this hit, ot null if not available. */ + public QueryPacketData getQueryPacketData() { return queryPacketData; } + public void setSortData(byte[] data, Sorting sorting) { this.sortData = data; this.sortDataSorting = sorting; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index b0b3a7800e9..6b0041a9e86 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,14 +1,23 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.PingPacket; +import com.yahoo.fs4.PongPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; import com.yahoo.prelude.querytransform.QueryRewrite; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -37,13 +46,21 @@ import static com.yahoo.container.util.Util.quote; // catch and unwrap into a results with an error in high level methods. -Jon public class FastSearcher extends VespaBackEndSearcher { + /** If this is turned on this will make search queries directly to the local search node when possible */ + private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct"); + /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; + private final Backend dispatchBackend; + private final FS4ResourcePool fs4ResourcePool; + /** * Creates a Fastsearcher. * - * @param serverId the resource pool used to create direct connections to the local search nodes when + * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to + * over the fs4 protocol + * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when * bypassing the dispatch node * @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol. * Eventually we will move everything to this protocol and never use dispatch nodes. @@ -53,11 +70,13 @@ public class FastSearcher extends VespaBackEndSearcher { * @param clusterParams the cluster number, and other cluster backend parameters * @param documentdbInfoConfig document database parameters */ - public FastSearcher(String serverId, Dispatcher dispatcher, + public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher, SummaryParameters docSumParams, ClusterParams clusterParams, DocumentdbInfoConfig documentdbInfoConfig) { - init(serverId, docSumParams, clusterParams, documentdbInfoConfig); + init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig); + this.dispatchBackend = dispatchBackend; this.dispatcher = dispatcher; + this.fs4ResourcePool = fs4ResourcePool; } /** @@ -65,7 +84,58 @@ public class FastSearcher extends VespaBackEndSearcher { */ @Override public Pong ping(Ping ping, Execution execution) { - throw new IllegalStateException("This ping should not have been called."); + return ping(ping, dispatchBackend, getName()); + } + + public static Pong ping(Ping ping, Backend backend, String name) { + FS4Channel channel = backend.openPingChannel(); + + // If you want to change this code, you need to understand + // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and + // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage) + try { + PingPacket pingPacket = new PingPacket(); + try { + boolean couldSend = channel.sendPacket(pingPacket); + if ( ! couldSend) { + return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name)); + } + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name)); + } catch (IllegalStateException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage())); + } + + // We should only get a single packet + BasicPacket[] packets; + + try { + packets = channel.receivePackets(ping.getTimeout(), 1); + } catch (ChannelTimeoutException e) { + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name)); + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name)); + } + + if (packets.length == 0) { + return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back")); + } + + try { + packets[0].ensureInstanceOf(PongPacket.class, name); + } catch (TimeoutException e) { + return new Pong(ErrorMessage.createTimeout(e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage())); + } + return new Pong((PongPacket)packets[0]); + } finally { + if (channel != null) { + channel.close(); + } + } } @Override @@ -147,7 +217,18 @@ public class FastSearcher extends VespaBackEndSearcher { * on the same host. */ private SearchInvoker getSearchInvoker(Query query) { - return dispatcher.getSearchInvoker(query, this).get(); + Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if(direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4SearchInvoker(this, query, backend.openChannel(), direct); + } + return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } /** @@ -156,17 +237,47 @@ public class FastSearcher extends VespaBackEndSearcher { * content nodes. */ private FillInvoker getFillInvoker(Result result) { - return dispatcher.getFillInvoker(result, this).get(); + Query query = result.getQuery(); + Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if (direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4FillInvoker(this, query, backend); + } + return new FS4FillInvoker(this, query, dispatchBackend); } + /** + * If the query can be directed to a single local content node, returns that node. Otherwise, + * returns an empty value. + */ + private Optional<Node> getDirectNode(Query query) { + if (!query.properties().getBoolean(dispatchDirect, true)) + return Optional.empty(); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return Optional.empty(); + + Optional<Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); + if (!directDispatchRecipient.isPresent()) + return Optional.empty(); + // Dispatch directly to the single, local search node + Node local = directDispatchRecipient.get(); + query.trace(false, 2, "Dispatching directly to ", local); + return Optional.of(local); + } private static Optional<String> quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } public String toString() { - return "fast searcher (" + getName() + ") "; + return "fast searcher (" + getName() + ") " + dispatchBackend; } protected boolean isLoggingFine() { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java index 740b9592efc..e0d569c6ae1 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import java.util.List; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.search.result.Hit; import com.yahoo.searchlib.aggregation.Grouping; @@ -26,4 +27,15 @@ public class GroupingListHit extends Hit { private final List<Grouping> groupingList; private final DocsumDefinitionSet defs; + private QueryPacketData queryPacketData; + + public void setQueryPacketData(QueryPacketData queryPacketData) { + this.queryPacketData = queryPacketData; + } + + /** Returns encoded query data from the query used to create this, or null if none present */ + public QueryPacketData getQueryPacketData() { + return queryPacketData; + } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java index 396a84a28bd..f690d9d4da4 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java index bec39393359..a02d9813793 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java index 388c96b453d..bf77c517d50 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; +import java.nio.ByteBuffer; + +import com.yahoo.io.SlowInflate; import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java index b94c902693a..5e3d0babe98 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java index 4df12bd82bd..408cbbbb62d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java @@ -5,6 +5,10 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + +import com.yahoo.text.Utf8; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 8f4b49ac71e..430ad015493 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.collections.TinyIdentitySet; import com.yahoo.fs4.DocsumPacket; +import com.yahoo.fs4.Packet; import com.yahoo.prelude.query.Item; import com.yahoo.prelude.query.NullItem; import com.yahoo.prelude.query.textualrepresentation.TextualQueryRepresentation; @@ -19,6 +20,8 @@ import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; import com.yahoo.searchlib.aggregation.Grouping; +import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; @@ -48,7 +51,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { private String defaultDocsumClass = null; /** Returns an iterator which returns all hits below this result **/ - private static Iterator<Hit> hitIterator(Result result) { + static Iterator<Hit> hitIterator(Result result) { return result.hits().unorderedDeepIterator(); } @@ -227,7 +230,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if ((query.getTraceLevel()<level) || query.properties().getBoolean(TRACE_DISABLE)) return; StringBuilder s = new StringBuilder(); - s.append(sourceName).append(" ").append(type).append(" to dispatch: ") + s.append(sourceName).append(" " + type + " to dispatch: ") .append("query=[") .append(query.getModel().getQueryTree().getRoot().toString()) .append("]"); @@ -317,7 +320,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { } } - private FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { + FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { if (packet != null) { byte[] docsumdata = packet.getData(); if (docsumdata.length > 0) { @@ -341,7 +344,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * @return the number of hits that we did not return data for, and an optional error message. * when things are working normally we return 0. */ - protected FillHitsResult fillHits(Result result, DocsumPacket[] packets, String summaryClass) { + public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException { int skippedHits = 0; String lastError = null; int packetIndex = 0; @@ -351,7 +354,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) { FastHit fastHit = (FastHit) hit; - DocsumPacket docsum = packets[packetIndex]; + packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName()); + DocsumPacket docsum = (DocsumPacket) packets[packetIndex]; packetIndex++; FillHitResult fr = fillHit(fastHit, docsum, summaryClass); @@ -380,7 +384,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return decodeSummary(summaryClass, hit, docsumdata, db.getDocsumDefinitionSet()); } - private static String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { + private String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { String error = docsumSet.lazyDecode(summaryClass, docsumdata, hit); if (error == null) { hit.setFilled(summaryClass); @@ -388,6 +392,28 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return error; } + @SuppressWarnings("rawtypes") + public static VespaBackEndSearcher getSearcher(String s) { + try { + Class c = Class.forName(s); + if (VespaBackEndSearcher.class.isAssignableFrom(c)) { + Constructor[] constructors = c.getConstructors(); + for (Constructor constructor : constructors) { + Class[] parameters = constructor.getParameterTypes(); + if (parameters.length == 0) { + return (VespaBackEndSearcher) constructor.newInstance(); + } + } + throw new RuntimeException("Failed initializing " + s); + + } else { + throw new RuntimeException(s + " is not com.yahoo.prelude.fastsearch.VespaBackEndSearcher"); + } + } catch (Exception e) { + throw new RuntimeException("Failure loading class " + s + ", exception :" + e); + } + } + protected boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java index 00bdc474119..d768dda2657 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java @@ -1,5 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - +/** + * Class converting data (historically XML-encoded) from a document summary field. + * This has only been used to represent geographical positions. + */ package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.Inspector; @@ -8,8 +11,6 @@ import com.yahoo.prelude.hitfield.XMLString; import com.yahoo.search.result.PositionsData; /** - * Class converting data (historically XML-encoded) from a document summary field. - * This has only been used to represent geographical positions. * @author Steinar Knutsen */ public class XMLField extends DocsumField { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 58f73ea52cc..a5c6f3650e0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -4,6 +4,8 @@ package com.yahoo.search.dispatch; import com.yahoo.component.AbstractComponent; import com.yahoo.container.handler.VipStatus; import com.yahoo.jdisc.Metric; +import com.yahoo.prelude.fastsearch.FS4PingFactory; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; @@ -51,6 +53,9 @@ public class Dispatcher extends AbstractComponent { private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3; + /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ + public static final CompoundName dispatchInternal = CompoundName.fromComponents(DISPATCH, INTERNAL); + /** If enabled, search queries will use protobuf rpc */ public static final CompoundName dispatchProtobuf = CompoundName.fromComponents(DISPATCH, PROTOBUF); @@ -59,6 +64,7 @@ public class Dispatcher extends AbstractComponent { private final LoadBalancer loadBalancer; private final boolean multilevelDispatch; + private final boolean internalDispatchByDefault; private final InvokerFactory invokerFactory; @@ -80,13 +86,15 @@ public class Dispatcher extends AbstractComponent { public static Dispatcher create(String clusterId, DispatchConfig dispatchConfig, + FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus, Metric metric) { var searchCluster = new SearchCluster(clusterId, dispatchConfig, containerClusterSize, vipStatus); var rpcFactory = new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster, !dispatchConfig.useFdispatchByDefault()); + var pingFactory = dispatchConfig.useFdispatchByDefault()? new FS4PingFactory(fs4ResourcePool) : rpcFactory; - return new Dispatcher(searchCluster, dispatchConfig, rpcFactory, rpcFactory, metric); + return new Dispatcher(searchCluster, dispatchConfig, rpcFactory, pingFactory, metric); } public Dispatcher(SearchCluster searchCluster, @@ -97,8 +105,9 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = searchCluster; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); - this.invokerFactory = invokerFactory; this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); + this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); + this.invokerFactory = invokerFactory; this.metric = metric; this.metricContext = metric.createContext(null); @@ -116,11 +125,15 @@ public class Dispatcher extends AbstractComponent { } public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return invokerFactory.createFillInvoker(searcher, result); + Optional<FillInvoker> invoker = invokerFactory.createFillInvoker(searcher, result); + if (invoker.isPresent()) { + return invoker; + } + return Optional.empty(); } public Optional<SearchInvoker> getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - if (multilevelDispatch) { + if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { emitDispatchMetric(Optional.empty()); return Optional.empty(); } @@ -135,6 +148,7 @@ public class Dispatcher extends AbstractComponent { query.setOffset(0); } emitDispatchMetric(invoker); + query.properties().set(dispatchInternal, true); return invoker; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java index ce4891d7c25..cd8228624c5 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java @@ -39,6 +39,9 @@ public class InvokerResult { if (hit.hasSortData()) { fh.setSortData(hit.getSortData(), sorting); } + if (hit.getQueryPacketData() != null) { + fh.setQueryPacketData(hit.getQueryPacketData()); + } fh.setQuery(query); fh.setFillable(); fh.setCached(false); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java b/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java index 0749d8476c2..63302bee8c1 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java @@ -1,5 +1,6 @@ package com.yahoo.search.dispatch; +import com.yahoo.fs4.QueryPacketData; import java.util.Arrays; @@ -9,7 +10,8 @@ public class LeanHit implements Comparable<LeanHit> { private final byte [] sortData; private final int partId; private final int distributionKey; - + //TODO Remove when FS4 is gone + private QueryPacketData queryPacketData; public LeanHit(byte [] gid, int partId, int distributionKey, double relevance) { this.gid = gid; this.relevance = Double.isNaN(relevance) ? Double.NEGATIVE_INFINITY : relevance; @@ -31,6 +33,12 @@ public class LeanHit implements Comparable<LeanHit> { public int getPartId() { return partId; } public int getDistributionKey() { return distributionKey; } + QueryPacketData getQueryPacketData() { return queryPacketData; } + + public void setQueryPacketData(QueryPacketData queryPacketData) { + this.queryPacketData = queryPacketData; + } + @Override public int compareTo(LeanHit o) { int res = (sortData != null) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index d9a76965c3e..6146751f35f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -45,12 +45,15 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { Query query = result.getQuery(); boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); - boolean useProtoBuf = query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf); - boolean useDispatchDotSummaries = query.properties().getBoolean(dispatchSummaries, false); - return ((useDispatchDotSummaries || !useProtoBuf) && ! summaryNeedsQuery) - ? Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))) - : Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery)); + if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf)) { + return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery)); + } + if (query.properties().getBoolean(dispatchSummaries, true) && ! summaryNeedsQuery) { + return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); + } else { + return Optional.empty(); + } } // for testing diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java new file mode 100644 index 00000000000..dea6f741bb0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -0,0 +1,40 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.Exceptions; + +import java.util.concurrent.Callable; + +/** + * @author bratseth + * @author ollivir + */ +public class Pinger implements Callable<Pong> { + private final Node node; + private final ClusterMonitor<Node> clusterMonitor; + private final FS4ResourcePool fs4ResourcePool; + + public Pinger(Node node, ClusterMonitor<Node> clusterMonitor, FS4ResourcePool fs4ResourcePool) { + this.node = node; + this.clusterMonitor = clusterMonitor; + this.fs4ResourcePool = fs4ResourcePool; + } + + public Pong call() { + try { + Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); + return pong; + } catch (RuntimeException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 85cc16ade6f..3657e0b5c76 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -186,11 +186,17 @@ public class SearchCluster implements NodeManager<Node> { } /** + * Returns the nodes of this cluster as an immutable map indexed by host. + * One host may contain multiple nodes (on different ports), so this is a multi-map. + */ + public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; } + + /** * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), * or empty if we should not dispatch directly. */ public Optional<Node> directDispatchTarget() { - if ( directDispatchTarget.isEmpty()) return Optional.empty(); + if ( ! directDispatchTarget.isPresent()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage Group localSearchGroup = groups.get(directDispatchTarget.get().group()); @@ -223,26 +229,24 @@ public class SearchCluster implements NodeManager<Node> { private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { // update VIP status if we direct dispatch to this group and coverage status changed - boolean isInRotation = vipStatus.isInRotation(); - boolean hasChanged = sufficientCoverage != group.hasSufficientCoverage(); - boolean isDirectDispatchGroupAndChange = usesDirectDispatchTo(group) && hasChanged; - group.setHasSufficientCoverage(sufficientCoverage); - if ((!isInRotation || isDirectDispatchGroupAndChange) && sufficientCoverage) { - // We will set this cluster in rotation if - // - not already in rotation and one group has sufficient coverage. - vipStatus.addToRotation(clusterId); - } else if (isDirectDispatchGroupAndChange) { - // We will take it out of rotation if the group is mandatory (direct dispatch to this group) - vipStatus.removeFromRotation(clusterId); + if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { + if (sufficientCoverage) { + vipStatus.addToRotation(clusterId); + } else { + vipStatus.removeFromRotation(clusterId); + } } + group.setHasSufficientCoverage(sufficientCoverage); } private boolean usesDirectDispatchTo(Node node) { - return directDispatchTarget.isPresent() && directDispatchTarget.get().equals(node); + if ( ! directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().equals(node); } private boolean usesDirectDispatchTo(Group group) { - return directDispatchTarget.isPresent() && directDispatchTarget.get().group() == group.id(); + if ( ! directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().group() == group.id(); } /** Used by the cluster monitor to manage node status */ @@ -296,21 +300,15 @@ public class SearchCluster implements NodeManager<Node> { sumOfActiveDocuments += activeDocumentsInGroup[i]; } - boolean anyGroupsSufficientCoverage = false; for (int i = 0; i < numGroups; i++) { Group group = orderedGroups.get(i); long activeDocuments = activeDocumentsInGroup[i]; long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); - boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, averageDocumentsInOtherGroups); - anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage; + boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, + averageDocumentsInOtherGroups); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups); } - if ( ! anyGroupsSufficientCoverage && (sumOfActiveDocuments == 0)) { - // If no groups have sufficient coverage (0 might be sufficient) - // and there are no documents in any groups, then we are down. - vipStatus.removeFromRotation(clusterId); - } } private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) { @@ -383,7 +381,7 @@ public class SearchCluster implements NodeManager<Node> { private void trackGroupCoverageChanges(int index, Group group, boolean fullCoverage, long averageDocuments) { boolean changed = group.isFullCoverageStatusChanged(fullCoverage); - if (changed) { + if(changed) { int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup(); if (fullCoverage) { log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", index, diff --git a/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java b/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java index e8f4d566028..cfed5ed00ad 100644 --- a/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java +++ b/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java @@ -54,6 +54,8 @@ class HitConverter implements ResultBuilder.HitConverter { throw new NullPointerException("Hit has no context."); hit.setSource(hitContext.getSource()); hit.setQuery(hitContext.getQuery()); + if (hitContext.getQueryPacketData() != null) + hit.setQueryPacketData(hitContext.getQueryPacketData()); return hit; } diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java index 4750bac551c..c1217f7acc2 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java @@ -5,6 +5,7 @@ import com.yahoo.document.DocumentId; import com.yahoo.document.select.parser.ParseException; import com.yahoo.document.select.parser.TokenMgrException; import com.yahoo.fs4.DocsumPacket; +import com.yahoo.fs4.Packet; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.prelude.Ping; @@ -26,6 +27,7 @@ import com.yahoo.vdslib.SearchResult; import com.yahoo.vdslib.VisitorStatistics; import com.yahoo.vespa.streamingvisitors.tracing.TraceDescription; +import java.io.IOException; import java.math.BigInteger; import java.util.List; import java.util.Map; @@ -40,13 +42,14 @@ import java.util.logging.Logger; * @author baldersheim * @author Ulf Carlin */ +@SuppressWarnings("deprecation") public class VdsStreamingSearcher extends VespaBackEndSearcher { private static final CompoundName streamingUserid=new CompoundName("streaming.userid"); private static final CompoundName streamingGroupname=new CompoundName("streaming.groupname"); private static final CompoundName streamingSelection=new CompoundName("streaming.selection"); - static final String STREAMING_STATISTICS = "streaming.statistics"; + public static final String STREAMING_STATISTICS = "streaming.statistics"; private final VisitorFactory visitorFactory; private final TracingOptions tracingOptions; private static final Logger log = Logger.getLogger(VdsStreamingSearcher.class.getName()); @@ -81,12 +84,13 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { public VdsStreamingSearcher() { this(new VdsVisitorFactory()); } - VdsStreamingSearcher(VisitorFactory visitorFactory) { + + public VdsStreamingSearcher(VisitorFactory visitorFactory) { this.visitorFactory = visitorFactory; tracingOptions = TracingOptions.DEFAULT; } - VdsStreamingSearcher(VisitorFactory visitorFactory, TracingOptions tracingOptions) { + public VdsStreamingSearcher(VisitorFactory visitorFactory, TracingOptions tracingOptions) { this.visitorFactory = visitorFactory; this.tracingOptions = tracingOptions; } @@ -216,7 +220,7 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { query.trace(visitor.getStatistics().toString(), false, 2); query.getContext(true).setProperty(STREAMING_STATISTICS, visitor.getStatistics()); - DocsumPacket[] summaryPackets = new DocsumPacket [hits.size()]; + Packet[] summaryPackets = new Packet [hits.size()]; int index = 0; boolean skippedEarlierResult = false; @@ -252,11 +256,19 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { result.hits().add(groupHit); } - FillHitsResult fillHitsResult = fillHits(result, summaryPackets, query.getPresentation().getSummary()); - int skippedHits = fillHitsResult.skippedHits; - if (fillHitsResult.error != null) { - result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + int skippedHits; + try { + FillHitsResult fillHitsResult = fillHits(result, summaryPackets, query.getPresentation().getSummary()); + skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + return result; + } + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); return result; + } catch (IOException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields")); } if (skippedHits == 0) { diff --git a/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java b/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java new file mode 100644 index 00000000000..20d9b61c177 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java @@ -0,0 +1,110 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.search.Query; + +/** + * Tests hex dumping of packets. + * + * @author Steinar Knutsen + */ +public class PacketQueryTracerTestCase { + + FS4Channel channel; + BasicPacket packet; + PacketListener tracer; + + static class MockChannel extends FS4Channel { + + @Override + public void setQuery(Query query) { + super.setQuery(query); + } + + @Override + public Query getQuery() { + return super.getQuery(); + } + + @Override + public Integer getChannelId() { + return 1; + } + + @Override + public void close() { + } + + @Override + public boolean sendPacket(BasicPacket packet) { + return true; + } + + @Override + public BasicPacket[] receivePackets(long timeout, int packetCount) { + return null; + } + + @Override + public BasicPacket nextPacket(long timeout) { + return null; + } + + @Override + protected void addPacket(BasicPacket packet) { + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public String toString() { + return "MockChannel"; + } + } + + @Before + public void setUp() throws Exception { + channel = new MockChannel(); + channel.setQuery(new Query("/?query=a&tracelevel=11")); + packet = new PingPacket(); + tracer = new PacketQueryTracer(); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testPacketSent() throws IOException { + byte[] simulatedPacket = new byte[] { 1, 2, 3 }; + tracer.packetReceived(channel, packet, ByteBuffer.wrap(simulatedPacket)); + StringWriter w = new StringWriter(); + channel.getQuery().getContext(false).render(w); + assertTrue(w.getBuffer().toString().indexOf("PingPacket: 010203") != -1); + } + + @Test + public final void testPacketReceived() throws IOException { + byte[] simulatedPacket = new byte[] { 1, 2, 3 }; + tracer.packetReceived(channel, packet, ByteBuffer.wrap(simulatedPacket)); + StringWriter w = new StringWriter(); + channel.getQuery().getContext(false).render(w); + assertTrue(w.getBuffer().toString().indexOf("PingPacket: 010203") != -1); + } + +} diff --git a/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java b/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java new file mode 100644 index 00000000000..8696ca08d2b --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java @@ -0,0 +1,208 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.container.QrConfig; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.PingPacket; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.mplex.Backend.BackendStatistics; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.search.Query; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test networking code for talking to dispatch. + * + * @author Steinar Knutsen + */ +public class BackendTestCase { + private static final long TIMEOUT = 30000; + + public static class MockDispatch implements Runnable { + + public final ServerSocket socket; + public volatile Socket connection; + volatile int channelId; + + public byte[] packetData = new byte[] {0,0,0,104, + 0,0,0,217-256, + 0,0,0,1, + 0,0,0,0, + 0,0,0,2, + 0,0,0,0,0,0,0,5, + 0x40,0x39,0,0,0,0,0,0, + 0,0,0,111, + 0,0,0,97, + 0,0,0,3, 1,1,1,1,1,1,1,1,1,1,1,1, 0x40,0x37,0,0,0,0,0,23, 0,0,0,7, 0,0,0,36, + 0,0,0,4, 2,2,2,2,2,2,2,2,2,2,2,2, 0x40,0x35,0,0,0,0,0,21, 0,0,0,8, 0,0,0,37}; + + MockDispatch(ServerSocket socket) { + this.socket = socket; + } + + @Override + public void run() { + try { + connection = socket.accept(); + } catch (IOException e) { + e.printStackTrace(); + return; + } + requestRespond(); + } + + void requestRespond() { + byte[] length = new byte[4]; + try { + connection.getInputStream().read(length); + } catch (IOException e) { + e.printStackTrace(); + return; + } + int actual = ByteBuffer.wrap(length).getInt(); + + int read = 0; + int i = 0; + while (read != -1 && i < actual) { + try { + read = connection.getInputStream().read(); + ++i; + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + ByteBuffer reply = ByteBuffer.wrap(packetData); + if (channelId != -1) { + reply.putInt(8, channelId); + } + try { + connection.getOutputStream().write(packetData); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void setNoChannel() { + channelId = -1; + } + + } + + public static class MockServer { + public InetSocketAddress host; + public Thread worker; + public MockDispatch dispatch; + + public MockServer() throws IOException { + ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); + host = (InetSocketAddress) socket.getLocalSocketAddress(); + dispatch = new MockDispatch(socket); + worker = new Thread(dispatch); + worker.start(); + } + + } + + private Backend backend; + private MockServer server; + private Logger logger; + private boolean initUseParent; + private FS4ResourcePool listeners; + + public static final byte[] PONG = new byte[] { 0, 0, 0, 32, 0, 0, 0, 221 - 256, + 0,0,0,1, 0, 0, 0, 42, 0, 0, 0, 127, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, + 0, 0, 1 }; + + public void setUp() throws Exception { + logger = Logger.getLogger(Backend.class.getName()); + initUseParent = logger.getUseParentHandlers(); + logger.setUseParentHandlers(false); + listeners = new FS4ResourcePool(new Fs4Config(new Fs4Config.Builder()), new QrConfig(new QrConfig.Builder())); + + server = new MockServer(); + backend = listeners.getBackend(server.host.getHostString(), server.host.getPort()); + } + + public void tearDown() throws Exception { + listeners.deconstruct(); + server.dispatch.socket.close(); + if (server.dispatch.connection !=null) server.dispatch.connection.close(); + if (server.worker!=null) server.worker.join(); + if (logger !=null) logger.setUseParentHandlers(initUseParent); + } + + private BasicPacket [] read(FS4Channel channel) throws InvalidChannelException { + try { + return channel.receivePackets(TIMEOUT, 1); + } catch (ChannelTimeoutException e) { + fail("Could not get packets from simulated backend."); + } + return new BasicPacket[1]; + } + + @Test + public void testAll() throws Exception { + setUp(); + doTestBackend(); + tearDown(); + + setUp(); + doTestPinging(); + tearDown(); + + setUp(); + doRequireStatistics(); + tearDown(); + } + + private void doTestBackend() throws IOException, InvalidChannelException { + FS4Channel channel = backend.openChannel(); + Query q = new Query("/?query=a"); + int channelId = channel.getChannelId(); + server.dispatch.channelId = channelId; + + assertTrue(backend.sendPacket(QueryPacket.create("container.0", q), channelId)); + BasicPacket[] b = read(channel); + assertEquals(1, b.length); + assertEquals(217, b[0].getCode()); + channel.close(); + } + + private void doTestPinging() throws IOException, InvalidChannelException { + FS4Channel channel = backend.openPingChannel(); + server.dispatch.setNoChannel(); + server.dispatch.packetData = PONG; + + assertTrue(channel.sendPacket(new PingPacket())); + BasicPacket[] b = read(channel); + assertEquals(1, b.length); + assertEquals(221, b[0].getCode()); + channel.close(); + } + + private void doRequireStatistics() throws IOException, InvalidChannelException { + FS4Channel channel = backend.openPingChannel(); + server.dispatch.channelId = -1; + server.dispatch.packetData = PONG; + + assertTrue(channel.sendPacket(new PingPacket())); + read(channel); + BackendStatistics stats = backend.getStatistics(); + assertEquals(1, stats.totalConnections()); + } + +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java new file mode 100644 index 00000000000..527a2736fee --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java @@ -0,0 +1,107 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.GetDocSumsPacket; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.query.SessionId; +import com.yahoo.search.result.Hit; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Tests the GetDocsumsPacket + * + * @author Bjorn Borud + */ +public class GetDocSumsPacketTestCase { + + private static final byte IGNORE = 69; + + @Test + public void testDefaultDocsumClass() { + Query query = new Query("/?query=chain"); + assertNull(query.getPresentation().getSummary()); + } + + @Test + public void testEncodingWithQuery() throws BufferTooSmallException { + Hit hit = new FastHit(); + assertPacket(true, hit, new byte[] {0, 0, 0, 53, 0, 0, 0, -37, 0, 0, 8, 21, 0, 0, 0, 0, IGNORE, IGNORE, IGNORE, + IGNORE, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 0x03, 0, 0, 0, 7, + 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 1, 0, 0, 0, 6, 4, 0, 3, 102, 111, 111}); + } + + @Test + public void testEncodingWithoutQuery() throws BufferTooSmallException { + Hit hit = new FastHit(); + assertPacket(false, hit, new byte[] { 0, 0, 0, 39, 0, 0, 0, -37, 0, 0, 8, 17, 0, 0, 0, 0, IGNORE, IGNORE, IGNORE, + IGNORE, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 0x03, 0, 0, 0, 7, 100, 101, 102, 97, 117, 108, 116 + }); + } + + @Test + public void requireThatSessionIdIsEncodedAsPropertyWhenUsingSearchSession() throws BufferTooSmallException { + Result result = new Result(new Query("?query=foo&groupingSessionCache=false")); + SessionId sessionId = result.getQuery().getSessionId("node-0"); + result.getQuery().getRanking().setQueryCache(true); + FastHit hit = new FastHit(); + result.hits().add(hit); + ByteBuffer answer = ByteBuffer.allocate(1024); + //assertEquals(0, sessionId.asUtf8String().getByteLength()); + answer.put(new byte[] { 0, 0, 0, (byte)(103+sessionId.asUtf8String().getByteLength()), 0, 0, 0, -37, 0, 0, 24, 17, 0, 0, 0, 0, + // query timeout + IGNORE, IGNORE, IGNORE, IGNORE, + // "default" - rank profile + 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', 0, 0, 0, 0x03, + // "default" - summaryclass + 0, 0, 0, 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', + // 2 property entries + 0, 0, 0, 2, + // rank: sessionId => qrserver.0.XXXXXXXXXXXXX.0 + 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 1, 0, 0, 0, 9, 's', 'e', 's', 's', 'i', 'o', 'n', 'I', 'd'}); + answer.putInt(sessionId.asUtf8String().getByteLength()); + answer.put(sessionId.asUtf8String().getBytes()); + answer.put(new byte [] { + // caches: features => true + 0, 0, 0, 6, 'c', 'a', 'c', 'h', 'e', 's', + 0, 0, 0, 1, 0, 0, 0, 5, 'q', 'u', 'e', 'r', 'y', 0, 0, 0, 4, 't', 'r', 'u', 'e'}); + byte [] expected = new byte [answer.position()]; + answer.flip(); + answer.get(expected); + assertPacket(false, result, expected); + } + + private static void assertPacket(boolean sendQuery, Hit hit, byte[] expected) throws BufferTooSmallException { + Result result = new Result(new Query("?query=foo")); + result.hits().add(hit); + assertPacket(sendQuery, result, expected); + } + + private static void assertPacket(boolean sendQuery, Result result, byte[] expected) throws BufferTooSmallException { + GetDocSumsPacket packet = GetDocSumsPacket.create(result, "default", sendQuery); + ByteBuffer buf = ByteBuffer.allocate(1024); + packet.encode(buf); + buf.flip(); + + byte[] actual = new byte[buf.remaining()]; + buf.get(actual); + // assertEquals(Arrays.toString(expected), Arrays.toString(actual)); + + assertEquals("Equal length", expected.length, actual.length); + for (int i = 0; i < expected.length; ++i) { + if (expected[i] == IGNORE) { + actual[i] = IGNORE; + } + } + + assertArrayEquals(expected, actual); + } +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java new file mode 100644 index 00000000000..022feb09b2e --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import java.nio.ByteBuffer; + +import com.yahoo.fs4.HexByteIterator; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test of HexByteIterator + * + * @author Tony Vaagenes + */ +public class HexByteIteratorTestCase { + + @Test + public void testHexByteIterator() { + int[] numbers = { 0x00, 0x01, 0xDE, 0xAD, 0xBE, 0xEF, 0xFF }; + + HexByteIterator i = new HexByteIterator( + ByteBuffer.wrap(toBytes(numbers))); + + assertEquals("00", i.next()); + assertEquals("01", i.next()); + assertEquals("DE", i.next()); + assertEquals("AD", i.next()); + assertEquals("BE", i.next()); + assertEquals("EF", i.next()); + assertEquals("FF", i.next()); + assertTrue(!i.hasNext()); + } + + private byte[] toBytes(int[] ints) { + byte[] bytes = new byte[ints.length]; + for (int i=0; i<bytes.length; ++i) + bytes[i] = (byte)ints[i]; + return bytes; + } +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/PacketDecoderTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/PacketDecoderTestCase.java new file mode 100644 index 00000000000..2a6ab6a7644 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/PacketDecoderTestCase.java @@ -0,0 +1,186 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.ErrorPacket; +import com.yahoo.fs4.PacketDecoder; +import com.yahoo.fs4.PacketDecoder.DecodedPacket; +import com.yahoo.fs4.QueryResultPacket; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests the PacketDecoder + * + * @author Bjørn Borud + */ +public class PacketDecoderTestCase { + + static byte[] queryResultPacketData + = new byte[] {0,0,0,104, + 0,0,0,217-256, + 0,0,0,1, + 0,0,0,0, + 0,0,0,2, + 0,0,0,0,0,0,0,5, + 0x40,0x39,0,0,0,0,0,0, + 0,0,0,111, + 0,0,0,97, + 0,0,0,3, 1,1,1,1,1,1,1,1,1,1,1,1, 0x40,0x37,0,0,0,0,0,23, 0,0,0,7, 0,0,0,36, + 0,0,0,4, 2,2,2,2,2,2,2,2,2,2,2,2, 0x40,0x35,0,0,0,0,0,21, 0,0,0,8, 0,0,0,37}; + static int len = queryResultPacketData.length; + + /** + * In this testcase we have exactly one packet which fills the + * entire buffer + */ + @Test + public void testOnePacket () throws BufferTooSmallException { + ByteBuffer data = ByteBuffer.allocate(len); + data.put(queryResultPacketData); + data.flip(); + + // not really necessary for testing, but these help visualize + // the state the buffer should be in so a reader of this test + // will not have to + assertEquals(0, data.position()); + assertEquals(len, data.limit()); + assertEquals(len, data.capacity()); + assertEquals(data.limit(), data.capacity()); + + PacketDecoder.DecodedPacket p = PacketDecoder.extractPacket(data); + assertTrue(p.packet instanceof QueryResultPacket); + + // now the buffer should have position == capacity == limit + assertEquals(len, data.position()); + assertEquals(len, data.limit()); + assertEquals(len, data.capacity()); + + // next call to decode on same bufer should result + // in null and buffer should be reset for writing. + p = PacketDecoder.extractPacket(data); + assertTrue(p == null); + + // make sure the buffer is now ready for reading + assertEquals(0, data.position()); + assertEquals(len, data.limit()); + assertEquals(len, data.capacity()); + } + + /** + * In this testcase we only have 3 bytes so we can't + * even determine the size of the packet. + */ + @Test + public void testThreeBytesPacket () throws BufferTooSmallException { + ByteBuffer data = ByteBuffer.allocate(len); + data.put(queryResultPacketData, 0, 3); + data.flip(); + + // packetLength() should return -1 since we don't even have + // the size of the packet + assertEquals(-1, PacketDecoder.packetLength(data)); + + // since we can't determine the size we don't get a packet. + // the buffer should now be at offset 3 so we can read more + // data and limit should be set to capacity + PacketDecoder.DecodedPacket p = PacketDecoder.extractPacket(data); + assertTrue(p == null); + assertEquals(3, data.position()); + assertEquals(len, data.limit()); + assertEquals(len, data.capacity()); + } + + /** + * In this testcase we have a partial packet and room for + * more data + */ + @Test + public void testPartialWithMoreRoom () throws BufferTooSmallException { + ByteBuffer data = ByteBuffer.allocate(len); + data.put(queryResultPacketData, 0, 10); + data.flip(); + + PacketDecoder.DecodedPacket p = PacketDecoder.extractPacket(data); + assertTrue(p == null); + + } + + /** + * In this testcase we have one and a half packet + */ + @Test + public void testOneAndAHalfPackets () throws BufferTooSmallException { + int half = len / 2; + ByteBuffer data = ByteBuffer.allocate(len + half); + data.put(queryResultPacketData); + data.put(queryResultPacketData, 0, half); + assertEquals((len + half), data.position()); + data.flip(); + + // the first packet we should be able to extract just fine + BasicPacket p1 = PacketDecoder.extractPacket(data).packet; + assertTrue(p1 instanceof QueryResultPacket); + + PacketDecoder.DecodedPacket p2 = PacketDecoder.extractPacket(data); + assertTrue(p2 == null); + + // at this point the buffer should be ready for more + // reading so position should be at the end and limit + // should be at capacity + assertEquals(half, data.position()); + assertEquals(data.capacity(), data.limit()); + } + + /** + * Test the case where the buffer is too small for the + * packet + */ + @Test + public void testTooSmallBufferForPacket () { + ByteBuffer data = ByteBuffer.allocate(10); + data.put(queryResultPacketData, 0, 10); + data.flip(); + + try { + PacketDecoder.extractPacket(data); + fail(); + } + catch (BufferTooSmallException e) { + + } + } + + @Test + public void testErrorPacket() throws BufferTooSmallException { + ByteBuffer b = ByteBuffer.allocate(100); + b.putInt(0); + b.putInt(203); + b.putInt(1); + b.putInt(37); + b.putInt(5); + b.put(new byte[] { (byte) 'n', (byte) 'a', (byte) 'l', (byte) 'l', (byte) 'e' }); + b.putInt(0, b.position() - 4); + b.flip(); + DecodedPacket p = PacketDecoder.extractPacket(b); + ErrorPacket e = (ErrorPacket) p.packet; + assertEquals("nalle (37)", e.toString()); + assertEquals(203, e.getCode()); + assertEquals(37, e.getErrorCode()); + b = ByteBuffer.allocate(100); + // warn if encoding support is added untested + e.encode(b); + b.position(0); + assertEquals(4, b.getInt()); + assertEquals(203, b.getInt()); + assertFalse(b.hasRemaining()); + } + +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/PacketTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/PacketTestCase.java new file mode 100644 index 00000000000..7ee445bc6ba --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/PacketTestCase.java @@ -0,0 +1,229 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.search.Query; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests the Packet class. Specifically made this unit test suite + * for checking that queries that are too large for the buffer + * are handled gracefully. + * + * @author Bjorn Borud + */ +public class PacketTestCase { + + /** + * Make sure we don't get false negatives for reasonably sized + * buffers + */ + @Test + public void testSmallQueryOK () { + Query query = new Query("/?query=foo"); + assertNotNull(query); + + QueryPacket queryPacket = QueryPacket.create("container.0", query); + assertNotNull(queryPacket); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int position = buffer.position(); + + try { + queryPacket.encode(buffer, 0); + } + catch (BufferTooSmallException e) { + fail(); + } + + // make sure state of buffer HAS changed and is according + // to contract + assertTrue(position != buffer.position()); + assertTrue(buffer.position() == buffer.limit()); + } + + /** + * Make a query that is too large and then try to encode it + * into a small ByteBuffer + */ + @Test + public void testLargeQueryFail () { + StringBuilder queryBuffer = new StringBuilder(4008); + queryBuffer.append("/?query="); + for (int i=0; i < 1000; i++) { + queryBuffer.append("the%20"); + } + Query query = new Query(queryBuffer.toString()); + assertNotNull(query); + + QueryPacket queryPacket = QueryPacket.create("container.0", query); + assertNotNull(queryPacket); + + ByteBuffer buffer = ByteBuffer.allocate(100); + int position = buffer.position(); + int limit = buffer.limit(); + try { + queryPacket.encode(buffer, 0); + fail(); + } + catch (BufferTooSmallException e) { + // success if exception is thrown + } + + // make sure state of buffer is unchanged + assertEquals(position, buffer.position()); + assertEquals(limit, buffer.limit()); + } + + @Test + public void requireThatPacketsCanTurnOnCompression() throws BufferTooSmallException { + QueryPacket queryPacket = QueryPacket.create("container.0", new Query("/?query=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa&groupingSessionCache=false")); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int channel = 32; + + queryPacket.encode(buffer, channel); + buffer.flip(); + assertEquals(86, buffer.getInt()); // size + assertEquals(0xda, buffer.getInt()); // code + assertEquals(channel, buffer.getInt()); + + queryPacket.setCompressionLimit(88); + buffer.clear(); + queryPacket.encode(buffer, channel); + buffer.flip(); + assertEquals(86, buffer.getInt()); // size + assertEquals(0xda, buffer.getInt()); // code + + queryPacket.setCompressionLimit(84); + buffer.clear(); + queryPacket.encode(buffer, channel); + buffer.flip(); + assertEquals(57, buffer.getInt()); // size + assertEquals(0x060000da, buffer.getInt()); // code + assertEquals(channel, buffer.getInt()); + } + + @Test + public void requireThatUncompressablePacketsArentCompressed() throws BufferTooSmallException { + QueryPacket queryPacket = QueryPacket.create("container.0", new Query("/?query=aaaaaaaaaaaaaaa&groupingSessionCache=false")); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int channel = 32; + + queryPacket.setCompressionLimit(10); + buffer.clear(); + queryPacket.encode(buffer, channel); + buffer.flip(); + assertEquals(56, buffer.getInt()); // size + assertEquals(0xda, buffer.getInt()); // code + assertEquals(channel, buffer.getInt()); + } + + class MyPacket extends Packet { + private String bodyString = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + private int myCode = 1234; + + @Override + public int getCode() { + return myCode; + } + + @Override + protected void encodeBody(ByteBuffer buffer) { + buffer.put(bodyString.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void codeDecodedHook(int code) { + assertEquals(myCode, code); + } + + @Override + public void decodeBody(ByteBuffer buffer) { + byte[] bytes = new byte[bodyString.length()]; + buffer.get(bytes); + assertEquals(bodyString, new String(bytes)); + } + } + + @Test + public void requireThatCompressedPacketsCanBeDecompressed() throws BufferTooSmallException { + + MyPacket packet = new MyPacket(); + packet.setCompressionLimit(10); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int channel = 32; + packet.encode(buffer, channel); + + buffer.flip(); + new MyPacket().decode(buffer); + } + + @Test + public void requireThatCompressedByteBufferMayContainExtraData() throws BufferTooSmallException { + + MyPacket packet = new MyPacket(); + packet.setCompressionLimit(10); + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.putLong(0xdeadbeefL); + int channel = 32; + packet.encode(buffer, channel); + buffer.limit(buffer.limit() + 8); + buffer.putLong(0xdeadbeefL); + + buffer.flip(); + assertEquals(0xdeadbeefL, buffer.getLong()); // read initial content. + new MyPacket().decode(buffer); + assertEquals(0xdeadbeefL, buffer.getLong()); // read final content. + } + + class MyBasicPacket extends BasicPacket { + private String bodyString = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + private int myCode = 1234; + + @Override + public int getCode() { + return myCode; + } + + @Override + protected void encodeBody(ByteBuffer buffer) { + buffer.put(bodyString.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void codeDecodedHook(int code) { + assertEquals(myCode, code); + } + + @Override + public void decodeBody(ByteBuffer buffer) { + byte[] bytes = new byte[bodyString.length()]; + buffer.get(bytes); + assertEquals(bodyString, new String(bytes)); + } + } + + @Test + public void requireThatCompressedBasicPacketsCanBeDecompressed() throws BufferTooSmallException { + + MyBasicPacket packet = new MyBasicPacket(); + packet.setCompressionLimit(10); + ByteBuffer buffer = ByteBuffer.allocate(1024); + packet.encode(buffer); + + buffer.flip(); + new MyBasicPacket().decode(buffer); + } + +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/QueryResultTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/QueryResultTestCase.java new file mode 100644 index 00000000000..edf117b1195 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/QueryResultTestCase.java @@ -0,0 +1,113 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import com.yahoo.document.GlobalId; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.DocumentInfo; +import com.yahoo.fs4.PacketDecoder; +import com.yahoo.fs4.QueryResultPacket; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests encoding of query packages + * + * @author bratseth + */ +public class QueryResultTestCase { + + private static final double delta = 0.00000001; + + private static GlobalId gid1 = new GlobalId(new byte[] {1,1,1,1,1,1,1,1,1,1,1,1}); + private static GlobalId gid2 = new GlobalId(new byte[] {2,2,2,2,2,2,2,2,2,2,2,2}); + + @Test + public void testDecodeQueryResultX() { + byte[] packetData = new byte[] { + 0,0,0,100, + 0,0,0,217-256, + 0,0,0,1, + 0,0,0,1, + 0,0,0,0, + 0,0,0,2, + 0,0,0,0,0,0,0,5, + 0x40,0x39,0,0,0,0,0,0, + 0,0,0,111, + 0,0,0,0,0,0,0,89, + 0,0,0,0,0,0,0,90, + 0,0,0,0,0,0,0,91, + 0,0,0,1, + 1,1,1,1,1,1,1,1,1,1,1,1, 0x40,0x37,0,0,0,0,0,0, 0,0,0,7, 0,0,0,36, + 2,2,2,2,2,2,2,2,2,2,2,2, 0x40,0x35,0,0,0,0,0,0, 0,0,0,8, 0,0,0,37 + }; + ByteBuffer buffer = ByteBuffer.allocate(200); + buffer.put(packetData); + buffer.flip(); + BasicPacket packet = PacketDecoder.decode(buffer); + assertTrue(packet instanceof QueryResultPacket); + QueryResultPacket result = (QueryResultPacket)packet; + + assertTrue(result.getMldFeature()); + + assertEquals(5, result.getTotalDocumentCount()); + assertEquals(25, result.getMaxRank()); + assertEquals(111, result.getDocstamp()); + assertEquals(89, result.getCoverageDocs()); + assertEquals(90, result.getActiveDocs()); + assertEquals(91, result.getSoonActiveDocs()); + assertEquals(1, result.getDegradedReason()); + + assertEquals(2, result.getDocuments().size()); + DocumentInfo document1 = result.getDocuments().get(0); + assertEquals(gid1, document1.getGlobalId()); + assertEquals(23.0, document1.getMetric(), delta); + assertEquals(7, document1.getPartId()); + assertEquals(36, document1.getDistributionKey()); + DocumentInfo document2 = result.getDocuments().get(1); + assertEquals(gid2, document2.getGlobalId()); + assertEquals(21.0, document2.getMetric(), delta); + assertEquals(8, document2.getPartId()); + assertEquals(37, document2.getDistributionKey()); + } + + @Test + public void testDecodeQueryResultMoreHits() { + byte[] packetData = new byte[] { + 0,0,0,100, + 0,0,0,217-256, + 0,0,0,1, + 0,0,0,3, + 0,0,0,0, + 0,0,0,2, + 0,0,0,0,0,0,0,5, + 0x40,0x39,0,0,0,0,0,0, + 0,0,0,111, + 0,6,0,5, + 0,0,0,0,0,0,0,89, + 0,0,0,0,0,0,0,90, + 0,0,0,0,0,0,0,91, + 0,0,0,1, + 1,1,1,1,1,1,1,1,1,1,1,1, 0x40,0x37,0,0,0,0,0,0, 0,0,0,7, 0,0,0,36, + 2,2,2,2,2,2,2,2,2,2,2,2, 0x40,0x35,0,0,0,0,0,0, 0,0,0,8, 0,0,0,37 + }; + ByteBuffer buffer = ByteBuffer.allocate(200); + buffer.put(packetData); + buffer.flip(); + BasicPacket packet = PacketDecoder.decode(buffer); + assertTrue(packet instanceof QueryResultPacket); + QueryResultPacket result = (QueryResultPacket)packet; + + assertEquals(2, result.getDocuments().size()); + DocumentInfo document1 = result.getDocuments().get(0); + assertEquals(gid1, document1.getGlobalId()); + DocumentInfo document2 = result.getDocuments().get(1); + assertEquals(gid2, document2.getGlobalId()); + assertEquals(6, result.getNodesQueried()); + assertEquals(5, result.getNodesReplied()); + } + +} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java new file mode 100644 index 00000000000..10d55b91131 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java @@ -0,0 +1,319 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.test; + +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.Freshness; +import com.yahoo.prelude.query.AndItem; +import com.yahoo.prelude.query.Highlight; +import com.yahoo.prelude.query.PhraseItem; +import com.yahoo.prelude.query.PhraseSegmentItem; +import com.yahoo.prelude.query.WeightedSetItem; +import com.yahoo.prelude.query.WordItem; +import com.yahoo.search.Query; +import com.yahoo.search.query.ranking.SoftTimeout; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests encoding of query x packages + * + * @author bratseth + */ +public class QueryTestCase { + + @Test + public void testEncodePacket() { + Query query = new Query("/?query=chain&timeout=0&groupingSessionCache=false"); + query.setWindow(2, 8); + QueryPacket packet = QueryPacket.create("container.0", query); + assertEquals(2, packet.getOffset()); + assertEquals(8, packet.getHits()); + + byte[] encoded = packetToBytes(packet); + byte[] correctBuffer = new byte[] {0,0,0,46,0,0,0,-38,0,0,0,0, // Header + 0,0,0,6, // Features + 2, + 8, + 0,0,0,1, // querytimeout + 0,0,0x40,0x03, // qflags + 7, + 'd', 'e', 'f', 'a', 'u', 'l', 't', + 0,0,0,1,0,0,0,8,4, + 0,5, + 99,104,97,105,110}; + assertEqualArrays(correctBuffer, encoded); + } + + @Test + public void testEncodeQueryPacketWithSomeAdditionalFeatures() { + Query query = new Query("/?query=chain&dataset=10&type=phrase&timeout=0&groupingSessionCache=false"); + query.properties().set(SoftTimeout.enableProperty, false); + + // Because the rank mapping now needs config and a searcher, + // we do the sledgehammer dance: + query.getRanking().setProfile("two"); + query.setWindow(2, 8); + QueryPacket packet = QueryPacket.create("container.0", query); + byte[] encoded = packetToBytes(packet); + byte[] correctBuffer = new byte[] {0,0,0,42,0,0,0,-38,0,0,0,0, // Header + 0,0,0,6, // Features + 2, + 8, + 0,0,0,1, // querytimeout + 0,0,0x40,0x03, // QFlags + 3, + 't','w','o', // Ranking + 0,0,0,1,0,0,0,8,4, + 0,5, + 99,104,97,105,110}; + assertEqualArrays(correctBuffer, encoded); + } + + /** This test will tell you if you have screwed up the binary encoding, but it won't tell you how */ + @Test + public void testEncodeQueryPacketWithManyFeatures() { + Query query = new Query("/?query=chain" + + "&ranking.features.query(foo)=30.3&ranking.features.query(bar)=0" + + "&ranking.properties.property.p1=v1&ranking.properties.property.p2=v2" + + "&pos.ll=S22.4532;W123.9887&pos.radius=3&pos.attribute=place&ranking.freshness=37" + + "&model.searchPath=7/3" + + "&groupingSessionCache=false"); + query.getRanking().setFreshness(new Freshness("123456")); + query.getRanking().setSorting("+field1 -field2"); + query.getRanking().setProfile("two"); + Highlight highlight = new Highlight(); + highlight.addHighlightTerm("field1", "term1"); + highlight.addHighlightTerm("field1", "term2"); + query.getPresentation().setHighlight(highlight); + + query.prepare(); + + QueryPacket packet = QueryPacket.create("container.0", query); + byte[] encoded = packetToBytes(packet); + byte[] correctBuffer=new byte[] { + 0, 0, 1, 23, 0, 0, 0, -38, 0, 0, 0, 0, 0, 16, 0, -122, 0, 10, ignored, ignored, ignored, ignored, 0, 0, 0x40, 0x03, 3, 't', 'w', 'o', 0, 0, 0, 3, 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 5, 0, 0, 0, 11, 'p', 'r', 'o', 'p', 'e', 'r', 't', 'y', 46, 'p', '2', 0, 0, 0, 2, 'v', '2', 0, 0, 0, 11, 'p', 'r', 'o', 'p', 'e', 'r', 't', 'y', 46, 'p', '1', 0, 0, 0, 2, 'v', '1', 0, 0, 0, 3, 'f', 'o', 'o', 0, 0, 0, 4, '3', '0', 46, '3', 0, 0, 0, 3, 'b', 'a', 'r', 0, 0, 0, 1, '0', 0, 0, 0, 9, 'v', 'e', 's', 'p', 'a', 46, 'n', 'o', 'w', 0, 0, 0, 6, '1', '2', '3', '4', '5', '6', 0, 0, 0, 14, 'h', 'i', 'g', 'h', 'l', 'i', 'g', 'h', 't', 't', 'e', 'r', 'm', 's', 0, 0, 0, 3, 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 1, '2', 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 5, 't', 'e', 'r', 'm', '1', 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 5, 't', 'e', 'r', 'm', '2', 0, 0, 0, 5, 'm', 'o', 'd', 'e', 'l', 0, 0, 0, 1, 0, 0, 0, 10, 's', 'e', 'a', 'r', 'c', 'h', 'p', 'a', 't', 'h', 0, 0, 0, 3, '7', 47, '3', 0, 0, 0, 15, 43, 'f', 'i', 'e', 'l', 'd', '1', 32, 45, 'f', 'i', 'e', 'l', 'd', '2', 0, 0, 0, 1, 0, 0, 0, 9, 68, 1, 0, 5, 'c', 'h', 'a', 'i', 'n' + }; + assertEqualArrays(correctBuffer,encoded); + } + + /** This test will tell you if you have screwed up the binary encoding, but it won't tell you how */ + @Test + public void testEncodeQueryPacketWithManyFeaturesFresnhessAsString() { + Query query = new Query("/?query=chain" + + "&ranking.features.query(foo)=30.3&ranking.features.query(bar)=0" + + "&ranking.properties.property.p1=v1&ranking.properties.property.p2=v2" + + "&pos.ll=S22.4532;W123.9887&pos.radius=3&pos.attribute=place&ranking.freshness=37" + + "&model.searchPath=7/3" + + "&groupingSessionCache=false"); + query.getRanking().setFreshness("123456"); + query.getRanking().setSorting("+field1 -field2"); + query.getRanking().setProfile("two"); + Highlight highlight = new Highlight(); + highlight.addHighlightTerm("field1", "term1"); + highlight.addHighlightTerm("field1", "term2"); + query.getPresentation().setHighlight(highlight); + + query.prepare(); + + QueryPacket packet = QueryPacket.create("container.0", query); + byte[] encoded = packetToBytes(packet); + byte[] correctBuffer=new byte[] { + 0, 0, 1, 23, 0, 0, 0, -38, 0, 0, 0, 0, 0, 16, 0, -122, 0, 10, ignored, ignored, ignored, ignored, 0, 0, 0x40, 0x03, 3, 't', 'w', 'o', 0, 0, 0, 3, 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 5, 0, 0, 0, 11, 'p', 'r', 'o', 'p', 'e', 'r', 't', 'y', 46, 'p', '2', 0, 0, 0, 2, 'v', '2', 0, 0, 0, 11, 'p', 'r', 'o', 'p', 'e', 'r', 't', 'y', 46, 'p', '1', 0, 0, 0, 2, 'v', '1', 0, 0, 0, 3, 'f', 'o', 'o', 0, 0, 0, 4, '3', '0', 46, '3', 0, 0, 0, 3, 'b', 'a', 'r', 0, 0, 0, 1, '0', 0, 0, 0, 9, 'v', 'e', 's', 'p', 'a', 46, 'n', 'o', 'w', 0, 0, 0, 6, '1', '2', '3', '4', '5', '6', 0, 0, 0, 14, 'h', 'i', 'g', 'h', 'l', 'i', 'g', 'h', 't', 't', 'e', 'r', 'm', 's', 0, 0, 0, 3, 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 1, '2', 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 5, 't', 'e', 'r', 'm', '1', 0, 0, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 0, 0, 0, 5, 't', 'e', 'r', 'm', '2', 0, 0, 0, 5, 'm', 'o', 'd', 'e', 'l', 0, 0, 0, 1, 0, 0, 0, 10, 's', 'e', 'a', 'r', 'c', 'h', 'p', 'a', 't', 'h', 0, 0, 0, 3, '7', 47, '3', 0, 0, 0, 15, 43, 'f', 'i', 'e', 'l', 'd', '1', 32, 45, 'f', 'i', 'e', 'l', 'd', '2', 0, 0, 0, 1, 0, 0, 0, 9, 68, 1, 0, 5, 'c', 'h', 'a', 'i', 'n' + }; + assertEqualArrays(correctBuffer, encoded); + } + + @Test + public void testEncodeQueryPacketWithLabelsConnectivityAndSignificance() { + Query query = new Query(); + query.setGroupingSessionCache(false); + AndItem and = new AndItem(); + WeightedSetItem taggable1 = new WeightedSetItem("field1"); + taggable1.setLabel("foo"); + WeightedSetItem taggable2 = new WeightedSetItem("field2"); + taggable1.setLabel("bar"); + and.addItem(taggable1); + and.addItem(taggable2); + WordItem word1 = new WordItem("word1", "field3"); + word1.setSignificance(0.37); + WordItem word2 = new WordItem("word1", "field3"); + word2.setSignificance(0.81); + word2.setConnectivity(word1, 0.15); + and.addItem(word1); + and.addItem(word2); + + query.getModel().getQueryTree().setRoot(and); + + query.prepare(); + + QueryPacket packet = QueryPacket.create("container.0", query); + byte[] encoded = packetToBytes(packet); + byte[] correctBuffer=new byte[] { + 0, 0, 1, 16, 0, 0, 0, -38, 0, 0, 0, 0, 0, 16, 0, 6, 0, 10, ignored, ignored, ignored, ignored, 0, 0, 0x40, 0x03, 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', 0, 0, 0, 1, 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 5, 0, 0, 0, 18, 'v', 'e', 's', 'p', 'a', 46, 'l', 'a', 'b', 'e', 'l', 46, 'b', 'a', 'r', 46, 'i', 'd', 0, 0, 0, 1, '1', 0, 0, 0, 22, 'v', 'e', 's', 'p', 'a', 46, 't', 'e', 'r', 'm', 46, '4', 46, 'c', 'o', 'n', 'n', 'e', 'x', 'i', 't', 'y', 0, 0, 0, 1, '3', 0, 0, 0, 22, 'v', 'e', 's', 'p', 'a', 46, 't', 'e', 'r', 'm', 46, '4', 46, 'c', 'o', 'n', 'n', 'e', 'x', 'i', 't', 'y', 0, 0, 0, 4, '0', 46, '1', '5', 0, 0, 0, 25, 'v', 'e', 's', 'p', 'a', 46, 't', 'e', 'r', 'm', 46, '3', 46, 's', 'i', 'g', 'n', 'i', 'f', 'i', 'c', 'a', 'n', 'c', 'e', 0, 0, 0, 4, '0', 46, '3', '7', 0, 0, 0, 25, 'v', 'e', 's', 'p', 'a', 46, 't', 'e', 'r', 'm', 46, '4', 46, 's', 'i', 'g', 'n', 'i', 'f', 'i', 'c', 'a', 'n', 'c', 'e', 0, 0, 0, 4, '0', 46, '8', '1', 0, 0, 0, 5, 0, 0, 0, '4', 1, 4, 79, 1, 0, 6, 'f', 'i', 'e', 'l', 'd', '1', 79, 2, 0, 6, 'f', 'i', 'e', 'l', 'd', '2', 68, 3, 6, 'f', 'i', 'e', 'l', 'd', '3', 5, 'w', 'o', 'r', 'd', '1', 68, 4, 6, 'f', 'i', 'e', 'l', 'd', '3', 5, 'w', 'o', 'r', 'd', 49 + }; + assertEqualArrays(correctBuffer, encoded); + } + + @Test + public void testEncodeSortSpec() throws BufferTooSmallException { + Query query = new Query("/?query=chain&sortspec=%2Ba+-b&timeout=0&groupingSessionCache=false"); + query.setWindow(2, 8); + QueryPacket packet = QueryPacket.create("container.0", query); + ByteBuffer buffer = ByteBuffer.allocate(500); + buffer.limit(0); + packet.encode(buffer, 0); + byte[] encoded = new byte[buffer.position()]; + buffer.rewind(); + buffer.get(encoded); + byte[] correctBuffer = new byte[] {0,0,0,55,0,0,0,-38,0,0,0,0, // Header + 0,0,0,-122, // Features + 2, // offset + 8, // maxhits + 0,0,0,1, // querytimeout + 0,0,0x40,0x03, // qflags + 7, + 'd', 'e', 'f', 'a', 'u', 'l', 't', + 0,0,0,5, // sortspec length + 43,97,32,45,98, // sortspec + 0,0,0,1, // num stackitems + 0,0,0,8,4, + 0,5, + 99,104,97,105,110}; + assertEqualArrays(correctBuffer, encoded); + + // Encode again to test grantEncodingBuffer + buffer = packet.grantEncodingBuffer(0); + encoded = new byte[buffer.limit()]; + buffer.get(encoded); + assertEqualArrays(correctBuffer, encoded); + } + + @Test + public void testBufferExpands() throws BufferTooSmallException { + Query query = new Query("/?query=chain&sortspec=%2Ba+-b&timeout=0"); + QueryPacket packet = QueryPacket.create("container.0", query); + + ByteBuffer buffer = packet.grantEncodingBuffer(0, ByteBuffer.allocate(2)); + assertEquals(128, buffer.capacity()); + } + + @Test + public void testPhraseEqualsPhraseWithPhraseSegment() throws BufferTooSmallException { + Query query = new Query(); + query.setGroupingSessionCache(false); + PhraseItem p = new PhraseItem(); + PhraseSegmentItem ps = new PhraseSegmentItem("a b", false, false); + ps.addItem(new WordItem("a")); + ps.addItem(new WordItem("b")); + p.addItem(ps); + query.getModel().getQueryTree().setRoot(p); + + query.setTimeout(0); + QueryPacket queryPacket = QueryPacket.create("container.0", query); + + ByteBuffer buffer1 = ByteBuffer.allocate(1024); + + queryPacket.encode(buffer1, 0); + + query = new Query(); + query.setGroupingSessionCache(false); + p = new PhraseItem(); + p.addItem(new WordItem("a")); + p.addItem(new WordItem("b")); + query.getModel().getQueryTree().setRoot(p); + + query.setTimeout(0); + queryPacket = QueryPacket.create("container.0", query); + assertNotNull(queryPacket); + + ByteBuffer buffer2 = ByteBuffer.allocate(1024); + + queryPacket.encode(buffer2, 0); + + byte[] encoded1 = new byte[buffer1.position()]; + buffer1.rewind(); + buffer1.get(encoded1); + byte[] encoded2 = new byte[buffer2.position()]; + buffer2.rewind(); + buffer2.get(encoded2); + assertEqualArrays(encoded2, encoded1); + } + + @Test + public void testPatchInChannelId() { + Query query = new Query("/?query=chain&timeout=0&groupingSessionCache=false"); + query.setWindow(2, 8); + QueryPacket packet = QueryPacket.create("container.0", query); + assertEquals(2,packet.getOffset()); + assertEquals(8, packet.getHits()); + + ByteBuffer buffer = packet.grantEncodingBuffer(0x07070707); + + byte[] correctBuffer = new byte[] {0,0,0,46,0,0,0,-38,7,7,7,7, // Header + 0,0,0,6, // Features + 2, + 8, + 0,0,0,1, // querytimeout + 0,0,0x40,0x03, // qflags + 7, + 'd', 'e', 'f', 'a', 'u', 'l', 't', + 0,0,0,1,0,0,0,8,4, + 0,5, + 99,104,97,105,110}; + + byte[] encoded = new byte[buffer.limit()]; + buffer.get(encoded); + + assertEqualArrays(correctBuffer,encoded); + + buffer = packet.grantEncodingBuffer(0x09090909); + correctBuffer = new byte[] {0,0,0,46,0,0,0,-38,9,9,9,9, // Header + 0,0,0,6, // Features + 2, + 8, + 0,0,0,1, // querytimeout + 0,0,0x40,0x03, // qflags + 7, + 'd', 'e', 'f', 'a', 'u', 'l', 't', + 0,0,0,1,0,0,0,8,4, + 0,5, + 99,104,97,105,110}; + + encoded = new byte[buffer.limit()]; + buffer.get(encoded); + + assertEqualArrays(correctBuffer,encoded); + } + + public static byte[] packetToBytes(Packet packet) { + try { + ByteBuffer buffer = ByteBuffer.allocate(500); + buffer.limit(0); + packet.encode(buffer, 0); + byte[] encoded = new byte[buffer.position()]; + buffer.rewind(); + buffer.get(encoded); + return encoded; + } + catch (BufferTooSmallException e) { + throw new RuntimeException(e); + } + } + + public static void assertEqualArrays(byte[] correct, byte[] test) { + assertEquals("Incorrect length,", correct.length, test.length); + for (int i = 0; i < correct.length; i++) { + if (correct[i] == ignored) continue; // Special value used to ignore bytes we don't want to check + assertEquals("Byte nr " + i, correct[i], test[i]); + } + } + + public static final byte ignored = -128; + +} diff --git a/container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/RankFeaturesTestCase.java index 8aff81a90db..b52c708ce4b 100644 --- a/container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java +++ b/container-search/src/test/java/com/yahoo/fs4/test/RankFeaturesTestCase.java @@ -1,5 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.query.test; +package com.yahoo.fs4.test; import com.yahoo.io.GrowableByteBuffer; import com.yahoo.search.query.ranking.RankFeatures; @@ -11,11 +11,7 @@ import com.yahoo.text.Utf8; import org.junit.Test; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.junit.Assert.assertEquals; diff --git a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java index 9ea7276583b..86553a86add 100644 --- a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java @@ -7,6 +7,8 @@ import com.yahoo.container.QrConfig; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; import com.yahoo.container.protect.Error; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.fs4.QueryPacket; import com.yahoo.prelude.IndexFacts; import com.yahoo.prelude.IndexModel; import com.yahoo.prelude.SearchDefinition; @@ -46,12 +48,12 @@ import static org.junit.Assert.assertTrue; * @author bratseth */ public class ClusterSearcherTestCase { - private static final double DELTA = 0.0000000000000001; @Test public void testNoBackends() { ClusterSearcher cluster = new ClusterSearcher(new LinkedHashSet<>(Arrays.asList("dummy"))); try { + cluster.getMonitor().getConfiguration().setRequestTimeout(100); Execution execution = new Execution(cluster, Execution.Context.createContextStub()); Query query = new Query("query=hello"); query.setHits(10); @@ -144,7 +146,7 @@ public class ClusterSearcherTestCase { private final String type3 = "type3"; private final Map<String, List<Hit>> results = new LinkedHashMap<>(); private final boolean expectAttributePrefetch; - static final String ATTRIBUTE_PREFETCH = "attributeprefetch"; + public static final String ATTRIBUTE_PREFETCH = "attributeprefetch"; private String getId(String type, int i) { return "id:ns:" + type + "::" + i; @@ -194,7 +196,7 @@ public class ClusterSearcherTestCase { createHit(getId(type3, 2), 5))); } - MyMockSearcher(boolean expectAttributePrefetch) { + public MyMockSearcher(boolean expectAttributePrefetch) { this.expectAttributePrefetch = expectAttributePrefetch; init(); } @@ -261,7 +263,8 @@ public class ClusterSearcherTestCase { } private Execution createExecution(List<String> docTypesList, boolean expectAttributePrefetch) { - Set<String> documentTypes = new LinkedHashSet<>(docTypesList); + Set<String> documentTypes = new LinkedHashSet<>(); + documentTypes.addAll(docTypesList); ClusterSearcher cluster = new ClusterSearcher(documentTypes); try { cluster.addBackendSearcher(new MyMockSearcher( @@ -274,7 +277,6 @@ public class ClusterSearcherTestCase { } } - @Test public void testThatSingleDocumentTypeCanBeSearched() { { // Explicit 1 type in restrict set Execution execution = createExecution(); @@ -283,9 +285,9 @@ public class ClusterSearcherTestCase { assertEquals(3, result.getTotalHitCount()); List<Hit> hits = result.hits().asList(); assertEquals(3, hits.size()); - assertEquals(9.0, hits.get(0).getRelevance().getScore(), DELTA); - assertEquals(6.0, hits.get(1).getRelevance().getScore(), DELTA); - assertEquals(3.0, hits.get(2).getRelevance().getScore(), DELTA); + assertEquals(9.0, hits.get(0).getRelevance().getScore()); + assertEquals(6.0, hits.get(1).getRelevance().getScore()); + assertEquals(3.0, hits.get(2).getRelevance().getScore()); } { // Only 1 registered type in cluster searcher, empty restrict set // NB ! Empty restrict sets does not exist below the cluster searcher. @@ -297,11 +299,10 @@ public class ClusterSearcherTestCase { assertEquals(3, result.getTotalHitCount()); List<Hit> hits = result.hits().asList(); assertEquals(3, hits.size()); - assertEquals(9.0, hits.get(0).getRelevance().getScore(), DELTA); + assertEquals(9.0, hits.get(0).getRelevance().getScore()); } } - @Test public void testThatSubsetOfDocumentTypesCanBeSearched() { Execution execution = createExecution(); Query query = new Query("?query=hello&restrict=type1,type3"); @@ -310,15 +311,14 @@ public class ClusterSearcherTestCase { assertEquals(6, result.getTotalHitCount()); List<Hit> hits = result.hits().asList(); assertEquals(6, hits.size()); - assertEquals(11.0, hits.get(0).getRelevance().getScore(), DELTA); - assertEquals(9.0, hits.get(1).getRelevance().getScore(), DELTA); - assertEquals(8.0, hits.get(2).getRelevance().getScore(), DELTA); - assertEquals(6.0, hits.get(3).getRelevance().getScore(), DELTA); - assertEquals(5.0, hits.get(4).getRelevance().getScore(), DELTA); - assertEquals(3.0, hits.get(5).getRelevance().getScore(), DELTA); + assertEquals(11.0, hits.get(0).getRelevance().getScore()); + assertEquals(9.0, hits.get(1).getRelevance().getScore()); + assertEquals(8.0, hits.get(2).getRelevance().getScore()); + assertEquals(6.0, hits.get(3).getRelevance().getScore()); + assertEquals(5.0, hits.get(4).getRelevance().getScore()); + assertEquals(3.0, hits.get(5).getRelevance().getScore()); } - @Test public void testThatMultipleDocumentTypesCanBeSearchedAndFilled() { Execution execution = createExecution(); Query query = new Query("?query=hello"); @@ -327,15 +327,15 @@ public class ClusterSearcherTestCase { assertEquals(9, result.getTotalHitCount()); List<Hit> hits = result.hits().asList(); assertEquals(9, hits.size()); - assertEquals(11.0, hits.get(0).getRelevance().getScore(), DELTA); - assertEquals(10.0, hits.get(1).getRelevance().getScore(), DELTA); - assertEquals(9.0, hits.get(2).getRelevance().getScore(), DELTA); - assertEquals(8.0, hits.get(3).getRelevance().getScore(), DELTA); - assertEquals(7.0, hits.get(4).getRelevance().getScore(), DELTA); - assertEquals(6.0, hits.get(5).getRelevance().getScore(), DELTA); - assertEquals(5.0, hits.get(6).getRelevance().getScore(), DELTA); - assertEquals(4.0, hits.get(7).getRelevance().getScore(), DELTA); - assertEquals(3.0, hits.get(8).getRelevance().getScore(), DELTA); + assertEquals(11.0, hits.get(0).getRelevance().getScore()); + assertEquals(10.0, hits.get(1).getRelevance().getScore()); + assertEquals(9.0, hits.get(2).getRelevance().getScore()); + assertEquals(8.0, hits.get(3).getRelevance().getScore()); + assertEquals(7.0, hits.get(4).getRelevance().getScore()); + assertEquals(6.0, hits.get(5).getRelevance().getScore()); + assertEquals(5.0, hits.get(6).getRelevance().getScore()); + assertEquals(4.0, hits.get(7).getRelevance().getScore()); + assertEquals(3.0, hits.get(8).getRelevance().getScore()); for (int i = 0; i < 9; ++i) { assertNull(hits.get(i).getField("score")); } @@ -390,7 +390,7 @@ public class ClusterSearcherTestCase { assertResult(9, Arrays.asList(5.0, 4.0), getResult(6, 2, ex)); assertResult(9, Arrays.asList(4.0, 3.0), getResult(7, 2, ex)); assertResult(9, Arrays.asList(3.0), getResult(8, 2, ex)); - assertResult(9, new ArrayList<>(), getResult(9, 2, ex)); + assertResult(9, new ArrayList<Double>(), getResult(9, 2, ex)); assertResult(9, Arrays.asList(11.0, 10.0, 9.0, 8.0, 7.0), getResult(0, 5, ex)); assertResult(9, Arrays.asList(6.0, 5.0, 4.0, 3.0), getResult(5, 5, ex)); @@ -425,7 +425,11 @@ public class ClusterSearcherTestCase { final String yahoo = "www.yahoo.com"; try { - canFindYahoo = (null != InetAddress.getByName(yahoo)); + if (null != InetAddress.getByName(yahoo)) { + canFindYahoo = true; + } else { + canFindYahoo = false; + } } catch (Exception e) { canFindYahoo = false; } @@ -538,11 +542,12 @@ public class ClusterSearcherTestCase { qrSearchersConfig.build(), clusterConfig.build(), documentDbConfig.build(), + new QrMonitorConfig.Builder().build(), new DispatchConfig.Builder().build(), createClusterInfoConfig(), Statistics.nullImplementation, new MockMetric(), - new FS4ResourcePool(new QrConfig.Builder().build()), + new FS4ResourcePool(new Fs4Config.Builder().build(), new QrConfig.Builder().build()), new VipStatus()); } @@ -585,7 +590,7 @@ public class ClusterSearcherTestCase { @Test public void testThatQueryTimeoutIsCappedWithSpecifiedMax() { - QueryTimeoutFixture f = new QueryTimeoutFixture(70.0, null); + QueryTimeoutFixture f = new QueryTimeoutFixture(Double.valueOf(70), null); f.query.setTimeout(70001); f.search(); assertEquals(70000, f.query.getTimeout()); @@ -611,7 +616,7 @@ public class ClusterSearcherTestCase { @Test public void testThatQueryCacheIsDisabledIfTimeoutIsLargerThanConfiguredMax() { - QueryTimeoutFixture f = new QueryTimeoutFixture(null, 5.0); + QueryTimeoutFixture f = new QueryTimeoutFixture(null, Double.valueOf(5)); f.query.setTimeout(5001); f.query.getRanking().setQueryCache(true); f.search(); diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java new file mode 100644 index 00000000000..b9119528490 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java @@ -0,0 +1,72 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.prelude.fastsearch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.InterleavedSearchInvoker; +import com.yahoo.search.dispatch.MockSearchCluster; +import com.yahoo.search.dispatch.ResponseMonitor; +import com.yahoo.search.searchchain.Execution; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +import static org.junit.Assert.assertThat; + +public class FS4SearchInvokerTestCase { + @SuppressWarnings("resource") + @Test + public void testThatConnectionErrorsAreReportedImmediately() throws IOException { + var query = new Query("?"); + query.setTimeout(1000); + + var searcher = mockSearcher(); + var cluster = new MockSearchCluster("?", 1, 1); + var fs4invoker = new FS4SearchInvoker(searcher, query, mockFailingChannel(), Optional.empty()); + var interleave = new InterleavedSearchInvoker(Collections.singleton(fs4invoker), cluster, null); + + long start = System.currentTimeMillis(); + interleave.search(query, null); + long elapsed = System.currentTimeMillis() - start; + + assertThat("Connection error should fail fast", elapsed, Matchers.lessThan(500L)); + } + + private static VespaBackEndSearcher mockSearcher() { + return new VespaBackEndSearcher() { + @Override + protected Result doSearch2(Query query, Execution execution) { + return null; + } + + @Override + protected void doPartialFill(Result result, String summaryClass) {} + }; + } + + private static FS4Channel mockFailingChannel() { + return new FS4Channel() { + @Override + public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { + // pretend there's a connection error + return false; + } + + @Override + public void setQuery(Query q) {} + + @Override + public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) {} + + @Override + public void close() {} + }; + } +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java new file mode 100644 index 00000000000..b0662a93f62 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java @@ -0,0 +1,137 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test; + +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Result; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests that FastSearcher will bypass dispatch when the conditions are right + * + * @author bratseth + */ +public class DirectSearchTestCase { + + @Test + public void testDirectSearchEnabled() { + FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); + tester.search("?query=test&dispatch.direct=true"); + assertEquals("The FastSearcher has used the local search node connection", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testDirectSearchDisabled() { + FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); + tester.search("?query=test&dispatch.direct=false&dispatch.internal=false"); + assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testDirectSearchEnabledByDefault() { + FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); + tester.search("?query=test"); + assertEquals("The FastSearcher has used the local search node connection", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testNoDirectSearchWhenMoreSearchNodesThanContainers() { + FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); + tester.search("?query=test&dispatch.direct=true&dispatch.internal=false"); + assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testDirectSearchWhenMultipleGroupsAndEnoughContainers() { + FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); + tester.search("?query=test&dispatch.direct=true"); + assertEquals(1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testDirectSearchSummaryFetchGoToLocalNode() { + FastSearcherTester tester = new FastSearcherTester(2, "otherhost:9999:1", FastSearcherTester.selfHostname + ":9999:0"); + int localDistributionKey = tester.dispatcher().searchCluster().nodesByHost().get(FastSearcherTester.selfHostname).asList().get(0).key(); + assertEquals(1, localDistributionKey); + Result result = tester.search("?query=test&dispatch.direct=true"); + assertEquals(1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + FastHit hit = (FastHit)result.hits().get(0); + assertEquals(localDistributionKey, hit.getDistributionKey()); + } + + @Test + public void testNoDirectSearchWhenMultipleNodesPerGroup() { + FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:0"); + tester.search("?query=test&dispatch.direct=true&dispatch.internal=false"); + assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testNoDirectSearchWhenLocalNodeIsDown() { + FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); + assertTrue(tester.vipStatus().isInRotation()); + tester.setResponding(FastSearcherTester.selfHostname, false); + assertFalse(tester.vipStatus().isInRotation()); + assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.setResponding(FastSearcherTester.selfHostname, true); + assertTrue(tester.vipStatus().isInRotation()); + assertEquals("2 ping requests, 0 search request", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("2 ping requests, 1 search request", 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + + @Test + public void testNoDirectDispatchWhenInsufficientCoverage() { + FastSearcherTester tester = new FastSearcherTester(3, + FastSearcherTester.selfHostname + ":9999:0", + "host1:9999:1", + "host2:9999:2"); + double k = 38.78955; // multiply all document counts by some number > 1 to test that we compute % correctly + + tester.setActiveDocuments(FastSearcherTester.selfHostname, (long) (96 * k)); + tester.setActiveDocuments("host1", (long) (100 * k)); + tester.setActiveDocuments("host2", (long) (100 * k)); + assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("Still 1 ping request, 0 search requests because the default coverage is 97%, and we only have 96% locally", + 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.waitForInRotationIs(false); + + tester.setActiveDocuments(FastSearcherTester.selfHostname, (long) (99 * k)); + assertEquals("2 ping request, 0 search requests", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("2 ping request, 1 search requests because we now have 99% locally", + 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.waitForInRotationIs(true); + + tester.setActiveDocuments("host1", (long) (104 * k)); + assertEquals("2 ping request, 1 search requests", 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("2 ping request, 2 search requests because 99/((104+100)/2) > 0.97", + 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.waitForInRotationIs(true); + + tester.setActiveDocuments("host2", (long) (102 * k)); + assertEquals("2 ping request, 2 search requests", 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("Still 2 ping request, 2 search requests because 99/((104+102)/2) < 0.97", + 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.waitForInRotationIs(false); + } + + @Test + public void testCoverageWithSingleGroup() { + FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); + + tester.setActiveDocuments(FastSearcherTester.selfHostname, 100); + assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + tester.search("?query=test&dispatch.direct=true&nocache"); + assertEquals("1 ping request, 1 search requests", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); + } + +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index eb4d65693bb..c6e87170f07 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -3,34 +3,64 @@ package com.yahoo.prelude.fastsearch.test; import com.google.common.collect.ImmutableList; import com.yahoo.component.chain.Chain; +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.container.QrConfig; +import com.yahoo.container.handler.VipStatus; import com.yahoo.container.protect.Error; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.document.GlobalId; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.BackendTestCase; +import com.yahoo.fs4.test.QueryTestCase; import com.yahoo.language.simple.SimpleLinguistics; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.FastSearcher; import com.yahoo.prelude.fastsearch.SummaryParameters; +import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; +import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; +import com.yahoo.prelude.fastsearch.test.fs4mock.MockFSChannel; +import com.yahoo.processing.execution.Execution.Trace; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; +import com.yahoo.search.dispatch.rpc.MockRpcResourcePoolBuilder; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.AllOperation; import com.yahoo.search.grouping.request.EachOperation; import com.yahoo.search.grouping.request.GroupingOperation; +import com.yahoo.search.query.SessionId; import com.yahoo.search.rendering.RendererRegistry; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; +import com.yahoo.yolean.trace.TraceNode; +import com.yahoo.yolean.trace.TraceVisitor; import org.junit.Test; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; - +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Tests the Fast searcher @@ -40,12 +70,30 @@ import static org.junit.Assert.assertNotNull; public class FastSearcherTestCase { private final static DocumentdbInfoConfig documentdbInfoConfig = new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder()); + private MockBackend mockBackend; + @Test + public void testNoNormalizing() { + Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + new FS4ResourcePool("container.0", 1), + MockDispatcher.create(Collections.emptyList()), + new SummaryParameters(null), + new ClusterParams("testhittype"), + documentdbInfoConfig); + + MockFSChannel.setEmptyDocsums(false); + + Result result = doSearch(fastSearcher, new Query("?query=ignored"), 0, 10); + + assertTrue(result.hits().get(0).getRelevance().getScore() > 1000); + } @Test public void testNullQuery() { Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - FastSearcher fastSearcher = new FastSearcher("container.0", + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + new FS4ResourcePool("container.0", 1), MockDispatcher.create(Collections.emptyList()), new SummaryParameters(null), new ClusterParams("testhittype"), @@ -61,6 +109,152 @@ public class FastSearcherTestCase { assertEquals(Error.NULL_QUERY.code, message.getCode()); } + @Test + public void testDispatchDotSummaries() { + Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); + DocumentdbInfoConfig documentdbConfigWithOneDb = + new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder().documentdb(new DocumentdbInfoConfig.Documentdb.Builder() + .name("testDb") + .summaryclass(new DocumentdbInfoConfig.Documentdb.Summaryclass.Builder().name("simple").id(7)) + .rankprofile(new DocumentdbInfoConfig.Documentdb.Rankprofile.Builder() + .name("simpler").hasRankFeatures(false).hasSummaryFeatures(false)))); + + List<Node> nodes = new ArrayList<>(); + nodes.add(new Node(0, "host1", 5000, 0)); + nodes.add(new Node(1, "host2", 5000, 0)); + + var mockFs4ResourcePool = new MockFS4ResourcePool(); + var mockRpcResourcePool = new MockRpcResourcePoolBuilder().connection(0).connection(1).build(); + + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + mockFs4ResourcePool, + MockDispatcher.create(nodes, mockFs4ResourcePool, mockRpcResourcePool, 1, new VipStatus()), + new SummaryParameters(null), + new ClusterParams("testhittype"), + documentdbConfigWithOneDb); + + { // No direct.summaries + String query = "?query=sddocname:a&summary=simple&timeout=20s"; + Result result = doSearch(fastSearcher, new Query(query), 0, 10); + doFill(fastSearcher, result); + ErrorMessage error = result.hits().getError(); + assertNull("Since we don't route to the dispatcher we hit the mock backend, so no error", error); + } + + { // direct.summaries due to query cache + String query = "?query=sddocname:a&ranking.queryCache&timeout=20s"; + Result result = doSearch(fastSearcher, new Query(query), 0, 10); + doFill(fastSearcher, result); + ErrorMessage error = result.hits().getError(); + assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used", + "getDocsums(..) attempted for node X", error.getDetailedMessage().replaceAll("\\d", "X")); + } + + { // direct.summaries due to no summary features + String query = "?query=sddocname:a&dispatch.summaries&summary=simple&ranking=simpler&timeout=20s"; + Result result = doSearch(fastSearcher, new Query(query), 0, 10); + doFill(fastSearcher, result); + ErrorMessage error = result.hits().getError(); + assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used", + "getDocsums(..) attempted for node X", error.getDetailedMessage().replaceAll("\\d", "X")); + } + } + + @Test + public void testQueryWithRestrict() { + mockBackend = new MockBackend(); + DocumentdbInfoConfig documentdbConfigWithOneDb = + new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder().documentdb(new DocumentdbInfoConfig.Documentdb.Builder().name("testDb"))); + FastSearcher fastSearcher = new FastSearcher(mockBackend, + new FS4ResourcePool("container.0", 1), + MockDispatcher.create(Collections.emptyList()), + new SummaryParameters(null), + new ClusterParams("testhittype"), + documentdbConfigWithOneDb); + + Query query = new Query("?query=foo&model.restrict=testDb&groupingSessionCache=false"); + query.prepare(); + doSearch(fastSearcher, query, 0, 10); + + Packet receivedPacket = mockBackend.getChannel().getLastQueryPacket(); + byte[] encoded = QueryTestCase.packetToBytes(receivedPacket); + byte[] correct = new byte[] { + 0, 0, 0, 100, 0, 0, 0, -38, 0, 0, 0, 0, 0, 16, 0, 6, 0, 10, + QueryTestCase.ignored, QueryTestCase.ignored, QueryTestCase.ignored, QueryTestCase.ignored, // time left + 0, 0, 0x40, 0x03, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 1, 0, 0, 0, 5, 109, 97, 116, 99, 104, 0, 0, 0, 1, 0, 0, 0, 24, 100, 111, 99, 117, 109, 101, 110, 116, 100, 98, 46, 115, 101, 97, 114, 99, 104, 100, 111, 99, 116, 121, 112, 101, 0, 0, 0, 6, 116, 101, 115, 116, 68, 98, 0, 0, 0, 1, 0, 0, 0, 7, 68, 1, 0, 3, 102, 111, 111 + }; + QueryTestCase.assertEqualArrays(correct, encoded); + } + + @Test + public void testSearch() { + FastSearcher fastSearcher = createFastSearcher(); + + Result result = doSearch(fastSearcher, new Query("?query=ignored"), 0, 10); + + Execution execution = new Execution(chainedAsSearchChain(fastSearcher), Execution.Context.createContextStub()); + assertEquals(2, result.getHitCount()); + execution.fill(result); + assertCorrectHit1((FastHit)result.hits().get(0)); + assertCorrectTypes1((FastHit)result.hits().get(0)); + for (int idx = 0; idx < result.getHitCount(); idx++) { + assertTrue(!result.hits().get(idx).isCached()); + } + + // Repeat the request a couple of times, to verify whether the packet cache works + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 10); + assertEquals(2, result.getHitCount()); + execution.fill(result); + assertCorrectHit1((FastHit) result.hits().get(0)); + for (int i = 0; i < result.getHitCount(); i++) { + assertFalse(result.hits().get(i) + " should never be cached", + result.hits().get(i).isCached()); + } + + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 10); + assertEquals(2, result.getHitCount()); + execution.fill(result); + assertCorrectHit1((FastHit) result.hits().get(0)); + assertTrue("All hits are not cached", !result.isCached()); + for (int i = 0; i < result.getHitCount(); i++) { + assertTrue(!result.hits().get(i).isCached()); + } + + // Test that partial result sets can be retrieved from the cache + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 1); + assertEquals(1, result.getConcreteHitCount()); + execution.fill(result); + + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 2); + assertEquals(2, result.getConcreteHitCount()); + execution.fill(result); + // No hit should be cached + assertFalse(result.hits().get(0).isCached()); + assertFalse(result.hits().get(1).isCached()); + + // Still nothing cached + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 2); + assertEquals(2, result.getConcreteHitCount()); + execution.fill(result); + // both first and second should now be cached + assertFalse(result.hits().get(0).isCached()); + assertFalse(result.hits().get(1).isCached()); + + // Tests that the cache _hit_ is not returned if _another_ + // hit is requested + + result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 1); + assertEquals(1, result.getConcreteHitCount()); + + result = doSearch(fastSearcher,new Query("?query=ignored"), 1, 1); + assertEquals(1, result.getConcreteHitCount()); + + for (int i = 0; i < result.getHitCount(); i++) { + assertFalse("Hit " + i + " should not be cached.", + result.hits().get(i).isCached()); + } + } + private Chain<Searcher> chainedAsSearchChain(Searcher topOfChain) { List<Searcher> searchers = new ArrayList<>(); searchers.add(topOfChain); @@ -78,9 +272,79 @@ public class FastSearcherTestCase { return new Execution(chainedAsSearchChain(searcher), context); } + private void doFill(Searcher searcher, Result result) { + createExecution(searcher).fill(result); + } + + @Test + public void testThatPropertiesAreReencoded() throws Exception { + FastSearcher fastSearcher = createFastSearcher(); + + Query query = new Query("?query=ignored&dispatch.summaries=false&groupingSessionCache=false"); + query.getRanking().setQueryCache(true); + Result result = doSearch(fastSearcher, query, 0, 10); + + Execution execution = new Execution(chainedAsSearchChain(fastSearcher), Execution.Context.createContextStub()); + assertEquals(2, result.getHitCount()); + execution.fill(result); + + BasicPacket receivedPacket = mockBackend.getChannel().getLastReceived(); + ByteBuffer buf = ByteBuffer.allocate(1000); + receivedPacket.encode(buf); + buf.flip(); + byte[] actual = new byte[buf.remaining()]; + buf.get(actual); + + SessionId sessionId = query.getSessionId(); + byte IGNORE = 69; + ByteBuffer answer = ByteBuffer.allocate(1024); + answer.put(new byte[] { 0, 0, 0, (byte)(141+sessionId.asUtf8String().getByteLength()), 0, 0, 0, -37, 0, 0, 16, 17, 0, 0, 0, 0, + // query timeout + IGNORE, IGNORE, IGNORE, IGNORE, + // "default" - rank profile + 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', 0, 0, 0, 0x03, + // 3 property entries (rank, match, caches) + 0, 0, 0, 3, + // rank: sessionId => qrserver.0.XXXXXXXXXXXXX.0 + 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 1, 0, 0, 0, 9, 's', 'e', 's', 's', 'i', 'o', 'n', 'I', 'd'}); + answer.putInt(sessionId.asUtf8String().getBytes().length); + answer.put(sessionId.asUtf8String().getBytes()); + answer.put(new byte [] { + // match: documentdb.searchdoctype => test + 0, 0, 0, 5, 'm', 'a', 't', 'c', 'h', 0, 0, 0, 1, 0, 0, 0, 24, 'd', 'o', 'c', 'u', 'm', 'e', 'n', 't', 'd', 'b', '.', 's', 'e', 'a', 'r', 'c', 'h', 'd', 'o', 'c', 't', 'y', 'p', 'e', 0, 0, 0, 4, 't', 'e', 's', 't', + // sessionId => qrserver.0.XXXXXXXXXXXXX.0 + 0, 0, 0, 6, 'c', 'a', 'c', 'h', 'e', 's', 0, 0, 0, 1, 0, 0, 0, 5, 'q', 'u', 'e', 'r', 'y', 0, 0, 0, 4, 't', 'r', 'u', 'e'}); + byte [] expected = new byte [answer.position()]; + answer.flip(); + answer.get(expected); + + for (int i = 0; i < expected.length; ++i) { + if (expected[i] == IGNORE) { + actual[i] = IGNORE; + } + } + assertArrayEquals(expected, actual); + } + + private FastSearcher createFastSearcher() { + mockBackend = new MockBackend(); + ConfigGetter<DocumentdbInfoConfig> getter = new ConfigGetter<>(DocumentdbInfoConfig.class); + DocumentdbInfoConfig config = getter.getConfig("file:src/test/java/com/yahoo/prelude/fastsearch/test/documentdb-info.cfg"); + + MockFSChannel.resetDocstamp(); + Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); + return new FastSearcher(mockBackend, + new FS4ResourcePool("container.0", 1), + MockDispatcher.create(Collections.emptyList()), + new SummaryParameters(null), + new ClusterParams("testhittype"), + config); + } + @Test public void testSinglePassGroupingIsForcedWithSingleNodeGroups() { - FastSearcher fastSearcher = new FastSearcher("container.0", + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + new FS4ResourcePool("container.0", 1), MockDispatcher.create(Collections.singletonList(new Node(0, "host0", 123, 0))), new SummaryParameters(null), new ClusterParams("testhittype"), @@ -104,7 +368,8 @@ public class FastSearcherTestCase { public void testSinglePassGroupingIsNotForcedWithSingleNodeGroups() { MockDispatcher dispatcher = MockDispatcher.create(ImmutableList.of(new Node(0, "host0", 123, 0), new Node(2, "host1", 123, 0))); - FastSearcher fastSearcher = new FastSearcher("container.0", + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + new FS4ResourcePool("container.0", 1), dispatcher, new SummaryParameters(null), new ClusterParams("testhittype"), @@ -136,4 +401,95 @@ public class FastSearcherTestCase { assertForceSinglePassIs(expected, child); } + @Test + public void testPing() throws IOException, InterruptedException { + Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); + BackendTestCase.MockServer server = new BackendTestCase.MockServer(); + FS4ResourcePool listeners = new FS4ResourcePool(new Fs4Config(new Fs4Config.Builder()), new QrConfig(new QrConfig.Builder())); + Backend backend = listeners.getBackend(server.host.getHostString(),server.host.getPort()); + FastSearcher fastSearcher = new FastSearcher(backend, + new FS4ResourcePool("container.0", 1), + MockDispatcher.create(Collections.emptyList()), + new SummaryParameters(null), + new ClusterParams("testhittype"), + documentdbInfoConfig); + server.dispatch.packetData = BackendTestCase.PONG; + server.dispatch.setNoChannel(); + Chain<Searcher> chain = new Chain<>(fastSearcher); + Execution e = new Execution(chain, Execution.Context.createContextStub()); + Pong pong = e.ping(new Ping()); + backend.shutdown(); + server.dispatch.socket.close(); + server.dispatch.connection.close(); + server.worker.join(); + pong.setPingInfo("blbl"); + assertEquals("Result of pinging using blbl", pong.toString()); + } + + private void assertCorrectTypes1(FastHit hit) { + assertEquals(String.class, hit.getField("TITLE").getClass()); + assertEquals(Integer.class, hit.getField("BYTES").getClass()); + } + + private void assertCorrectHit1(FastHit hit) { + assertEquals( + "StudyOfMadonna.com - Interviews, Articles, Reviews, Quotes, Essays and more..", + hit.getField("TITLE")); + assertEquals("352", hit.getField("WORDS").toString()); + assertEquals(2003., hit.getRelevance().getScore(), 0.01d); + assertEquals("index:testhittype/234/" + asHexString(hit.getGlobalId()), hit.getId().toString()); + assertEquals("9190", hit.getField("BYTES").toString()); + assertEquals("testhittype", hit.getSource()); + } + + private static String asHexString(GlobalId gid) { + StringBuilder sb = new StringBuilder(); + byte[] rawGid = gid.getRawId(); + for (byte b : rawGid) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) + sb.append('0'); + sb.append(hex); + } + return sb.toString(); + } + + @Test + public void null_summary_is_included_in_trace() { + String summary = null; + assertThat(getTraceString(summary), containsString("summary=[null]")); + } + + @Test + public void non_null_summary_is_included_in_trace() { + String summary = "all"; + assertThat(getTraceString(summary), containsString("summary='all'")); + } + + private String getTraceString(String summary) { + FastSearcher fastSearcher = createFastSearcher(); + + Query query = new Query("?query=ignored"); + query.getPresentation().setSummary(summary); + query.setTraceLevel(2); + + Result result = doSearch(fastSearcher, query, 0, 10); + doFill(fastSearcher, result); + + Trace trace = query.getContext(false).getTrace(); + final AtomicReference<String> fillTraceString = new AtomicReference<>(); + + + trace.traceNode().accept(new TraceVisitor() { + @Override + public void visit(TraceNode traceNode) { + if (traceNode.payload() instanceof String && traceNode.payload().toString().contains("fill to dispatch")) + fillTraceString.set((String) traceNode.payload()); + + } + }); + + return fillTraceString.get(); + } + } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java new file mode 100644 index 00000000000..6eab16045c2 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java @@ -0,0 +1,127 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test; + +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.container.QrSearchersConfig; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.net.HostName; +import com.yahoo.prelude.fastsearch.ClusterParams; +import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.prelude.fastsearch.SummaryParameters; +import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; +import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.rpc.MockRpcResourcePoolBuilder; +import com.yahoo.search.dispatch.rpc.RpcResourcePool; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.searchchain.Execution; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author bratseth + */ +class FastSearcherTester { + + public static final String selfHostname = HostName.getLocalhost(); + + private final MockFS4ResourcePool mockFS4ResourcePool; + private final RpcResourcePool mockRpcResourcePool; + private final FastSearcher fastSearcher; + private final MockDispatcher mockDispatcher; + private final VipStatus vipStatus; + + public FastSearcherTester(int containerClusterSize, Node searchNode) { + this(containerClusterSize, Collections.singletonList(searchNode)); + } + + public FastSearcherTester(int containerClusterSize, String... hostAndPortAndGroupStrings) { + this(containerClusterSize, toNodes(hostAndPortAndGroupStrings)); + } + + public FastSearcherTester(int containerClusterSize, List<Node> searchNodes) { + String clusterId = "a"; + + var b = new QrSearchersConfig.Builder(); + var searchClusterB = new QrSearchersConfig.Searchcluster.Builder(); + searchClusterB.name(clusterId); + b.searchcluster(searchClusterB); + vipStatus = new VipStatus(b.build()); + + mockFS4ResourcePool = new MockFS4ResourcePool(); + var builder = new MockRpcResourcePoolBuilder(); + searchNodes.forEach(node -> builder.connection(node.key())); + mockRpcResourcePool = builder.build(); + mockDispatcher = MockDispatcher.create(searchNodes, mockFS4ResourcePool, mockRpcResourcePool, containerClusterSize, vipStatus); + fastSearcher = new FastSearcher(new MockBackend(selfHostname, 0L, true), + mockFS4ResourcePool, + mockDispatcher, + new SummaryParameters(null), + new ClusterParams("testhittype"), + new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder())); + } + + private static List<Node> toNodes(String... hostAndPortAndGroupStrings) { + List<Node> nodes = new ArrayList<>(); + int key = 0; + for (String s : hostAndPortAndGroupStrings) { + String[] parts = s.split(":"); + nodes.add(new Node(key++, parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); + } + return nodes; + } + + public Result search(String query) { + Result result = fastSearcher.search(new Query(query), new Execution(Execution.Context.createContextStub())); + assertEquals(null, result.hits().getError()); + return result; + } + + /** Returns the number of times a backend for this hostname and port has been requested */ + public int requestCount(String hostname, int port) { + return mockFS4ResourcePool.requestCount(hostname, port); + } + + public MockDispatcher dispatcher() { return mockDispatcher; } + + /** Sets the response status of a node and ping it to update the monitor status */ + public void setResponding(String hostname, boolean responding) { + // Start/stop returning a failing backend + mockFS4ResourcePool.setResponding(hostname, responding); + + // Make the search cluster monitor notice right now in this thread + Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); + mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); + } + + /** Sets the response status of a node and ping it to update the monitor status */ + public void setActiveDocuments(String hostname, long activeDocuments) { + mockFS4ResourcePool.setActiveDocuments(hostname, activeDocuments); + + // Make the search cluster monitor notice right now in this thread + Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); + mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); + mockDispatcher.searchCluster().pingIterationCompleted(); + } + + public VipStatus vipStatus() { return vipStatus; } + + /** Retrying is needed because earlier pings from the monitoring thread may interfere with the testing thread */ + public void waitForInRotationIs(boolean expectedRotationStatus) { + int triesLeft = 9000; + while (vipStatus.isInRotation() != expectedRotationStatus && triesLeft > 0) { + triesLeft--; + try { Thread.sleep(10); } catch (InterruptedException e) {} + } + if (triesLeft == 0) + fail("Did not reach VIP in rotation status = " + expectedRotationStatus + " after trying for 90 seconds"); + } + +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java index afb9cf6f571..ccb265b799b 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java @@ -2,6 +2,8 @@ package com.yahoo.prelude.fastsearch.test; import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.FS4PingFactory; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -13,24 +15,23 @@ import java.util.List; class MockDispatcher extends Dispatcher { public static MockDispatcher create(List<Node> nodes) { + var fs4ResourcePool = new FS4ResourcePool("container.0", 1); var rpcResourcePool = new RpcResourcePool(toDispatchConfig(nodes)); - return create(nodes, rpcResourcePool, 1, new VipStatus()); + return create(nodes, fs4ResourcePool, rpcResourcePool, 1, new VipStatus()); } - public static MockDispatcher create(List<Node> nodes, RpcResourcePool rpcResourcePool, + public static MockDispatcher create(List<Node> nodes, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool, int containerClusterSize, VipStatus vipStatus) { var dispatchConfig = toDispatchConfig(nodes); var searchCluster = new SearchCluster("a", dispatchConfig, containerClusterSize, vipStatus); - return new MockDispatcher(searchCluster, dispatchConfig, rpcResourcePool); + return new MockDispatcher(searchCluster, dispatchConfig, fs4ResourcePool, rpcResourcePool); } - private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcResourcePool rpcResourcePool) { - this(searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig.dispatchWithProtobuf())); - } - - private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) { - super(searchCluster, dispatchConfig, invokerFactory, invokerFactory, new MockMetric()); + private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, + RpcResourcePool rpcResourcePool) { + super(searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig.dispatchWithProtobuf()), + new FS4PingFactory(fs4ResourcePool), new MockMetric()); } private static DispatchConfig toDispatchConfig(List<Node> nodes) { diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java new file mode 100644 index 00000000000..d09e8856ee7 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java @@ -0,0 +1,101 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- +// +// +package com.yahoo.prelude.fastsearch.test.fs4mock; + + +import com.yahoo.prelude.ConfigurationException; + + +/** + * Thread-wrapper for MockFDispatch + * + * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> + */ +public class DispatchThread extends Thread { + int listenPort; + long replyDelay; + long byteDelay; + MockFDispatch dispatch; + Object barrier = new Object(); + + /** + * Instantiate MockFDispatch; if the wanted port is taken we + * bump the port number. Note that the delays are not + * accurate: in reality they will be significantly longer for + * low values. + * + * @param listenPort Wanted port number, note that this may be + * bumped if someone is already running something + * on this port, so it is a starting point for + * scanning only + * @param replyDelay how many milliseconds we should delay when + * replying + * @param byteDelay how many milliseconds we delay for each byte + * written + */ + + public DispatchThread(int listenPort, long replyDelay, long byteDelay) { + this.listenPort = listenPort; + this.replyDelay = replyDelay; + this.byteDelay = byteDelay; + dispatch = new MockFDispatch(listenPort, replyDelay, byteDelay); + dispatch.setBarrier(barrier); + } + + /** + * Run the MockFDispatch and anticipate multiple instances of + * same running. + */ + public void run() { + int maxTries = 20; + // the following section is here to make sure that this + // test is somewhat robust, ie. if someone is already + // listening to the port in question, we'd like to NOT + // fail, but keep probing until we find a port we can use. + boolean up = false; + + while ((!up) && (maxTries-- != 0)) { + try { + dispatch.run(); + up = true; + } catch (ConfigurationException e) { + listenPort++; + dispatch.setListenPort(listenPort); + } + } + } + + /** + * Wait until MockFDispatch is ready to accept connections + * or we time out and indicate which of the two outcomes it was. + * + * @return If we time out we return <code>false</code>. Else we + * return <code>true</code> + * + */ + public boolean waitOnBarrier(long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + + synchronized (barrier) { + barrier.wait(timeout); + } + long diff = System.currentTimeMillis() - start; + + return (diff < timeout); + } + + /** + * Return the port on which the MockFDispatch actually listens. + * use this instead of assuming where it is since, if more than + * one application tries to use the port we've assigned to it + * we might have to up the port number. + * + * @return port number of active MockFDispatch instance + * + */ + public int listenPort() { + return listenPort; + } +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java new file mode 100644 index 00000000000..d3fbf8f3645 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java @@ -0,0 +1,53 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test.fs4mock; + +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; + +/** + * @author bratseth + */ +public class MockBackend extends Backend { + + private String hostname; + private final long activeDocumentsInBackend; + private final boolean working; + + /** Created lazily as we want to have just one but it depends on the channel */ + private MockFSChannel channel = null; + + public MockBackend() { + this("", 0L, true); + } + + public MockBackend(String hostname, long activeDocumentsInBackend, boolean working) { + super(); + this.hostname = hostname; + this.activeDocumentsInBackend = activeDocumentsInBackend; + this.working = working; + } + + @Override + public FS4Channel openChannel() { + if (channel == null) + channel = working ? new MockFSChannel(activeDocumentsInBackend, this) + : new NonWorkingMockFSChannel(this); + return channel; + } + + @Override + public FS4Channel openPingChannel() { return openChannel(); } + + @Override + public String getHost() { return hostname; } + + /** Returns the channel in use or null if no channel has been used yet */ + public MockFSChannel getChannel() { return channel; } + + public void shutdown() {} + + @Override + public boolean probeConnection() { + return working; + } +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java new file mode 100644 index 00000000000..6956f288d1a --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java @@ -0,0 +1,212 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test.fs4mock; + + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.prelude.ConfigurationException; +import com.yahoo.prelude.fastsearch.test.DocsumDefinitionTestCase; + + +/** + * A server which replies to any query with the same query result after + * a configurable delay, with a configurable slowness (delay between each byte). + * Connections are never timed out. + * + * @author bratseth + */ +public class MockFDispatch { + + private static int connectionCount = 0; + + private static Logger log = Logger.getLogger(MockFDispatch.class.getName()); + + /** The port we accept incoming requests at */ + private int listenPort = 0; + + private long replyDelay; + + private long byteDelay; + + private Object barrier; + + private static byte[] queryResultPacketData = new byte[] { + 0, 0, 0, 64, 0, 0, + 0, 214 - 256, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 5, 0, 0, 0, + 25, 0, 0, 0, 111, 0, 0, 0, 97, 0, 0, 0, 3, 0, 0, 0, 23, 0, 0, 0, 7, 0, 0, + 0, 36, 0, 0, 0, 4, 0, 0, 0, 21, 0, 0, 0, 8, 0, 0, 0, 37}; + + private static byte[] docsumData = DocsumDefinitionTestCase.makeDocsum(); + + private static byte[] docsumHeadPacketData = new byte[] { + 0, 0, 3, 39, 0, 0, + 0, 205 - 256, 0, 0, 0, 1, 0, 0, 0, 0}; + + private static byte[] eolPacketData = new byte[] { + 0, 0, 0, 8, 0, 0, 0, + 200 - 256, 0, 0, 0, 1 }; + + private Set<ConnectionThread> connectionThreads = new HashSet<>(); + + public MockFDispatch(int listenPort, long replyDelay, long byteDelay) { + this.replyDelay = replyDelay; + this.byteDelay = byteDelay; + this.listenPort = listenPort; + } + + public void setBarrier(Object barrier) { + this.barrier = barrier; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + + public void run() { + try { + ServerSocketChannel channel = createServerSocket(listenPort); + + channel.socket().setReuseAddress(true); + while (!Thread.currentThread().isInterrupted()) { + try { + // notify those waiting at the barrier that they + // can now proceed and talk to us + synchronized (barrier) { + if (barrier != null) { + barrier.notify(); + } + } + SocketChannel socketChannel = channel.accept(); + + connectionThreads.add(new ConnectionThread(socketChannel)); + } catch (ClosedByInterruptException e) {// We'll exit + } catch (ClosedChannelException e) { + return; + } catch (Exception e) { + log.log(Level.WARNING, "Unexpected error reading request", e); + } + } + channel.close(); + } catch (IOException e) { + throw new ConfigurationException("Socket channel failure", e); + } + } + + private ServerSocketChannel createServerSocket(int listenPort) + throws IOException { + ServerSocketChannel channel = ServerSocketChannel.open(); + ServerSocket socket = channel.socket(); + + socket.bind( + new InetSocketAddress(InetAddress.getLocalHost(), listenPort)); + String host = socket.getInetAddress().getHostName(); + + log.fine("Accepting dfispatch requests at " + host + ":" + listenPort); + return channel; + } + + public static void main(String[] args) { + log.setLevel(Level.FINE); + MockFDispatch m = new MockFDispatch(7890, Integer.parseInt(args[0]), + Integer.parseInt(args[1])); + + m.run(); + } + + private class ConnectionThread extends Thread { + + private ByteBuffer writeBuffer = ByteBuffer.allocate(2000); + + private ByteBuffer readBuffer = ByteBuffer.allocate(2000); + + private int connectionNr = 0; + + private SocketChannel channel; + + public ConnectionThread(SocketChannel channel) { + this.channel = channel; + fillBuffer(writeBuffer); + start(); + } + + private void fillBuffer(ByteBuffer buffer) { + buffer.clear(); + buffer.put(queryResultPacketData); + buffer.put(docsumHeadPacketData); + buffer.put(docsumData); + buffer.put(docsumHeadPacketData); + buffer.put(docsumData); + buffer.put(eolPacketData); + } + + public void run() { + connectionNr = connectionCount++; + log.fine("Opened connection " + connectionNr); + + try { + long lastRequest = System.currentTimeMillis(); + + while ((System.currentTimeMillis() - lastRequest) <= 5000 + && (!isInterrupted())) { + readBuffer.clear(); + channel.read(readBuffer); + lastRequest = System.currentTimeMillis(); + delay(replyDelay); + + if (byteDelay > 0) { + writeSlow(writeBuffer); + } else { + write(writeBuffer); + } + log.fine( + "Replied in " + + (System.currentTimeMillis() - lastRequest) + + " ms"); + } + + log.fine("Closing timed out connection " + connectionNr); + connectionCount--; + channel.close(); + } catch (IOException e) {} + } + + private void write(ByteBuffer writeBuffer) throws IOException { + writeBuffer.flip(); + channel.write(writeBuffer); + } + + private void writeSlow(ByteBuffer writeBuffer) throws IOException { + writeBuffer.flip(); + int dataSize = writeBuffer.limit(); + + for (int i = 0; i < dataSize; i++) { + writeBuffer.position(i); + writeBuffer.limit(i + 1); + channel.write(writeBuffer); + delay(byteDelay); + } + writeBuffer.limit(dataSize); + } + + private void delay(long delay) { + + try { + Thread.sleep(delay); + } catch (InterruptedException e) {} + } + + } + +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java new file mode 100644 index 00000000000..0d756cbeff3 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java @@ -0,0 +1,63 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test.fs4mock; + +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * @author bratseth + */ +public class MockFS4ResourcePool extends FS4ResourcePool { + + private final Map<String, Integer> requestsPerBackend = new HashMap<>(); + private final Set<String> nonRespondingBackends = new HashSet<>(); + private final Map<String, Long> activeDocumentsInBackend = new HashMap<>(); + private final long testingThreadId; + + public MockFS4ResourcePool() { + super("container.0", 1); + this.testingThreadId = Thread.currentThread().getId(); + } + + @Override + public Backend getBackend(String hostname, int port) { + countRequest(hostname + ":" + port); + if (nonRespondingBackends.contains(hostname)) + return new MockBackend(hostname, 0L, false); + else + return new MockBackend(hostname, activeDocumentsInBackend.getOrDefault(hostname, 0L), true); + } + + /** + * Returns the number of times a backend for this hostname and port has been requested + * from the thread creating this + */ + public int requestCount(String hostname, int port) { + return requestsPerBackend.getOrDefault(hostname + ":" + port, 0); + } + + /** Sets the number of active documents the given host will report to have in ping responses */ + public void setActiveDocuments(String hostname, long activeDocuments) { + activeDocumentsInBackend.put(hostname, activeDocuments); + } + + private void countRequest(String hostAndPort) { + // ignore requests from the ping thread to avoid timing issues + if (Thread.currentThread().getId() != testingThreadId) return; + + requestsPerBackend.put(hostAndPort, requestsPerBackend.getOrDefault(hostAndPort, 0) + 1); + } + + public void setResponding(String hostname, boolean responding) { + if (responding) + nonRespondingBackends.remove(hostname); + else + nonRespondingBackends.add(hostname); + } + +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java new file mode 100644 index 00000000000..db14a2894db --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java @@ -0,0 +1,176 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test.fs4mock; + +import com.yahoo.document.GlobalId; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.DocumentInfo; +import com.yahoo.fs4.EolPacket; +import com.yahoo.fs4.GetDocSumsPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.PacketDecoder; +import com.yahoo.fs4.PingPacket; +import com.yahoo.fs4.PongPacket; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryResultPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.prelude.fastsearch.test.DocsumDefinitionTestCase; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * A channel which returns hardcoded packets of the same type as fdispatch + */ +public class MockFSChannel extends FS4Channel { + + /** The number of active documents this should report in ping reponses */ + private final long activeDocuments; + + MockFSChannel(Backend backend) { + this(0, backend); + } + + MockFSChannel(long activeDocuments, Backend backend) { + super(backend, 0); + this.activeDocuments = activeDocuments; + } + + private BasicPacket lastReceived = null; + + private static QueryPacket lastQueryPacket = null; + + /** Initial value of docstamp */ + private static int docstamp = 1088490666; + + private static boolean emptyDocsums = false; + + @Override + public synchronized boolean sendPacket(BasicPacket packet) { + try { + if (packet instanceof Packet) + packet.encode(ByteBuffer.allocate(65536), 0); + } catch (BufferTooSmallException e) { + throw new RuntimeException("Too small buffer to encode packet in mock backend."); + } + + if (packet instanceof QueryPacket) + lastQueryPacket = (QueryPacket) packet; + + lastReceived = packet; + notifyMonitor(); + return true; + } + + /** Change docstamp to invalidate cache */ + public static void resetDocstamp() { + docstamp = 1088490666; + } + + /** Flip sending (in)valid docsums */ + public static void setEmptyDocsums(boolean d) { + emptyDocsums = d; + } + + /** Returns the last query packet received or null if none */ + public QueryPacket getLastQueryPacket() { + return lastQueryPacket; + } + + public BasicPacket getLastReceived() { + return lastReceived; + } + + public BasicPacket[] receivePackets(long timeout, int packetCount) { + List<BasicPacket> packets = new java.util.ArrayList<>(); + + if (lastReceived instanceof QueryPacket) { + lastQueryPacket = (QueryPacket) lastReceived; + QueryResultPacket result = QueryResultPacket.create(); + + result.setDocstamp(docstamp); + result.setChannel(0); + result.setTotalDocumentCount(2); + result.setOffset(lastQueryPacket.getOffset()); + + if (lastQueryPacket.getOffset() == 0 + && lastQueryPacket.getLastOffset() >= 1) { + result.addDocument( + new DocumentInfo(DocsumDefinitionTestCase.createGlobalId(123), + 2003, 234, 0)); + } + if (lastQueryPacket.getOffset() <= 1 + && lastQueryPacket.getLastOffset() >= 2) { + result.addDocument( + new DocumentInfo(DocsumDefinitionTestCase.createGlobalId(456), + 1855, 234, 1)); + } + packets.add(result); + } + else if (lastReceived instanceof GetDocSumsPacket) { + addDocsums(packets, lastQueryPacket); + } + else if (lastReceived instanceof PingPacket) { + packets.add(new PongPacket(activeDocuments)); + } + while (packetCount >= 0 && packets.size() > packetCount) { + packets.remove(packets.size() - 1); + } + + return packets.toArray(new BasicPacket[packets.size()]); + } + + /** Adds the number of docsums requested in queryPacket.getHits() */ + private void addDocsums(List<BasicPacket> packets, QueryPacket queryPacket) { + int numHits = queryPacket.getHits(); + + if (lastReceived instanceof GetDocSumsPacket) { + numHits = ((GetDocSumsPacket) lastReceived).getNumDocsums(); + } + for (int i = 0; i < numHits; i++) { + ByteBuffer buffer; + + if (emptyDocsums) { + buffer = createEmptyDocsumPacketData(); + } else { + int[] docids = { + 123, 456, 789, 789, 789, 789, 789, 789, 789, + 789, 789, 789 }; + + buffer = createDocsumPacketData(docids[i], DocsumDefinitionTestCase.makeDocsum()); + } + buffer.position(0); + packets.add(PacketDecoder.decode(buffer)); + } + packets.add(EolPacket.create()); + } + + private ByteBuffer createEmptyDocsumPacketData() { + ByteBuffer buffer = ByteBuffer.allocate(16); + + buffer.limit(buffer.capacity()); + buffer.position(0); + buffer.putInt(12); // length + buffer.putInt(205); // a code for docsumpacket + buffer.putInt(0); // channel + buffer.putInt(0); // dummy location + return buffer; + } + + private ByteBuffer createDocsumPacketData(int docid, byte[] docsumData) { + ByteBuffer buffer = ByteBuffer.allocate(docsumData.length + 4 + 8 + GlobalId.LENGTH); + + buffer.limit(buffer.capacity()); + buffer.position(0); + buffer.putInt(docsumData.length + 8 + GlobalId.LENGTH); + buffer.putInt(205); // Docsum packet code + buffer.putInt(0); + byte[] rawGid = DocsumDefinitionTestCase.createGlobalId(docid).getRawId(); + buffer.put(rawGid); + buffer.put(docsumData); + return buffer; + } + + public void close() {} +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java new file mode 100644 index 00000000000..c7425afd611 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java @@ -0,0 +1,21 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch.test.fs4mock; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.mplex.Backend; + +/** + * @author bratseth + */ +public class NonWorkingMockFSChannel extends MockFSChannel { + + public NonWorkingMockFSChannel(Backend backend) { + super(backend); + } + + @Override + public synchronized boolean sendPacket(BasicPacket bPacket) { + return false; + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 7ee62ae9978..42a22f6f86b 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -89,6 +89,11 @@ public class MockSearchCluster extends SearchCluster { } @Override + public ImmutableMultimap<String, Node> nodesByHost() { + return nodesByHost; + } + + @Override public Optional<Node> directDispatchTarget() { return Optional.empty(); } diff --git a/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java b/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java index 7bdf1916d85..6ed8a209cc5 100644 --- a/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java @@ -3,6 +3,7 @@ package com.yahoo.search.grouping.vespa; import com.yahoo.document.DocumentId; import com.yahoo.document.GlobalId; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.net.URI; import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; @@ -61,6 +62,19 @@ public class HitConverterTestCase { } @Test + public void requireThatHitTagIsCopiedFromGroupingListContext() { + QueryPacketData ctxTag = new QueryPacketData(); + GroupingListHit ctxHit = context(); + ctxHit.setQueryPacketData(ctxTag); + + HitConverter converter = new HitConverter(new MySearcher(), new Query()); + Hit hit = converter.toSearchHit("default", new FS4Hit(1, createGlobalId(2), 3).setContext(ctxHit)); + assertNotNull(hit); + assertTrue(hit instanceof FastHit); + assertSame(ctxTag, ((FastHit)hit).getQueryPacketData()); + } + + @Test public void requireThatSummaryClassIsSet() { Searcher searcher = new MySearcher(); HitConverter converter = new HitConverter(searcher, new Query()); |