From 3c6887a27a5acafbbcb8d2b42dbb0c02caf1308b Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 19 Sep 2019 07:50:08 +0200 Subject: Revert "Revert "Revert "Revert "Revert "Revert "Balder/no more fs4 dispatching from fastsearcher""""."" --- .../src/main/java/com/yahoo/fs4/BasicPacket.java | 315 --------------- .../com/yahoo/fs4/BufferTooSmallException.java | 17 - .../com/yahoo/fs4/ChannelTimeoutException.java | 23 -- .../src/main/java/com/yahoo/fs4/DocsumPacket.java | 33 +- .../src/main/java/com/yahoo/fs4/DocumentInfo.java | 74 ---- .../src/main/java/com/yahoo/fs4/EolPacket.java | 35 -- .../src/main/java/com/yahoo/fs4/ErrorPacket.java | 48 --- .../src/main/java/com/yahoo/fs4/FS4Properties.java | 60 --- .../main/java/com/yahoo/fs4/GetDocSumsPacket.java | 202 +-------- .../main/java/com/yahoo/fs4/HexByteIterator.java | 42 -- .../src/main/java/com/yahoo/fs4/Packet.java | 114 ------ .../src/main/java/com/yahoo/fs4/PacketDecoder.java | 200 --------- .../src/main/java/com/yahoo/fs4/PacketDumper.java | 135 ------- .../main/java/com/yahoo/fs4/PacketListener.java | 16 - .../yahoo/fs4/PacketNotificationsBroadcaster.java | 34 -- .../main/java/com/yahoo/fs4/PacketQueryTracer.java | 53 --- .../src/main/java/com/yahoo/fs4/PingPacket.java | 26 -- .../src/main/java/com/yahoo/fs4/PongPacket.java | 92 ----- .../src/main/java/com/yahoo/fs4/QueryPacket.java | 215 ---------- .../main/java/com/yahoo/fs4/QueryPacketData.java | 91 ----- .../main/java/com/yahoo/fs4/QueryResultPacket.java | 228 ----------- .../src/main/java/com/yahoo/fs4/mplex/Backend.java | 449 --------------------- .../java/com/yahoo/fs4/mplex/ConnectionPool.java | 90 ----- .../main/java/com/yahoo/fs4/mplex/FS4Channel.java | 255 ------------ .../java/com/yahoo/fs4/mplex/FS4Connection.java | 371 ----------------- .../yahoo/fs4/mplex/InvalidChannelException.java | 15 - .../java/com/yahoo/fs4/mplex/ListenerPool.java | 56 --- .../src/main/java/com/yahoo/prelude/Pong.java | 31 +- .../com/yahoo/prelude/cluster/ClusterMonitor.java | 161 -------- .../com/yahoo/prelude/cluster/ClusterSearcher.java | 125 +----- .../com/yahoo/prelude/fastsearch/DataField.java | 2 - .../com/yahoo/prelude/fastsearch/DocsumField.java | 1 - .../com/yahoo/prelude/fastsearch/DoubleField.java | 3 - .../yahoo/prelude/fastsearch/FS4FillInvoker.java | 184 --------- .../yahoo/prelude/fastsearch/FS4PingFactory.java | 29 -- .../yahoo/prelude/fastsearch/FS4ResourcePool.java | 38 +- .../yahoo/prelude/fastsearch/FS4SearchInvoker.java | 220 ---------- .../java/com/yahoo/prelude/fastsearch/FastHit.java | 25 -- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 125 +----- .../yahoo/prelude/fastsearch/GroupingListHit.java | 12 - .../com/yahoo/prelude/fastsearch/Int64Field.java | 3 - .../com/yahoo/prelude/fastsearch/IntegerField.java | 3 - .../yahoo/prelude/fastsearch/LongdataField.java | 3 - .../com/yahoo/prelude/fastsearch/ShortField.java | 3 - .../com/yahoo/prelude/fastsearch/StringField.java | 4 - .../prelude/fastsearch/VespaBackEndSearcher.java | 38 +- .../com/yahoo/prelude/fastsearch/XMLField.java | 7 +- .../java/com/yahoo/search/dispatch/Dispatcher.java | 22 +- .../com/yahoo/search/dispatch/InvokerResult.java | 3 - .../java/com/yahoo/search/dispatch/LeanHit.java | 10 +- .../search/dispatch/rpc/RpcInvokerFactory.java | 13 +- .../search/dispatch/searchcluster/Pinger.java | 40 -- .../dispatch/searchcluster/SearchCluster.java | 44 +- .../yahoo/search/grouping/vespa/HitConverter.java | 2 - .../streamingvisitors/VdsStreamingSearcher.java | 28 +- .../com/yahoo/fs4/PacketQueryTracerTestCase.java | 110 ----- .../java/com/yahoo/fs4/mplex/BackendTestCase.java | 208 ---------- .../yahoo/fs4/test/GetDocSumsPacketTestCase.java | 107 ----- .../yahoo/fs4/test/HexByteIteratorTestCase.java | 42 -- .../com/yahoo/fs4/test/PacketDecoderTestCase.java | 186 --------- .../java/com/yahoo/fs4/test/PacketTestCase.java | 229 ----------- .../com/yahoo/fs4/test/QueryResultTestCase.java | 113 ------ .../java/com/yahoo/fs4/test/QueryTestCase.java | 319 --------------- .../com/yahoo/fs4/test/RankFeaturesTestCase.java | 136 ------- .../prelude/cluster/ClusterSearcherTestCase.java | 67 ++- .../fastsearch/FS4SearchInvokerTestCase.java | 72 ---- .../fastsearch/test/DirectSearchTestCase.java | 137 ------- .../fastsearch/test/FastSearcherTestCase.java | 364 +---------------- .../fastsearch/test/FastSearcherTester.java | 127 ------ .../prelude/fastsearch/test/MockDispatcher.java | 19 +- .../fastsearch/test/fs4mock/DispatchThread.java | 101 ----- .../fastsearch/test/fs4mock/MockBackend.java | 53 --- .../fastsearch/test/fs4mock/MockFDispatch.java | 212 ---------- .../test/fs4mock/MockFS4ResourcePool.java | 63 --- .../fastsearch/test/fs4mock/MockFSChannel.java | 176 -------- .../test/fs4mock/NonWorkingMockFSChannel.java | 21 - .../yahoo/search/dispatch/MockSearchCluster.java | 5 - .../grouping/vespa/HitConverterTestCase.java | 14 - .../search/query/test/RankFeaturesTestCase.java | 140 +++++++ 79 files changed, 257 insertions(+), 7232 deletions(-) delete mode 100644 container-search/src/main/java/com/yahoo/fs4/BasicPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/EolPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/FS4Properties.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/Packet.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketDumper.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketListener.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PingPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PongPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java delete mode 100644 container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java delete mode 100644 container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java delete mode 100644 container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java delete mode 100644 container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/PacketDecoderTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/PacketTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/QueryResultTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/fs4/test/RankFeaturesTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java delete mode 100644 container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java create mode 100644 container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java (limited to 'container-search/src') diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java deleted file mode 100644 index f87721dc503..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java +++ /dev/null @@ -1,315 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.log.LogLevel; -import com.yahoo.prelude.fastsearch.TimeoutException; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.logging.Logger; - -/** - * Superclass of fs4 packets - * - * @author bratseth - */ -public abstract class BasicPacket { - - private final Compressor compressor = new Compressor(); - - private static Logger log = Logger.getLogger(QueryResultPacket.class.getName()); - private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); - public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. - - private byte[] encodedBody; - - /** The length of this packet in bytes or -1 if not known */ - protected int length = -1; - - /** - * A timestamp which can be set or inspected by clients of this class - * but which is never updated by the class itself. This is mostly - * a convenience for when you need to queue packets or retain them - * in some structure where their validity is limited by a timeout - * or similar. - */ - private long timeStamp = -1; - - private int compressionLimit = 0; - - private CompressionType compressionType; - - /** - * Sets the number of bytes the package must be before activating compression. - * A value of 0 means no compression. - * - * @param limit smallest package size that triggers compression. - */ - public void setCompressionLimit(int limit) { compressionLimit = limit; } - - public void setCompressionType(String type) { - compressionType = CompressionType.valueOf(type); - } - - /** - * Fills this package from a byte buffer positioned at the first byte of the package - * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket decode(ByteBuffer buffer) { - length = buffer.getInt()+4; // Streamed packet length is the length-4 - int code = buffer.getInt(); - - decodeAndDecompressBody(buffer, code, length - 2*4); - return this; - } - - protected void decodeAndDecompressBody(ByteBuffer buffer, int code, int packetLength) { - byte compressionType = (byte)((code & ~CODE_MASK) >> 24); - boolean isCompressed = compressionType != 0; - codeDecodedHook(code & CODE_MASK); - if (isCompressed) { - int uncompressedSize = buffer.getInt(); - int compressedSize = packetLength - 4; - int offset = 0; - byte[] compressedData; - if (buffer.hasArray()) { - compressedData = buffer.array(); - offset = buffer.arrayOffset() + buffer.position(); - buffer.position(buffer.position() + compressedSize); - } else { - compressedData = new byte[compressedSize]; - buffer.get(compressedData); - } - byte[] body = compressor.decompress(CompressionType.valueOf(compressionType), compressedData, offset, - uncompressedSize, Optional.of(compressedSize)); - ByteBuffer bodyBuffer = ByteBuffer.wrap(body); - length += uncompressedSize - (compressedSize + 4); - decodeBody(bodyBuffer); - } else { - decodeBody(buffer); - } - } - - /** - * Decodes the body of this package from a byte buffer - * positioned at the first byte of the package. - * - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public void decodeBody(ByteBuffer buffer) { - throw new UnsupportedOperationException("Decoding of " + this + " is not implemented"); - } - - /** - * Called when the packet code is decoded. - * This default implementation just throws an exception if the code - * is not the code of this packet. Packets which has several possible codes - * will use this method to store the code. - */ - protected void codeDecodedHook(int code) { - if (code != getCode()) - throw new RuntimeException("Can not decode " + code + " into " + this); - } - - /** - *

Encodes this package onto the given buffer at the current position. - * The position of the buffer after encoding is the byte following - * the last encoded byte.

- * - *

This method will ensure that everything is written provided - * sufficient capacity regardless of the buffer limit. - * When returning, the limit is at the end of the package (qual to the - * position).

- * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket encode(ByteBuffer buffer) throws BufferTooSmallException { - int oldLimit = buffer.limit(); - int startPosition = buffer.position(); - - buffer.limit(buffer.capacity()); - try { - buffer.putInt(4); // Real length written later, when we know it - buffer.putInt(getCode()); - - encodeAndCompressBody(buffer, startPosition); - } - catch (java.nio.BufferOverflowException e) { - // reset buffer to expected state - buffer.position(startPosition); - buffer.limit(oldLimit); - throw new BufferTooSmallException("Destination buffer too small while encoding packet"); - } - - return this; - } - - protected void encodeAndCompressBody(ByteBuffer buffer, int startPosition) { - int startOfBody = buffer.position(); - encodeBody(buffer); - setEncodedBody(buffer, startOfBody, buffer.position() - startOfBody); - length = buffer.position() - startPosition; - - if (compressionLimit != 0 && length-4 > compressionLimit) { - byte[] compressedBody; - compressionType = CompressionType.LZ4; - LZ4Factory factory = LZ4Factory.fastestInstance(); - LZ4Compressor compressor = factory.fastCompressor(); - compressedBody = compressor.compress(encodedBody); - - log.log(LogLevel.DEBUG, "Uncompressed size: " + encodedBody.length + ", Compressed size: " + compressedBody.length); - if (compressedBody.length + 4 < encodedBody.length) { - buffer.position(startPosition); - buffer.putInt(compressedBody.length + startOfBody - startPosition + 4 - 4); // +4 for compressed size - buffer.putInt(getCompressedCode(compressionType)); - buffer.position(startOfBody); - buffer.putInt(encodedBody.length); - buffer.put(compressedBody); - buffer.limit(buffer.position()); - return; - } - } - buffer.putInt(startPosition, length - 4); // Encoded length 4 less than actual length - buffer.limit(buffer.position()); - } - - private int getCompressedCode(CompressionType compression) { - int code = compression.getCode(); - return getCode() | (code << 24); - } - - /** - * Encodes the body of this package onto the given buffer at the current position. - * The position of the buffer after encoding is the byte following - * the last encoded byte. - * - * @throws UnsupportedOperationException if not implemented in the subclass - */ - protected void encodeBody(ByteBuffer buffer) { - throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); - } - - private void setEncodedBody(ByteBuffer b, int start, int length) { - encodedBody = new byte[length]; - b.position(start); - b.get(encodedBody); - } - - public boolean isEncoded() { - return encodedBody != null; - } - - /** - * Just a place holder to make the APIs simpler. - */ - public Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { - throw new UnsupportedOperationException("This class does not support a channel ID"); - } - - /** - * Allocate the needed buffers and encode the packet using the given - * channel ID (if pertinent). - * - * If this packet does not use a channel ID, the ID will be ignored. - */ - private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { - while (true) { - try { - if (hasChannelId()) { - encode(buffer, channelId); - } else { - encode(buffer); - } - buffer.flip(); - break; - } - catch (BufferTooSmallException e) { - buffer = ByteBuffer.allocate(buffer.capacity()*2); - } - } - return buffer; - } - - /** - * Return buffer containing the encoded form of this package and - * remove internal reference to it. - */ - public final ByteBuffer grantEncodingBuffer(int channelId) { - return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); - } - - public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { - return allocateAndEncode(channelId, buffer); - } - - /** Returns the code of this package */ - public abstract int getCode(); - - /** - * Returns the length of this body (including header (8 bytes) and body), - * or -1 if not known. - * Note that the streamed packet format length is 4 bytes less than this length, - * for unknown reasons. - * The length is always known when decodeBody is called. - */ - public int getLength() { - return length; - } - - /** - * Set the timestamp field of the packet. - * - * A timestamp which can be set or inspected by clients of this class - * but which is never updated by the class itself. This is mostly - * a convenience for when you need to queue packets or retain them - * in some structure where their validity is limited by a timeout - * or similar. - */ - public void setTimestamp (long timeStamp) { - this.timeStamp = timeStamp; - } - - /** - * Get the timestamp field of this packet. Note that this is - * not part of the FS4 protocol. @see #setTimestamp for - * more information - * - */ - public long getTimestamp () { - return timeStamp; - } - - public String toString() { - return "packet with code " + getCode(); - } - - /** Whether this is a packets which can encode a channel ID. */ - public boolean hasChannelId() { - return false; - } - - /** - * Throws an IOException if the packet is not of the expected type - */ - public void ensureInstanceOf(Class type, String name) throws IOException { - if ((type.isAssignableFrom(getClass()))) return; - - if (this instanceof ErrorPacket) { - ErrorPacket errorPacket = (ErrorPacket) this; - if (errorPacket.getErrorCode() == 8) - throw new TimeoutException("Query timed out in " + name); - else - throw new IOException("Received error from backend in " + name + ": " + this); - } else { - throw new IOException("Received " + this + " when expecting " + type); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java b/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java deleted file mode 100644 index 38f312e6c8d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- -// -// - -package com.yahoo.fs4; -/** - * Signal that the buffer used to hold a packet is too small - * - * @author Bjorn Borud - */ -@SuppressWarnings("serial") -public class BufferTooSmallException extends Exception { - public BufferTooSmallException (String message) { - super(message); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java b/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java deleted file mode 100644 index fe86e7273c0..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- -// - -package com.yahoo.fs4; - - -/** - * Signal that a timeout occurred in the Channel2 communiction - * - * @author Bjorn Borud - * - */ -@SuppressWarnings("serial") -public class ChannelTimeoutException extends Exception -{ - public ChannelTimeoutException (String msg) { - super(msg); - } - - public ChannelTimeoutException () { - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java index ad679e0c53c..3105b645cd0 100644 --- a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java @@ -1,10 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; -import com.yahoo.document.GlobalId; - -import java.nio.ByteBuffer; - /** * An "extended query result" packet. This is the query result * packets used today, they allow more flexible sets of parameters @@ -12,15 +8,10 @@ import java.nio.ByteBuffer; * * @author bratseth */ -public class DocsumPacket extends Packet { - - private GlobalId globalId = new GlobalId(new byte[GlobalId.LENGTH]); +public class DocsumPacket { private byte[] data; - private DocsumPacket() { - } - /** * Constructor used by streaming search */ @@ -28,31 +19,11 @@ public class DocsumPacket extends Packet { data = buffer.clone(); } - public static DocsumPacket create() { - return new DocsumPacket(); - } - - public int getCode() { return 205; } - - /** - * Fills this packet from a byte buffer positioned at the - * first byte of the packet - */ - public void decodeBody(ByteBuffer buffer) { - byte[] rawGid = new byte[GlobalId.LENGTH]; - buffer.get(rawGid); - globalId = new GlobalId(rawGid); - data=new byte[getLength()-12-GlobalId.LENGTH]; - buffer.get(data); - } - - public GlobalId getGlobalId() { return globalId; } public byte[] getData() { return data; } public String toString() { - return "docsum packet [globalId: " + globalId.toString() + - ", size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; + return "docsum packet size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; } } diff --git a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java deleted file mode 100644 index 8294ae5796d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.document.GlobalId; - -import java.nio.ByteBuffer; - -/** - * Meta attributes on documents (not the document summaries themselves). - * Used in query results and get docusum packages - * - * @author bratseth - */ -public class DocumentInfo implements Cloneable { - - private final byte [] globalId; - private final double metric; - private final int partId; - private final int distributionKey; - private final byte[] sortData; - - DocumentInfo(ByteBuffer buffer, QueryResultPacket owner, byte[] sortData) { - globalId = new byte[GlobalId.LENGTH]; - buffer.get(globalId); - metric = decodeMetric(buffer); - partId = owner.getMldFeature() ? buffer.getInt() : 0; - distributionKey = owner.getMldFeature() ? buffer.getInt() : 0; - this.sortData = sortData; - } - - public DocumentInfo(GlobalId globalId, int metric, int partId, int distributionKey) { - this.globalId = globalId.getRawId(); - this.metric = metric; - this.partId = partId; - this.distributionKey = distributionKey; - this.sortData = null; - } - - private double decodeMetric(ByteBuffer buffer) { - return buffer.getDouble(); - } - - public GlobalId getGlobalId() { return new GlobalId(globalId); } - public byte [] getRawGlobalId() { return globalId; } - - /** Raw rank score */ - public double getMetric() { return metric; } - - /** Partition this document resides on */ - public int getPartId() { return partId; } - - /** Unique key for the node this document resides on */ - public int getDistributionKey() { return distributionKey; } - - public byte[] getSortData() { - return sortData; - } - - public String toString() { - return "document info [globalId=" + new GlobalId(globalId).toString() + ", metric=" + metric + "]"; - } - - /** - * Implements the Cloneable interface - */ - public Object clone() { - try { - return super.clone(); - } - catch (CloneNotSupportedException e) { - throw new RuntimeException("Someone inserted a nonclonable superclass"); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/EolPacket.java b/container-search/src/main/java/com/yahoo/fs4/EolPacket.java deleted file mode 100644 index 1e907f67696..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/EolPacket.java +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * A EOL packet signaling end of transmission. - * This package has no body. - * - * @author bratseth - */ -public class EolPacket extends Packet { - - private EolPacket() { - } - - public static EolPacket create() { - return new EolPacket(); - } - - public int getCode() { return 200; } - - public void decodeBody(ByteBuffer buffer) { - // No body - } - - public void encodeBody(ByteBuffer buffer) { - // No body - } - - public String toString() { - return "EOL packet"; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java b/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java deleted file mode 100644 index f21663272d4..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.text.Utf8; - -/** - * - * An error packet signaling that an error occurred. - * - * @author Bjørn Borud - */ -public class ErrorPacket extends Packet { - private int errorCode; - private int errmsgLen; - private String message; - - private ErrorPacket() { - } - - public static ErrorPacket create() { - return new ErrorPacket(); - } - - public int getCode() { return 203; } - - public void decodeBody(ByteBuffer buffer) { - errorCode = buffer.getInt(); - errmsgLen = buffer.getInt(); - - byte[] tmp = new byte[errmsgLen]; - buffer.get(tmp); - - message = Utf8.toString(tmp); - } - - public int getErrorCode () { return errorCode; } - - public void encodeBody(ByteBuffer buffer) { - // No body - } - - public String toString() { - return (message + " (" + errorCode + ")"); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java b/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java deleted file mode 100644 index f5f1fca0801..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.text.Utf8; - -import java.nio.ByteBuffer; - -public class FS4Properties { - private String name; - - static public class Entry { - public final String key; - private final byte [] val; - public Entry(byte[] k, byte[] v) { - key = Utf8.toString(k); - val = v; - } - public final byte [] getValue() { return val; } - }; - - private Entry[] entries; - - void decode(ByteBuffer buffer) { - int nameLen = buffer.getInt(); - byte[] utf8name = new byte[nameLen]; - buffer.get(utf8name); - this.setName(Utf8.toString(utf8name)); - - int n = buffer.getInt(); - setEntries(new Entry[n]); - for (int j = 0; j < n; j++) { - int keyLen = buffer.getInt(); - byte[] key = new byte[keyLen]; - buffer.get(key); - - int valLen = buffer.getInt(); - byte[] value = new byte[valLen]; - buffer.get(value); - - getEntries()[j] = new Entry(key, value); - } - } - - public Entry[] getEntries() { - return entries; - } - - public void setEntries(Entry[] entries) { - this.entries = entries; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java index 7353d0730a4..6a808e17b5c 100644 --- a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java @@ -1,215 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; -import com.yahoo.document.GlobalId; -import com.yahoo.log.LogLevel; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.query.Item; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.result.Hit; -import com.yahoo.text.Utf8; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.logging.Logger; - /** *

A packet for requesting a list of document summaries. * This packet can be encoded only.

* * @author bratseth */ -public class GetDocSumsPacket extends Packet { +public class GetDocSumsPacket { /** Session id key. Yep, putting this here is ugly as hell */ public static final String sessionIdKey = "sessionId"; - private static final Logger log = Logger.getLogger(GetDocSumsPacket.class.getName()); - private final Result result; - private final Query query; - private final String summaryClass; - private QueryPacketData queryPacketData = null; - private int flags = 0; - - /** - * True if we should send the query with this docsum, false otherwise. - * Sending the query is necessary if we need to return summary features or generate a dynamic summary - */ - private final boolean sendQuery; - - private GetDocSumsPacket(Result result, String summaryClass, boolean sendQuery) { - this.result = result; - this.query = result.getQuery(); - this.summaryClass = summaryClass; - this.sendQuery = sendQuery; - } - - /** - * Creates a get docsums packet for a certain result - */ - public static GetDocSumsPacket create(Result result, String summaryClass, boolean sendQuery) { - return new GetDocSumsPacket(result, summaryClass, sendQuery); - } - - /** - * features bits, as given in searchlib/src/searchlib/common/packets.h - * definition of enum getdocsums_features - */ - public static final int GDF_MLD = 0x00000001; - public static final int GDF_QUERYSTACK = 0x00000004; - public static final int GDF_RANKP_QFLAGS = 0x00000010; - public static final int GDF_LOCATION = 0x00000080; - public static final int GDF_RESCLASSNAME = 0x00000800; - public static final int GDF_PROPERTIES = 0x00001000; - public static final int GDF_FLAGS = 0x00002000; - - public void encodeBody(ByteBuffer buffer) { - setFieldsFromHits(); - - boolean useQueryCache = query.getRanking().getQueryCache(); - // If feature cache is used we need to include the sessionId as key. - if (useQueryCache) { // TODO: Move this decision (and the key) to ranking - query.getRanking().getProperties().put(sessionIdKey, query.getSessionId().toString()); - } - - // set the default features - long features = GDF_MLD; - if (sendQuery) - features |= GDF_QUERYSTACK; - features |= GDF_RANKP_QFLAGS; - - // do we want a specific result class? - if (summaryClass != null) - features |= GDF_RESCLASSNAME; - if (query.getRanking().getLocation() != null) - features |= GDF_LOCATION; - if (query.hasEncodableProperties()) - features |= GDF_PROPERTIES; - if (flags != 0) { - features |= GDF_FLAGS; - } - buffer.putInt((int)features); - buffer.putInt(0); //Unused, was docstamp - long timeLeft = query.getTimeLeft(); - buffer.putInt(Math.max(1, (int)timeLeft)); // Safety to avoid sending down 0 or negative number - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "Timeout from query(" + query.getTimeout() + "), sent to backend: " + timeLeft); - } - - if (queryPacketData != null) - encodeQueryFromPacketData(buffer, useQueryCache); - else - encodeQuery(buffer); - - if (flags != 0) - buffer.putInt(flags); - encodeDocIds(buffer); - } - - private void setFieldsFromHits() { - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit h = i.next(); - if (h instanceof FastHit) { - FastHit hit = (FastHit)h; - QueryPacketData tag = hit.getQueryPacketData(); - if (tag != null) { - this.queryPacketData = tag; - break; - } - } - } - } - - private void encodeQueryFromPacketData(ByteBuffer buffer, boolean reencodePropertyMaps) { - queryPacketData.encodeRankProfile(buffer); - queryPacketData.encodeQueryFlags(buffer); - - encodeSummaryClass(buffer); - - if (reencodePropertyMaps || ! sendQuery) // re-encode when we're not sending query, to avoid resending all encoded properties - query.encodeAsProperties(buffer, sendQuery); - else - queryPacketData.encodePropertyMaps(buffer); - - if (sendQuery) - queryPacketData.encodeQueryStack(buffer); - queryPacketData.encodeLocation(buffer); - } - - private void encodeSummaryClass(ByteBuffer buffer) { - if (summaryClass != null) { - byte[] tmp = Utf8.toBytes(summaryClass); - buffer.putInt(tmp.length); - buffer.put(tmp); - } - } - - private void encodeQuery(ByteBuffer buffer) { - Item.putString(query.getRanking().getProfile(), buffer); - buffer.putInt(QueryPacket.getQueryFlags(query)); - - encodeSummaryClass(buffer); - - query.encodeAsProperties(buffer, sendQuery); - - if (sendQuery) { - // The stack must be resubmitted to generate dynamic docsums - int itemCountPosition = buffer.position(); - buffer.putInt(0); - int dumpLengthPosition = buffer.position(); - buffer.putInt(0); - int count = query.encode(buffer); - buffer.putInt(itemCountPosition, count); - buffer.putInt(dumpLengthPosition, buffer.position() - dumpLengthPosition - 4); - } - - if (query.getRanking().getLocation() != null) { - int locationLengthPosition = buffer.position(); - buffer.putInt(0); - int locationLength = query.getRanking().getLocation().encode(buffer); - buffer.putInt(locationLengthPosition, locationLength); - } - } - - private void encodeDocIds(ByteBuffer buffer) { - byte[] emptyGid = new byte[GlobalId.LENGTH]; - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit hit = i.next(); - if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { - FastHit fastHit = (FastHit)hit; - buffer.put(fastHit.getGlobalId() != null ? fastHit.getRawGlobalId() : emptyGid); - buffer.putInt(fastHit.getPartId()); - buffer.putInt(0); //Unused, was docstamp - } - } - } - - public int getCode() { - return 219; - } - - public String toString() { - return "Get docsums x packet fetching " + getNumDocsums() + " docsums and packet length of " + getLength() + " bytes."; - } - - public int getNumDocsums() { - int num = 0; - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit hit = i.next(); - if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { - num++; - } - } - return num; - } - - /** - * Return the document summary class we want the fdispatch - * to use when replying to us - */ - @SuppressWarnings("UnusedDeclaration") - public String getSummaryClass() { - return summaryClass; - } } diff --git a/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java b/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java deleted file mode 100644 index 78ba857c475..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -/** - * Provides sequential access to each byte of a buffer - * as a hexadecimal string of length 2. - * - * @author Tony Vaagenes - */ -public final class HexByteIterator implements Iterator { - private final ByteBuffer buffer; - - private String hexByte(byte b) { - final int unsignedValue = ((int)b) & 0xff; - String s = Integer.toHexString(unsignedValue).toUpperCase(); - - boolean singleChar = unsignedValue < 0x10; - if (singleChar) - return '0' + s; - else - return s; - } - - public boolean hasNext() { - return buffer.hasRemaining(); - } - - public String next() { - return hexByte(buffer.get()); - } - - public void remove() { - throw new UnsupportedOperationException(); - } - - public HexByteIterator(ByteBuffer buffer) { - this.buffer = buffer.slice(); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java deleted file mode 100644 index 1e9deede59d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/Packet.java +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.logging.Logger; - -/** - * Superclass of fs4 packets containing channel/query ID - * - * @author bratseth - */ -public abstract class Packet extends BasicPacket { - - private static Logger log = Logger.getLogger(Packet.class.getName()); - - /** - * The channel at which this packet will be sent or was received, - * or -1 when this is not known - */ - protected int channel = -1; - - /** - * Fills this package from a byte buffer positioned at the first - * byte of the package - * - * @return this Packet (as a BasicPacket) for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket decode(ByteBuffer buffer) { - int originalPos = buffer.position(); - length = buffer.getInt()+4; // Streamed packet length is the length-4 - int packetLength = length; - try { - int code = buffer.getInt(); - channel = buffer.getInt(); - - decodeAndDecompressBody(buffer, code, length - 3*4); - } - finally { - int targetPosition = (originalPos + packetLength); - if (buffer.position() != targetPosition) { - log.warning("Position in buffer is " + buffer.position() + " should be " + targetPosition); - buffer.position(targetPosition); - } - } - - return this; - } - - /** - *

Encodes this package onto the given buffer at the current - * position. The position of the buffer after encoding is the - * byte following the last encoded byte.

- * - *

This method will ensure that everything is written provided - * sufficient capacity regardless of the buffer limit. - * When returning, the limit is at the end of the package (qual to the - * position).

- * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public final Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { - this.channel = channel; - int oldLimit = buffer.limit(); - int startPosition = buffer.position(); - - buffer.limit(buffer.capacity()); - try { - buffer.putInt(8); // Real length written later, when we know it - buffer.putInt(getCode()); - buffer.putInt(channel); - - encodeAndCompressBody(buffer, startPosition); - } - catch (java.nio.BufferOverflowException e) { - // reset buffer to expected state - buffer.position(startPosition); - buffer.limit(oldLimit); - throw new BufferTooSmallException("Destination buffer too small while encoding packet"); - } - return this; - } - - /** - * Get the channel id of the packet. In the FS4 transport protocol, - * there is the concept of a channel. This must not be confused - * with all the other channels we have floating around this code (aargh!). - *

- * The channel can be thought of as a way to pair up requests and - * responses in the FS4 protocol: A response always belongs to - * to a channel and it is the clients responsibility to not re-use - * channel ids within the same connection. - *

- * Summary: This "channel" means "session id" - * - * @return FS4 channel id - * - */ - public int getChannel() { return channel; } - - public void setChannel(int channel) { this.channel=channel; } - - - /** Informs that this packets needs a channel ID. */ - public boolean hasChannelId() { - return true; - } - - public String toString() { - return "packet with code " + getCode() + ", channelId=" + getChannel(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java b/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java deleted file mode 100644 index 3e673717d02..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * Returns the correct package for a package byte stream - * - * @author bratseth - * @author Bj\u00f8rn Borud - */ -public class PacketDecoder { - - /** Represents a packet and the data used to construct it */ - public static class DecodedPacket { - public BasicPacket packet; - public ByteBuffer consumedBytes; - - DecodedPacket(BasicPacket packet, ByteBuffer consumedBytes) { - this.packet = packet; - this.consumedBytes = consumedBytes; - } - } - - private PacketDecoder() {} - - /** - * Returns the package starting at the current position in the buffer - * - * @throws IllegalArgumentException if an unknown package code is - * encountered - * @throws java.nio.BufferUnderflowException if the buffer contains too little - * data to decode the pcode. - */ - public static BasicPacket decode(ByteBuffer buffer) { - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - - switch (packetCode) { - case 200: - return EolPacket.create().decode(buffer); - - case 203: - return ErrorPacket.create().decode(buffer); - - case 205: - return DocsumPacket.create().decode(buffer); - - case 217: - return QueryResultPacket.create().decode(buffer); - - case 221: - return PongPacket.create().decode(buffer); - - default: - throw new IllegalArgumentException("No support for packet " + packetCode); - } - } - - /** Gives the packet along with the bytes consumed to construct it. */ - public static DecodedPacket decodePacket(ByteBuffer buffer) { - ByteBuffer dataUsed = buffer.slice(); - int start = buffer.position(); - - BasicPacket packet = decode(buffer); - dataUsed.limit(buffer.position() - start); - return new DecodedPacket(packet, dataUsed); - } - - /** Sniff channel ID for query result packets */ - public static int sniffChannel(ByteBuffer buffer) { - int remaining = buffer.remaining(); - if (remaining < 12) { - return 0; - } - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - switch (packetCode) { - case 202: - case 208: - case 214: - case 217: - return buffer.getInt(buffer.position()+8); - default: - return 0; - } - } - - /** - * Test whether the buffer contains (the start of) a pong packet. - * - * Returns false if there is not enough data to determine the - * answer. - */ - public static boolean isPongPacket(ByteBuffer buffer) { - - int remaining = buffer.remaining(); - if (remaining < 8) - return false; - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - if (packetCode == 221) - return true; - else - return false; - } - - /** - * Note that it assumes that the position of the ByteBuffer is at the - * start of a packet and that we have enough data to actually read - * an integer out of the buffer. - * - * @return Return the length of the fs4 packet. Returns -1 if length - * could not be determined because we had too little - * data in the buffer. - * - */ - public static int packetLength(ByteBuffer buffer) - { - if (buffer.remaining() < 4) { - return -1; - } - return (buffer.getInt(buffer.position()) + 4); - } - - /** - * Takes a buffer possibly containing a packet. - * - *

- * If we return a packet when we return: - *

    - *
  • the buffer is positioned at the beginning of the next - * packet when we return. - *
  • limit is unchanged - *
- * - *

- * If we return null there were no more packets - * there to decode and the following is true of the buffer - *

    - *
  • the buffer is compacted, ie. partial packet is - * moved to the start, or if no more data is available - * the buffer is cleared. - *
  • the position is set to the next byte after the valid - * data so the buffer is ready for reading. - *
- * - * If there are no packets to be returned the buffer is compacted - * (ie. content is moved to the start and read-pointer is positioned - * - * @return Returns the next available packet from the buffer or - * null if there are no more complete - * packets in the buffer at this time. - */ - public static DecodedPacket extractPacket(ByteBuffer buffer) - throws BufferTooSmallException - { - int remaining = buffer.remaining(); - - // if we are empty we can reset the buffer - if (remaining == 0) { - buffer.clear(); - return null; - } - - // if we can't figure out the size because we have less than - // 4 bytes we prepare the buffer for more data reading. - if (remaining < 4) { - buffer.compact(); - return null; - } - - int plen = packetLength(buffer); - - // -1 means that we do not have enough data to read the packet - // size yet - if (plen == -1) { - buffer.compact(); - return null; - } - - // if we haven't read an entire packet yet, we compact and return - // (same as above but separate code for clarity). note that this - // also occurs when there is no physical room for the packet, so - // clients of this API need to be aware of this and check for it - if (remaining < plen) { - - // if the read buffer is too small we must take drastic action - if (buffer.capacity() < plen) { - throw new BufferTooSmallException("Buffer too small to hold packet"); - } - - buffer.compact(); - return null; - } - - return PacketDecoder.decodePacket(buffer); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java b/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java deleted file mode 100644 index 6b2c792837a..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; - -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.log.LogLevel; -import com.yahoo.search.Query; - -/** - * Responsible for dumping query & query result packets - * - * @author Tony Vaagenes - */ -public class PacketDumper implements PacketListener { - /** High level representation of packet types (e.g. query, result, ...) */ - public static enum PacketType { - query(QueryPacket.class), - result(QueryResultPacket.class); - - Class implementationType; - - PacketType(Class implementationType) { - this.implementationType = implementationType; - } - } - - private static Logger log = Logger.getLogger(PacketDumper.class.getSimpleName()); - - private volatile boolean disabled = true; - private final File logDirectory; - private final Map, DataOutputStream> dumpFiles = - new HashMap<>(); - private final String fileNamePattern; - - private void handlePacket(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm, String direction) { - //minimize overhead when disabled: - if (disabled) - return; - - try { - DataOutputStream stream = getOutputStream(packet); - if (stream != null) { - synchronized (stream) { - stream.writeChars(packet.getTimestamp() + " " + direction + " packet on channel " + channel.getChannelId()); - String indent = " "; - Query query = channel.getQuery(); - if (query != null) - stream.writeChars('\n' + indent + "Query: '" + query.getModel().getQueryString()); - hexDump(indent, stream, serializedForm); - - stream.writeChar('\n'); - stream.flush(); - } - } - } catch (IOException e) { - log.log(LogLevel.WARNING, "Could not log packet.", e); - } - } - - private void hexDump(String indent, DataOutputStream stream, ByteBuffer serializedForm) throws IOException { - HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); - - long count = 0; - final int maxNumCharacters = 80; - while (hexByteIterator.hasNext()) { - if (count++ % maxNumCharacters == 0) - stream.writeChar('\n'); - stream.writeChars(hexByteIterator.next()); - } - } - - private synchronized DataOutputStream getOutputStream(BasicPacket packet) { - return dumpFiles.get(packet.getClass()); - } - - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - handlePacket(channel, packet, serializedForm, "Sent"); - } - - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - handlePacket(channel, packet, serializedForm, "Received"); - } - - public synchronized void dumpPackets(PacketType packetType, boolean on) throws IOException { - OutputStream stream = dumpFiles.get(packetType.implementationType); - if (!on && stream != null) - closeFile(stream, packetType); - else if (on && stream == null) - openFile(packetType); - } - - private void openFile(PacketType packetType) throws FileNotFoundException { - if (!logDirectory.exists() || - logDirectory.mkdirs()) { - - throw new RuntimeException("PacketDumper: Could not create log directory " + logDirectory); - } - String fileName = fileNamePattern.replace("%s", packetType.toString()); - boolean append = true; - DataOutputStream outputStream = - new DataOutputStream( - new BufferedOutputStream( - new FileOutputStream(new File(logDirectory, fileName), append))); - dumpFiles.put(packetType.implementationType, outputStream); - - disabled = dumpFiles.isEmpty(); - } - - private void closeFile(OutputStream stream, PacketType packetType) throws IOException { - try { - synchronized (stream) { - stream.close(); - } - } finally { - dumpFiles.remove(packetType.implementationType); - disabled = dumpFiles.isEmpty(); - } - } - - public PacketDumper(File logDirectory, String fileNamePattern) { - this.logDirectory = logDirectory; - this.fileNamePattern = fileNamePattern; - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketListener.java b/container-search/src/main/java/com/yahoo/fs4/PacketListener.java deleted file mode 100644 index 113da03b420..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketListener.java +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; - -/** - * Interface for recieving notifications of packets sent or recieved. - * - * @author Tony Vaagenes - */ -public interface PacketListener { - void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); - void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java b/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java deleted file mode 100644 index 1be79031d1b..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; - -/** - * Broadcasts packet notifications to a list of listeners. - * - * @author Tony Vaagenes - */ -public class PacketNotificationsBroadcaster implements PacketListener { - - private final PacketListener[] listeners; - - public PacketNotificationsBroadcaster(PacketListener... listeners) { - this.listeners = listeners; - } - - @Override - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - if (channel == null) return; - for (PacketListener listener : listeners) - listener.packetSent(channel, packet, serializedForm); - } - - @Override - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - if (channel == null) return; - for (PacketListener listener : listeners) - listener.packetReceived(channel, packet, serializedForm); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java b/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java deleted file mode 100644 index b577ef31ad8..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.search.Query; - -/** - * Adds packets to the query context - * - * @author Tony Vaagenes - */ -public class PacketQueryTracer implements PacketListener { - - private final static int traceLevel = 10; - - private void addTrace(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - Query query = channel.getQuery(); - if (query != null && query.getTraceLevel() >= traceLevel) { - StringBuilder traceString = new StringBuilder(); - traceString.append(packet.getClass().getSimpleName()).append(": "); - hexDump(serializedForm, traceString); - - final boolean includeQuery = true; - query.trace(traceString.toString(), includeQuery, traceLevel); - } - } - - private void hexDump(ByteBuffer serializedForm, StringBuilder traceString) { - HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); - - long count = 0; - final int maxNumCharacters = 80; - while (hexByteIterator.hasNext()) { - if (++count % maxNumCharacters == 0) - traceString.append('\n'); - traceString.append(hexByteIterator.next()); - } - } - - @Override - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - addTrace(channel, packet, serializedForm); - } - - @Override - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - addTrace(channel, packet, serializedForm); - } - -} - diff --git a/container-search/src/main/java/com/yahoo/fs4/PingPacket.java b/container-search/src/main/java/com/yahoo/fs4/PingPacket.java deleted file mode 100644 index 7c1f8df1101..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PingPacket.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * A ping packet for FS4. This packet has no data. It maps to - * PCODE_MONITORQUERY the C++ implementation of the protocol. - * - * @author Steinar Knutsen - */ -public class PingPacket extends BasicPacket { - - public int getCode() { return 220; } - - public void encodeBody(ByteBuffer buffer) { - buffer.putInt(MQF_QFLAGS); - buffer.putInt(MQFLAG_REPORT_ACTIVEDOCS); - } - - /** feature bits, taken from searchlib/common/transport.h */ - static final int MQF_QFLAGS = 0x00000002; - - /** flag bits, taken from searchlib/common/transport.h */ - static final int MQFLAG_REPORT_ACTIVEDOCS = 0x00000020; -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java deleted file mode 100644 index 37aaf7067a9..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.Optional; - -/** - * A pong packet for FS4. It maps to PCODE_MONITORRESULTX - * in the C++ implementation of the protocol. - * - * @author Steinar Knutsen - */ -public class PongPacket extends BasicPacket { - - private int dispatchTimestamp; - - @SuppressWarnings("unused") - private int totalNodes; // configured nodes - private Optional activeNodes = Optional.empty(); // number of nodes that are up - @SuppressWarnings("unused") - private int totalPartitions; // configured partitions - private Optional activePartitions = Optional.empty(); // number of partitions that are up - - private Optional activeDocuments = Optional.empty(); // how many documents are searchable (sum) - - public PongPacket() { - } - - /** For testing */ - public PongPacket(long activeDocuments) { - this.activeDocuments = Optional.of(activeDocuments); - } - - private int code; - protected void codeDecodedHook(int code) { this.code = code; } - public int getCode() { return code; } - - public void decodeBody(ByteBuffer buffer) { - int features = buffer.getInt(); - buffer.getInt(); // Unused lowPartitionId - dispatchTimestamp = buffer.getInt(); - if ((features & MRF_MLD) != 0) { - totalNodes = buffer.getInt(); - activeNodes = Optional.of(buffer.getInt()); - totalPartitions = buffer.getInt(); - activePartitions = Optional.of(buffer.getInt()); - } - if ((features & MRF_RFLAGS) != 0) { - buffer.getInt(); // ignore rflags (historical field) - } - if ((features & MRF_ACTIVEDOCS) != 0) { - activeDocuments = Optional.of(Long.valueOf(buffer.getLong())); - } - } - - public static PongPacket create() { - return new PongPacket(); - } - - /** - * Return current docstamp for backend to make cache invalidation - * possible. - * */ - public int getDocstamp() { - return dispatchTimestamp; - } - - /** - * retrieve the reported number of active (searchable) documents - * in the monitored backend. - **/ - public Optional getActiveDocuments() { - return activeDocuments; - } - - public Optional getActiveNodes() { - return activeNodes; - } - - public Optional getActivePartitions() { - return activePartitions; - } - - /** feature bits, taken from searchlib/common/transport.h */ - static final int MRF_MLD = 0x00000001; - static final int MRF_RFLAGS = 0x00000008; - static final int MRF_ACTIVEDOCS = 0x00000010; - - /** packet codes, taken from searchlib/common/transport.h */ - static final int PCODE_MONITORRESULTX = 221; - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java deleted file mode 100644 index d1d8b9a0f77..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.compress.IntegerCompressor; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.prelude.query.Item; -import com.yahoo.search.Query; -import com.yahoo.search.dispatch.Dispatcher; -import com.yahoo.search.grouping.vespa.GroupingExecutor; -import com.yahoo.search.query.Ranking; -import com.yahoo.searchlib.aggregation.Grouping; -import com.yahoo.text.Utf8String; -import com.yahoo.vespa.objects.BufferSerializer; - -import java.nio.ByteBuffer; -import java.util.List; - -/** - * An "extended query" packet. This is the query packets used today, - * they allow more flexible sets of parameters to be shipped with queries. - * This packet can be encoded only. - * - * @author bratseth - * @author Bjørn Borud - */ -public class QueryPacket extends Packet { - - private final String serverId; - private final Query query; - - private QueryPacketData queryPacketData; - - private QueryPacket(String serverId, Query query) { - this.serverId = serverId; - this.query = query; - } - - /** Returns the query from which this packet is populated */ - public Query getQuery() { - return query; - } - - /** - * Creates and returns a query packet - * - * @param query the query to convert to a packet - */ - public static QueryPacket create(String serverId, Query query) { - return new QueryPacket(serverId, query); - } - - - /** Returns the first offset requested */ - public int getOffset() { - return query.getOffset(); - } - - /** - * Returns the last offset requested (inclusively), that is - * getOffset() + getHits() - */ - public int getLastOffset() { - return getOffset() + getHits(); - } - - /** Returns the number of hits requested */ - public int getHits() { - return query.getHits(); - } - - public void encodeBody(ByteBuffer buffer) { - queryPacketData = new QueryPacketData(); - - boolean sendSessionKey = query.getGroupingSessionCache() || query.getRanking().getQueryCache(); - int featureFlag = getFeatureInt(sendSessionKey); - buffer.putInt(featureFlag); - - IntegerCompressor.putCompressedPositiveNumber(getOffset(), buffer); - IntegerCompressor.putCompressedPositiveNumber(getHits(), buffer); - buffer.putInt(Math.max(1, (int)query.getTimeLeft())); // Safety to avoid sending down 0 or negative number - buffer.putInt(getFlagInt()); - int startOfFieldToSave = buffer.position(); - Item.putString(query.getRanking().getProfile(), buffer); - queryPacketData.setRankProfile(buffer, startOfFieldToSave); - - if ( (featureFlag & QF_PROPERTIES) != 0) { - startOfFieldToSave = buffer.position(); - query.encodeAsProperties(buffer, true); - queryPacketData.setPropertyMaps(buffer, startOfFieldToSave); - } - - // Language not needed when sending query stacks - - if ((featureFlag & QF_SORTSPEC) != 0) { - int sortSpecLengthPosition=buffer.position(); - buffer.putInt(0); - int sortSpecLength = query.getRanking().getSorting().encode(buffer); - buffer.putInt(sortSpecLengthPosition, sortSpecLength); - } - - if ( (featureFlag & QF_GROUPSPEC) != 0) { - List groupingList = GroupingExecutor.getGroupingList(query); - BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); - gbuf.putInt(null, groupingList.size()); - for (Grouping g: groupingList){ - g.serialize(gbuf); - } - gbuf.getBuf().flip(); - byte[] blob = new byte [gbuf.getBuf().limit()]; - gbuf.getBuf().get(blob); - buffer.putInt(blob.length); - buffer.put(blob); - } - - if (sendSessionKey) { - Utf8String key = query.getSessionId(serverId).asUtf8String(); - buffer.putInt(key.getByteLength()); - buffer.put(key.getBytes()); - } - - if ((featureFlag & QF_LOCATION) != 0) { - startOfFieldToSave = buffer.position(); - int locationLengthPosition=buffer.position(); - buffer.putInt(0); - int locationLength= query.getRanking().getLocation().encode(buffer); - buffer.putInt(locationLengthPosition, locationLength); - queryPacketData.setLocation(buffer, startOfFieldToSave); - } - - startOfFieldToSave = buffer.position(); - int stackItemPosition=buffer.position(); - buffer.putInt(0); // Number of stack items written below - int stackLengthPosition = buffer.position(); - buffer.putInt(0); - int stackPosition = buffer.position(); - int stackItemCount=query.encode(buffer); - int stackLength = buffer.position() - stackPosition; - buffer.putInt(stackItemPosition,stackItemCount); - buffer.putInt(stackLengthPosition, stackLength); - queryPacketData.setQueryStack(buffer, startOfFieldToSave); - } - - /** - * feature bits, taken from searchlib/common/transport.h - */ - private static final int QF_PARSEDQUERY = 0x00000002; - private static final int QF_RANKP = 0x00000004; - private static final int QF_SORTSPEC = 0x00000080; - private static final int QF_LOCATION = 0x00000800; - private static final int QF_PROPERTIES = 0x00100000; - private static final int QF_GROUPSPEC = 0x00400000; - private static final int QF_SESSIONID = 0x00800000; - - private int getFeatureInt(boolean sendSessionId) { - int features = QF_PARSEDQUERY | QF_RANKP; // this bitmask means "parsed query" in query packet. - // And rank properties. Both are always present - - features |= (query.getRanking().getSorting() != null) ? QF_SORTSPEC : 0; - features |= (query.getRanking().getLocation() != null) ? QF_LOCATION : 0; - features |= (query.hasEncodableProperties()) ? QF_PROPERTIES : 0; - features |= GroupingExecutor.hasGroupingList(query) ? QF_GROUPSPEC : 0; - features |= (sendSessionId) ? QF_SESSIONID : 0; - - return features; - } - - /** - * query flag bits, taken from searchlib/common/transport.h - */ - private static final int QFLAG_EXTENDED_COVERAGE = 0x00000001; - private static final int QFLAG_COVERAGE_NODES = 0x00000002; - private static final int QFLAG_ESTIMATE = 0x00000080; - private static final int QFLAG_DROP_SORTDATA = 0x00004000; - private static final int QFLAG_NO_RESULTCACHE = 0x00010000; - private static final int QFLAG_DUMP_FEATURES = 0x00040000; - - private int getFlagInt() { - int flags = getQueryFlags(query); - queryPacketData.setQueryFlags(flags); - flags |= query.properties().getBoolean(Dispatcher.dispatchInternal, false) ? 0 : QFLAG_DROP_SORTDATA; - return flags; - } - - - public int getCode() { - return 218; - } - - public String toString() { - return "Query x packet [query: " + query + "]"; - } - - static int getQueryFlags(Query query) { - int flags = QFLAG_EXTENDED_COVERAGE | QFLAG_COVERAGE_NODES; - - flags |= query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE) ? QFLAG_ESTIMATE : 0; - flags |= query.getNoCache() ? QFLAG_NO_RESULTCACHE : 0; - flags |= query.properties().getBoolean(Ranking.RANKFEATURES, false) ? QFLAG_DUMP_FEATURES : 0; - return flags; - } - - /** - * Fetch a binary wrapper containing data from encoding process for use in - * creating a summary request. - * - * @return wrapper object suitable for creating a summary fetch packet - * @throws IllegalStateException if no wrapper has been generated - */ - public QueryPacketData getQueryPacketData() { - if (queryPacketData == null) - throw new IllegalStateException("Trying to fetch a hit tag without having encoded the packet first."); - return queryPacketData; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java deleted file mode 100644 index 673b9cc0c47..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * Class for storing data which has to be constant between query and summary - * fetch for a Vespa hit. Used to avoid to tagging Vespa summary hits with - * the entire query as an immutable. - * - * @author Steinar Knutsen - */ -public final class QueryPacketData { - - private byte[] rankProfile = null; - private int queryFlags = 0; - private byte[] queryStack = null; - private byte[] location = null; - private byte[] propertyMaps = null; - - /** - * Given src.position() bigger than startOfField, allocate a fresh byte - * array, and copy the data from startOfField to src.position() into it. - * - * @param src - * the ByteBuffer to copy from - * @param startOfField - * the position of the buffer at which the field starts - * @return a copy of the data between startOfField and the buffer position - * before invokation - * @throws IllegalArgumentException - * if startOfField is somewhere after src.position() - */ - private byte[] copyField(final ByteBuffer src, final int startOfField) { - if (startOfField > src.position()) { - throw new IllegalArgumentException("startOfField after src.position()"); - } - final byte[] dst = new byte[src.position() - startOfField]; - - src.position(startOfField); - src.get(dst); - return dst; - } - - ByteBuffer encodeRankProfile(final ByteBuffer buffer) { - return buffer.put(rankProfile); - } - - void setRankProfile(final ByteBuffer src, final int startOfField) { - rankProfile = copyField(src, startOfField); - } - - ByteBuffer encodeQueryFlags(final ByteBuffer buffer) { - return buffer.putInt(queryFlags); - } - - void setQueryFlags(final int queryFlags) { - this.queryFlags = queryFlags; - } - - ByteBuffer encodeQueryStack(final ByteBuffer buffer) { - return buffer.put(queryStack); - } - - void setQueryStack(final ByteBuffer src, final int startOfField) { - queryStack = copyField(src, startOfField); - } - - ByteBuffer encodePropertyMaps(final ByteBuffer buffer) { - if (propertyMaps != null) { - buffer.put(propertyMaps); - } - return buffer; - } - - void setPropertyMaps(final ByteBuffer src, final int startOfField) { - propertyMaps = copyField(src, startOfField); - } - - void setLocation(final ByteBuffer src, final int startOfField) { - this.location = copyField(src, startOfField); - } - - ByteBuffer encodeLocation(final ByteBuffer buffer) { - if (location != null) { - buffer.put(location); - } - return buffer; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java deleted file mode 100644 index 6a27beefb5e..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.nio.IntBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * A query result packet (code 217). This packet can be decoded only. - * - * @author bratseth - */ -public class QueryResultPacket extends Packet { - - /** The code of this type of package */ - private static final int code = 217; - - /** Whether mld data is included in this result */ - private boolean mldFeature = false; - - /** Whether sort data is included in this result */ - private boolean sortData = false; - - /** Whether coverage information is included in this result */ - private boolean coverageNodes = false; - private long coverageDocs = 0; - private long activeDocs = 0; - private long soonActiveDocs = 0; - private int degradedReason = 0; - private short nodesQueried = 0; - private short nodesReplied = 0; - - /** Whether the result contains grouping results **/ - private boolean groupDataFeature = false; - - /** Whether the result contains properties **/ - private boolean propsFeature = false; - - private long totalDocumentCount; - - private Number maxRank; - - private int docstamp; - - private byte[] groupData = null; - - private List documents=new ArrayList<>(10); - - public FS4Properties[] propsArray; - - private int offset; - - private QueryResultPacket() { } - - public static QueryResultPacket create() { - return new QueryResultPacket(); - } - - public void setDocstamp(int docstamp){ this.docstamp=docstamp; } - - public int getDocstamp() { return docstamp; } - - /** Returns whether this has the mysterious mld feature */ - public boolean getMldFeature() { return mldFeature; } - - public boolean getCoverageFeature() { return true; } - - public long getCoverageDocs() { return coverageDocs; } - - public long getActiveDocs() { return activeDocs; } - - public long getSoonActiveDocs() { return soonActiveDocs; } - - public int getDegradedReason() { return degradedReason; } - - public boolean getCoverageFull() { - return coverageDocs == activeDocs; - } - - - /** @return offset returned by backend */ - public int getOffset() { return offset; } - - /** Only for testing. */ - public void setOffset(int offset) { - this.offset = offset; - } - - @Override - public void decodeBody(ByteBuffer buffer) { - IntBuffer ints = buffer.asIntBuffer(); - decodeFeatures(ints); - offset = ints.get(); - int documentCount = ints.get(); - buffer.position(buffer.position() + ints.position() * 4); - totalDocumentCount = buffer.getLong(); - maxRank = decodeMaxRank(buffer); - ints = buffer.asIntBuffer(); - docstamp = ints.get(); - buffer.position(buffer.position() + ints.position() * 4); - // do not access "ints" below here! - - if (coverageNodes) { - nodesQueried = buffer.getShort(); - nodesReplied = buffer.getShort(); - } - - byte[][] documentSortData = null; - if (sortData && documentCount > 0) { - documentSortData = decodeSortData(buffer, documentCount); - } - - if (groupDataFeature) { - int len = buffer.getInt(); - groupData = new byte[len]; - buffer.get(groupData); - } - - coverageDocs = buffer.getLong(); - activeDocs = buffer.getLong(); - soonActiveDocs = buffer.getLong(); - degradedReason = buffer.getInt(); - - decodeDocuments(buffer, documentCount, documentSortData); - if (propsFeature) { - int numMaps = buffer.getInt(); - propsArray = new FS4Properties[numMaps]; - for (int i = 0; i < numMaps; i++) { - propsArray[i] = new FS4Properties(); - propsArray[i].decode(buffer); - } - } - } - - private byte[][] decodeSortData(ByteBuffer buffer, int documentCount) { - int[] indexes = new int[documentCount]; - indexes[0] = 0; - for (int i = 1; i < documentCount; i++) { - indexes[i] = buffer.getInt(); - } - - int sortDataLengthInBytes = buffer.getInt(); - byte[][] ret = new byte[indexes.length][]; - - for (int i = 0; i < indexes.length; i++) { - int end = i + 1 >= indexes.length ? sortDataLengthInBytes : indexes[i + 1]; - int len = end - indexes[i]; - ret[i] = new byte[len]; - buffer.get(ret[i], 0, len); - } - return ret; - } - - private Number decodeMaxRank(ByteBuffer buffer) { - return Double.valueOf(buffer.getDouble()); - } - - /** - * feature bits - */ - private static final int QRF_MLD = 0x00000001; - private static final int QRF_COVERAGE_NODES = 0x00000002; - private static final int QRF_SORTDATA = 0x00000010; - private static final int QRF_UNUSED_1 = 0x00000020; - private static final int QRF_UNUSED_2 = 0x00000040; - private static final int QRF_GROUPDATA = 0x00000200; - private static final int QRF_PROPERTIES = 0x00000400; - - /** Decodes the feature int of this package data into boolean feature fields */ - private void decodeFeatures(IntBuffer buffer) { - int features = buffer.get(); - mldFeature = (QRF_MLD & features) != 0; - sortData = (QRF_SORTDATA & features) != 0; - coverageNodes = (QRF_COVERAGE_NODES & features) != 0; - groupDataFeature = (QRF_GROUPDATA & features) != 0; - propsFeature = (QRF_PROPERTIES & features) != 0; - } - - private void decodeDocuments(ByteBuffer buffer, int documentCount, byte[][] documentSortData) { - for (int i = 0; i < documentCount; i++) { - byte[] sort = documentSortData == null ? null : documentSortData[i]; - documents.add(new DocumentInfo(buffer, this, sort)); - } - } - - public int getCode() { return code; } - - protected void codeDecodedHook(int code) { - if ( code != QueryResultPacket.code) - throw new RuntimeException("Programming error, packet " + getCode() + "Not expected."); - } - - public int getDocumentCount() { return documents.size(); } - - public String toString() { - return "Query result x packet [" + getDocumentCount() + " documents]"; - } - - /** Returns the opaque grouping results **/ - public byte[] getGroupData() { return groupData; } - - - /** Returns the total number of documents avalable for this query */ - public long getTotalDocumentCount() { return totalDocumentCount; } - - /** Only for testing. */ - public void setTotalDocumentCount(long totalDocumentCount) { - this.totalDocumentCount = totalDocumentCount; - } - - /** Returns a read-only list containing the DocumentInfo objects of this result */ - public List getDocuments() { - return Collections.unmodifiableList(documents); - } - - public void addDocument(DocumentInfo document) { - documents.add(document); - } - - // TODO: Handle new maxRank intelligently - public int getMaxRank() { return maxRank.intValue(); } - - public short getNodesQueried() { return nodesQueried; } - public short getNodesReplied() { return nodesReplied; } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java deleted file mode 100644 index 12f8e9e387d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ /dev/null @@ -1,449 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.Packet; -import com.yahoo.fs4.PacketDumper; -import com.yahoo.fs4.PacketListener; -import com.yahoo.fs4.PacketNotificationsBroadcaster; -import com.yahoo.fs4.PacketQueryTracer; -import com.yahoo.io.Connection; -import com.yahoo.io.ConnectionFactory; -import com.yahoo.io.Listener; -import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.yolean.Exceptions; -import com.yahoo.yolean.concurrent.ConcurrentResourcePool; -import com.yahoo.yolean.concurrent.ResourceFactory; -import com.yahoo.yolean.concurrent.ResourcePool; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * @author Bjorn Borud - */ -public class Backend implements ConnectionFactory { - - private static int DEFAULT_BUFFER_SIZE = 0x8000; - - public static final class BackendStatistics { - - public final int activeConnections; - public final int passiveConnections; - - public BackendStatistics(int activeConnections, int passiveConnections) { - this.activeConnections = activeConnections; - this.passiveConnections = passiveConnections; - } - - @Override - public String toString() { - return activeConnections + "/" + totalConnections(); - } - - public int totalConnections() { - return activeConnections + passiveConnections; - } - } - - private static final Logger log = Logger.getLogger(Backend.class.getName()); - - private final ListenerPool listeners; - private final InetSocketAddress address; - private final String host; - private final int port; - private final Map activeChannels = new HashMap<>(); - private int channelId = 0; - private boolean shutdownInitiated = false; - - /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */ - private boolean areInSocketNotConnectableState = false; - - private final LinkedList pingChannels = new LinkedList<>(); - private final PacketListener packetListener; - private final ConnectionPool connectionPool; - private final PacketDumper packetDumper; - private final AtomicInteger connectionCount = new AtomicInteger(0); - private final ConcurrentResourcePool byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { - @Override - public ByteBuffer create() { - return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - } - }); - - /** - * For unit testing. do not use - */ - protected Backend() { - listeners = null; - host = null; - port = 0; - packetListener = null; - packetDumper = null; - address = null; - connectionPool = new ConnectionPool(); - } - - public Backend(String host, - int port, - String serverDiscriminator, - ListenerPool listenerPool, - ConnectionPool connectionPool) { - String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump"; - packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")), - fileNamePattern); - packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer()); - this.listeners = listenerPool; - this.host = host; - this.port = port; - address = new InetSocketAddress(host, port); - this.connectionPool = connectionPool; - } - - private void logWarning(String attemptDescription, Exception e) { - log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); - } - - private void logInfo(String attemptDescription, Exception e) { - log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); - } - - // ============================================================ - // ==== connection pool stuff - // ============================================================ - - /** - * Fetch a connection from the connection pool. If the pool - * is empty we create a connection. - */ - private FS4Connection getConnection() throws IOException { - FS4Connection connection = connectionPool.getConnection(); - if (connection == null) { - // if pool was empty create one: - connection = createConnection(); - } - return connection; - } - - ConcurrentResourcePool getBufferPool() { - return byteBufferResourcePool; - } - - /** - * Return a connection to the connection pool. If the - * connection is not valid anymore we drop it, ie. do not - * put it into the pool. - */ - public void returnConnection(FS4Connection connection) { - connectionPool.releaseConnection(connection); - } - - /** - * Create a new connection to the target for this backend. - */ - private FS4Connection createConnection() throws IOException { - SocketChannel socket = SocketChannel.open(); - try { - connectSocket(socket); - } catch (Exception e) { - // was warning, see VESPA-1922 - if ( ! areInSocketNotConnectableState) { - logInfo("connecting to", e); - } - areInSocketNotConnectableState = true; - socket.close(); - return null; - } - areInSocketNotConnectableState = false; - int listenerId = connectionCount.getAndIncrement()%listeners.size(); - Listener listener = listeners.get(listenerId); - FS4Connection connection = new FS4Connection(socket, listener, this, packetListener); - listener.registerConnection(connection); - - log.fine("Created new connection to " + host + ":" + port); - connectionPool.createdConnection(); - return connection; - } - - private void connectSocket(SocketChannel socket) throws IOException { - socket.configureBlocking(false); - - boolean connected = socket.connect(address); - - // wait for connection - if (!connected) { - long timeBarrier = System.currentTimeMillis() + 20L; - while (true) { - try { - Thread.sleep(5L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e); - } - // don't care whether it's spurious wakeup - connected = socket.finishConnect(); - if (connected || System.currentTimeMillis() > timeBarrier) { - break; - } - } - } - - // did we get a connection? - if ( !connected) { - throw new IllegalArgumentException("Could not create connection to dispatcher on " - + address.getHostName() + ":" + address.getPort()); - } - socket.socket(). - setTcpNoDelay(true); - } - - - //============================================================ - //==== channel management - //============================================================ - - /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */ - public FS4Channel openChannel() { - int cachedChannelId; - synchronized (this) { - if (channelId >= ((1 << 31) - 2)) { - channelId = 0; - } - cachedChannelId = channelId; - channelId += 2; - } - Integer id = cachedChannelId; - FS4Channel chan = new FS4Channel(this, id); - synchronized (activeChannels) { - activeChannels.put(id, chan); - } - return chan; - } - - public FS4Channel openPingChannel() { - FS4Channel chan = FS4Channel.createPingChannel(this); - synchronized (pingChannels) { - pingChannels.add(chan); - } - return chan; - } - - /** - * Get the remote address for this Backend. This method - * has package access only, because it is really only of - * importance to FS4Channel for writing slightly more sensible - * log messages. - * @return Returns the address (host, port) for this Backend. - */ - InetSocketAddress getAddress() { - return address; - } - - /** - * Get an active channel by id. - * - * @param id the (fs4) channel id - * @return returns the (fs4) channel associated with this id - * or null if the channel is not in the - * set of active channels. - */ - public FS4Channel getChannel(Integer id) { - synchronized (activeChannels) { - return activeChannels.get(id); - } - } - - /** - * Return the first channel in the queue waiting for pings or - * null if none. - */ - public FS4Channel getPingChannel() { - synchronized (pingChannels) { - return (pingChannels.isEmpty()) ? null : pingChannels.getFirst(); - } - } - - /** - * Get an active channel by id. This is a wrapper for the method - * that takes the id as an Integer. - * - * @param id The (fs4) channel id - * @return Returns the (fs4) channel associated with this id - * or null if the channel is not in the - * set of active channels. - */ - public FS4Channel getChannel(int id) { - return getChannel(Integer.valueOf(id)); - } - - /** - * Remove a channel. We do not want this method to be called - * directly by the client -- removal of channels should be done - * by calling the close() method of the channel. - * - * @param id The (fs4) channel id - * @return Removes and returns the (fs4) channel associated - * with this id or null if the channel is - * not in the set of active channels. - */ - protected FS4Channel removeChannel(Integer id) { - synchronized (activeChannels) { - return activeChannels.remove(id); - } - } - - /** - * Remove a ping channel. We do not want this method to be called - * directly by the client -- removal of channels should be done - * by calling the close() method of the channel. - * - * @return Removes and returns the (fs4) channel first in - * the queue of ping channels or null - * if there are no active ping channels. - */ - protected FS4Channel removePingChannel() { - synchronized (pingChannels) { - if (pingChannels.isEmpty()) - return null; - return pingChannels.removeFirst(); - } - } - //============================================================ - //==== packet sending and reception - //============================================================ - - protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException { - if (shutdownInitiated) { - log.fine("Tried to send packet after shutdown initiated. Ignored."); - return false; - } - - FS4Connection connection = null; - try { - connection = getConnection(); - if (connection == null) { - return false; - } - connection.sendPacket(packet, channelId); - } - finally { - if (connection != null) { - returnConnection(connection); - } - } - - return true; - } - - /** - * When a connection receives a packet, it uses this method to - * dispatch the packet to the right FS4Channel. If the corresponding - * FS4Channel does not exist the packet is dropped and a message is - * logged saying so. - */ - protected void receivePacket(BasicPacket packet) { - FS4Channel fs4; - if (packet.hasChannelId()) - fs4 = getChannel(((Packet)packet).getChannel()); - else - fs4 = getPingChannel(); - - // channel does not exist - if (fs4 == null) { - return; - } - try { - fs4.addPacket(packet); - } - catch (InterruptedException e) { - log.info("Interrupted during packet adding. Packet = " + packet.toString()); - Thread.currentThread().interrupt(); - } - catch (InvalidChannelException e) { - log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString() - + " Backend probably sent data pertaining an old request," - + " system may be overloaded."); - } - } - - /** - * Attempt to establish a connection without sending messages and then - * return it to the pool. The assumption is that if the probing is - * successful, the connection will be used soon after. There should be - * minimal overhead since the connection is cached. - */ - public boolean probeConnection() { - if (shutdownInitiated) { - return false; - } - - FS4Connection connection = null; - try { - connection = getConnection(); - } catch (IOException ignored) { - // connection is null - } finally { - if (connection != null) { - returnConnection(connection); - } - } - - return connection != null; - } - - /** - * This method should be used to ensure graceful shutdown of the backend. - */ - public void shutdown() { - log.fine("shutting down"); - if (shutdownInitiated) { - throw new IllegalStateException("Shutdown already in progress"); - } - shutdownInitiated = true; - } - - public void close() { - for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) { - try { - c.close(); - } catch (IOException e) { - logWarning("closing", e); - } - } - } - - /** - * Connection factory used by the Listener class. - */ - public Connection newConnection(SocketChannel channel, Listener listener) { - return new FS4Connection(channel, listener, this, packetListener); - } - - public String toString () { - return("Backend/" + host + ":" + port); - } - - public BackendStatistics getStatistics() { - synchronized (connectionPool) { //ensure consistent values - return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections()); - } - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java deleted file mode 100644 index e84adfbef2c..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; -import java.util.Iterator; -import java.util.Timer; -import java.util.TimerTask; - -import com.yahoo.log.LogLevel; -/** - * Pool of FS4 connections. - * - * @author Tony Vaagenes - */ -public class ConnectionPool { - - private final static int CLEANINGPERIOD = 1000; // Execute every second - private final Queue connections = new ConcurrentLinkedQueue<>(); - private final AtomicInteger activeConnections = new AtomicInteger(0); - private final AtomicInteger passiveConnections = new AtomicInteger(0); - private static final Logger log = Logger.getLogger(ConnectionPool.class.getName()); - - class PoolCleanerTask extends TimerTask { - private final ConnectionPool connectionPool; - public PoolCleanerTask(ConnectionPool connectionPool) { - this.connectionPool = connectionPool; - } - - public void run() { - try { - connectionPool.dropInvalidConnections(); - } catch (Exception e) { - log.log(LogLevel.WARNING, - "Caught exception in connection pool cleaner, ignoring.", - e); - } - } - } - - public ConnectionPool() { - } - - public ConnectionPool(Timer timer) { - timer.schedule(new PoolCleanerTask(this), CLEANINGPERIOD, CLEANINGPERIOD); - } - - private void dropInvalidConnections() { - for (Iterator i = connections.iterator(); i.hasNext();) { - FS4Connection connection = i.next(); - if (!connection.isValid()) { - i.remove(); - } - } - } - - private FS4Connection registerAsActiveIfNonZero(FS4Connection connection) { - activeConnections.incrementAndGet(); - passiveConnections.decrementAndGet(); - return connection; - } - - public FS4Connection getConnection() { - return registerAsActiveIfNonZero(connections.poll()); - } - - void releaseConnection(FS4Connection connection) { - assert(connection != null); - activeConnections.decrementAndGet(); - if (connection.isValid()) { - passiveConnections.incrementAndGet(); - connections.add(connection); - } - } - - void createdConnection() { - activeConnections.incrementAndGet(); - } - - int activeConnections() { - return activeConnections.get(); - } - - //unused connections in the pool - int passiveConnections() { - return passiveConnections.get(); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java deleted file mode 100644 index adfc63d02f7..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java +++ /dev/null @@ -1,255 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.concurrent.SystemTimer; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.Packet; -import com.yahoo.search.Query; -import com.yahoo.search.dispatch.ResponseMonitor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * This class is used to represent a "channel" in the FS4 protocol. - * A channel represents a session between a client and the fdispatch. - * Internally this class has a response queue used by the backend - * for queueing up FS4 packets that belong to this channel (or - * session, which might be a more appropriate name for it). - * Outbound packets are handed off to the FS4Connection. - * - * @author Bjorn Borud - */ -public class FS4Channel { - - private static Logger log = Logger.getLogger(FS4Channel.class.getName()); - - private Integer channelId; - private Backend backend; - volatile private BlockingQueue responseQueue; - private Query query; - private boolean isPingChannel = false; - private ResponseMonitor monitor; - - /** for unit testing. do not use */ - protected FS4Channel () { - } - - protected FS4Channel(Backend backend, Integer channelId) { - this.channelId = channelId; - this.backend = backend; - this.responseQueue = new LinkedBlockingQueue<>(); - } - - static public FS4Channel createPingChannel(Backend backend) { - FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0)); - pingChannel.isPingChannel = true; - return pingChannel; - } - - /** Set the query currently associated with this channel */ - public void setQuery(Query query) { - this.query = query; - } - - /** Get the query currently associated with this channel */ - public Query getQuery() { - return query; - } - - /** Returns the (fs4) channel id */ - public Integer getChannelId () { - return channelId; - } - - /** - * Closes the channel - */ - public void close () { - BlockingQueue q = responseQueue; - responseQueue = null; - query = null; - if (isPingChannel) { - backend.removePingChannel(); - } else { - backend.removeChannel(channelId); - } - if (q != null) { - q.clear(); - } - } - - /** - * Legacy interface. - */ - public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { - ensureValid(); - return backend.sendPacket(packet, channelId); - } - - /** - * Receives the given number of packets and returns them, OR - *
    - *
  • Returns a smaller number of packets if an error or eol packet is received - *
  • Throws a ChannelTimeoutException if timeout occurs before all packets - * are received. Packets received with the wrong channel id are ignored. - *
- * - * @param timeout the number of ms to attempt to get packets before throwing an exception - * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error - */ - public BasicPacket[] receivePackets(long timeout, int packetCount) - throws InvalidChannelException, ChannelTimeoutException { - ensureValid(); - - List packets = new ArrayList<>(12); - long startTime = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeout; - - try { - while (timeLeft >= 0) { - BasicPacket p = nextPacket(timeLeft); - if (p == null) throw new ChannelTimeoutException("Timed out"); - - if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) { - log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId()); - continue; - } - - packets.add(p); - if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) { - BasicPacket[] packetArray = new BasicPacket[packets.size()]; - packets.toArray(packetArray); - return packetArray; - } - - // doing this last might save us one system call for the last - // packet. - timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime); - } - } - catch (InvalidChannelException e) { - // nop. if we get this we want to return the default - // zero length packet array indicating that we have no - // valid response - log.info("FS4Channel was invalid. timeLeft=" - + timeLeft + ", timeout=" + timeout); - } - catch (InterruptedException e) { - log.info("FS4Channel was interrupted. timeLeft=" - + timeLeft + ", timeout=" + timeout); - Thread.currentThread().interrupt(); - } - - // default return, we only hit this if we timed out and - // did not get the end of the packet stream - throw new ChannelTimeoutException(); - } - - private static boolean hasEnoughPackets(int packetCount,List packets) { - if (packetCount<0) return false; - return packets.size()>=packetCount; - } - - /** - * Returns true if we will definitely receive more packets on this stream - * - * Shouldn't that be "_not_ receive more packets"? - */ - private static boolean isLastPacket (BasicPacket packet) { - if (packet instanceof com.yahoo.fs4.ErrorPacket) return true; - if (packet instanceof com.yahoo.fs4.EolPacket) return true; - if (packet instanceof com.yahoo.fs4.PongPacket) return true; - return false; - } - - /** - * Return the next available packet from the response queue. If there - * are no packets available we wait a maximum of timeout - * milliseconds before returning a null - * - * @param timeout Number of milliseconds to wait for a packet - * to become available. - * - * @return Returns the next available BasicPacket or - * null if we timed out. - */ - public BasicPacket nextPacket(long timeout) - throws InterruptedException, InvalidChannelException - { - return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS); - } - - /** - * Add incoming packet to the response queue. This is to be used - * by the listener for placing incoming packets in the response - * queue. - * - * @param packet BasicPacket to be placed in the response queue. - * - */ - protected void addPacket (BasicPacket packet) - throws InterruptedException, InvalidChannelException - { - ensureValidQ().put(packet); - notifyMonitor(); - } - - /** - * A valid FS4Channel is one that has not yet been closed. - * - * @return Returns true if the FS4Channel is valid. - */ - public boolean isValid () { - return responseQueue != null; - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private void ensureValid () throws InvalidChannelException { - if (isValid()) { - return; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private BlockingQueue ensureValidQ () throws InvalidChannelException { - BlockingQueue q = responseQueue; - if (q != null) { - return q; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - public String toString() { - return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); - } - - public void setResponseMonitor(ResponseMonitor monitor) { - this.monitor = monitor; - } - - protected void notifyMonitor() { - if(monitor != null) { - monitor.responseAvailable(this); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java deleted file mode 100644 index 7dcbefde9fa..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.fs4.mplex; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.BufferTooSmallException; -import com.yahoo.fs4.PacketDecoder; -import com.yahoo.fs4.PacketListener; -import com.yahoo.io.Connection; -import com.yahoo.io.Listener; -import com.yahoo.log.LogLevel; - -/** - * - * This class is used to represent a connection to an fdispatch - * - * @author Bjorn Borud - */ -public class FS4Connection implements Connection -{ - private static Logger log = Logger.getLogger(FS4Connection.class.getName()); - private Backend backend; - private Listener listener; - private SocketChannel channel; - - private boolean shouldWrite = false; - - private static int idCounter = 1; - private int idNumber; - - // outbound data - private ByteBuffer writeBuffer; - private LinkedList writeBufferList = new LinkedList<>(); - - // inbound data - private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024); - private ByteBuffer readBuffer = fixedReadBuffer; - - private volatile boolean valid = true; - - private final PacketListener packetListener; - - - /** - * Create an FS4 Connection. - */ - public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) { - this.backend = backend; - this.listener = listener; - this.channel = channel; - this.idNumber = idCounter++; - this.packetListener = packetListener; - - log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress()); - } - - - /** - * Packet sending interface. - */ - public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { - ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); - ByteBuffer viewForPacketListener = buffer.slice(); - synchronized (this) { - if (!(valid && channel.isOpen())) { - throw new IllegalStateException("Connection is not valid. " + - "Address = " + backend.getAddress() + - ", valid = " + valid + - ", isOpen = " + channel.isOpen()); - } - - if (writeBuffer == null) { - writeBuffer = buffer; - } else { - writeBufferList.addLast(buffer); - enableWrite(); - } - write(); - } - - if (packetListener != null) - packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener); - } - - - /** - * The write event handler. This can be called both from the client - * thread and from the IO thread, so it needs to be synchronized. It - * assumes that IO is nonblocking, and will attempt to keep writing - * data until the system won't accept more data. - * - */ - public synchronized void write () throws IOException { - if (! channel.isOpen()) { - throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress()); - } - - try { - int bytesWritten = 0; - boolean isFinished = false; - do { - // if writeBuffer is not set we need to fetch the next buffer - if (writeBuffer == null) { - - // if the list is empty, signal the selector we do not need - // to do any writing for a while yet and bail - if (writeBufferList.isEmpty()) { - disableWrite(); - isFinished = true; - break; - } - writeBuffer = writeBufferList.removeFirst(); - } - - // invariants: we have a writeBuffer - bytesWritten = channel.write(writeBuffer); - - // buffer drained so we forget it and see what happens when we - // go around. if indeed we go around - if (!writeBuffer.hasRemaining()) { - writeBuffer.clear(); - backend.getBufferPool().free(writeBuffer); - writeBuffer = null; - } - } while (bytesWritten > 0); - if (!isFinished) { - enableWrite(); - } - } catch (IOException e) { - log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() + - ". Closing channel", e); - try { - close(); - } catch (IOException ignored) {} - - throw e; - } - } - - - private void disableWrite() { - if (shouldWrite) { - listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false); - shouldWrite = false; - } - } - - - private void enableWrite() { - if (!shouldWrite) { - listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true); - shouldWrite = true; - } - } - - - - public void read () throws IOException { - if (! channel.isOpen()) { - throw new IOException("Channel not open in read(), address=" + backend.getAddress()); - } - - int bytesRead = 0; - - do { - try { - if (readBuffer == fixedReadBuffer) { - bytesRead = channel.read(readBuffer); - } else { - fixedReadBuffer.clear(); - if (readBuffer.remaining() < fixedReadBuffer.capacity()) { - fixedReadBuffer.limit(readBuffer.remaining()); - } - bytesRead = channel.read(fixedReadBuffer); - fixedReadBuffer.flip(); - readBuffer.put(fixedReadBuffer); - fixedReadBuffer.clear(); - } - } - catch (IOException e) { - // this is the "normal" way that connection closes. - log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+ - e.getClass().getName()+" / ", e); - bytesRead = -1; - } - - // end of file - if (bytesRead == -1) { - log.log(LogLevel.DEBUG, "Dispatch closed connection" - + " (id="+idNumber+", address=" + backend.getAddress() + ")"); - try { - close(); - } catch (Exception e) { - log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e); - } - } - - // no more read - if (bytesRead == 0) { - // buffer too small? - if (! readBuffer.hasRemaining()) { - log.fine("Buffer possibly too small, extending"); - readBuffer.flip(); - extendReadBuffer(readBuffer.capacity() * 2); - } - } - - } while (bytesRead > 0); - - readBuffer.flip(); - - // hand off packet extraction - extractPackets(readBuffer); - } - - private void extractPackets(ByteBuffer readBuffer) { - for (;;) { - PacketDecoder.DecodedPacket packet = null; - try { - FS4Channel receiver = null; - int queryId = PacketDecoder.sniffChannel(readBuffer); - if (queryId == 0) { - if (PacketDecoder.isPongPacket(readBuffer)) - receiver = backend.getPingChannel(); - } - else { - receiver = backend.getChannel(Integer.valueOf(queryId)); - } - packet = PacketDecoder.extractPacket(readBuffer); - - if (packet != null) - packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes); - } - catch (BufferTooSmallException e) { - log.fine("Unable to decode, extending readBuffer"); - extendReadBuffer(PacketDecoder.packetLength(readBuffer)); - return; - } - - // break out of loop if we did not get a packet out of the - // buffer so we can select and read some more - if (packet == null) { - - // if the buffer has been cleared, we can do a reset - // of the readBuffer - if ((readBuffer.position() == 0) - && (readBuffer.limit() == readBuffer.capacity())) - { - resetReadBuffer(); - } - break; - } - - backend.receivePacket(packet.packet); - } - } - - /** - * This is called when we close the connection to do any - * pending cleanup work. Closing a connection marks it as - * not valid. - */ - public void close () throws IOException { - valid = false; - channel.close(); - log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress()); - } - - /** - * Upon asynchronous connect completion this method is called by - * the Listener. - */ - public void connect() throws IOException { - throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". " - + "asynchronous connect in NIO is flawed!"); - } - - /** - * Since we are performing an asynchronous connect we are initially - * only interested in the OP_CONNECT event. - */ - public int selectOps () { - return SelectionKey.OP_READ; - } - - /** - * Return the underlying SocketChannel used by this connection. - */ - public SocketChannel socketChannel() { - return channel; - } - - - public String toString () { - return FS4Connection.class.getName() + "/" + channel; - } - - - //============================================================ - //==== readbuffer management - //============================================================ - - - /** - * Extend the readBuffer. Make a new buffer of the requested size - * copy the contents of the readBuffer into it and assign reference - * to readBuffer instance variable. - * - *

- * The readBuffer needs to be in "readable" (flipped) state before - * this is called and it will be in the "writeable" state when it - * returns. - */ - private void extendReadBuffer (int size) { - // we specifically check this because packetLength() can return -1 - // and someone might alter the code so that we do in fact get -1 - // ...which never happens as the code is now - // - if (size == -1) { - throw new RuntimeException("Invalid buffer size requested: -1"); - } - - // if we get a size that is smaller than the current - // readBuffer capacity we just double it. not sure how wise this - // might be. - // - if (size < readBuffer.capacity()) { - size = readBuffer.capacity() * 2; - } - - ByteBuffer tmp = ByteBuffer.allocate(size); - tmp.put(readBuffer); - log.fine("Extended readBuffer to " + size + " bytes" - + "from " + readBuffer.capacity() + " bytes"); - readBuffer = tmp; - } - - /** - * Clear the readBuffer, and if temporarily allocated bigger - * buffer is in use: ditch it and reset the reference to the - * fixed readBuffer. - */ - private void resetReadBuffer () { - fixedReadBuffer.clear(); - if (readBuffer == fixedReadBuffer) { - return; - } - log.fine("Resetting readbuffer"); - readBuffer = fixedReadBuffer; - } - - /** - * This method is used to determine whether the connection is still - * viable or not. All connections are initially valid, but they - * become invalid if we close the connection or something bad happens - * and the connection needs to be ditched. - */ - public boolean isValid() { - return valid; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java deleted file mode 100644 index 6176d069645..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- - -package com.yahoo.fs4.mplex; - -/** - * @author Bj\u00f8rn Borud - */ -@SuppressWarnings("serial") -public class InvalidChannelException extends Exception -{ - public InvalidChannelException (String message) { - super(message); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java deleted file mode 100644 index d76d270dcb7..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.io.FatalErrorHandler; -import com.yahoo.io.Listener; - -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Pool of com.yahoo.io.Listener instances for shared use by Vespa backend - * searchers. - * - * @author baldersheim - * @since 5.3.0 - */ -public final class ListenerPool { - private final static Logger logger = Logger.getLogger(ListenerPool.class.getName()); - private final List listeners; - - public ListenerPool(String name, int numListeners) { - listeners = new ArrayList<>(numListeners); - FatalErrorHandler fatalErrorHandler = new FatalErrorHandler(); - for (int i = 0; i < numListeners; i++) { - Listener listener = new Listener(name + "-" + i); - listener.setFatalErrorHandler(fatalErrorHandler); - listener.start(); - listeners.add(listener); - } - } - - public Listener get(int index) { - return listeners.get(index); - } - - public int size() { - return listeners.size(); - } - - public void close() { - for (Listener listener : listeners) { - listener.interrupt(); - } - try { - for (Listener listener : listeners) { - listener.join(); - } - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Got interrupted", e); - Thread.currentThread().interrupt(); - } - } - -} diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java index a6bc3e7975d..a60fba9a4f7 100644 --- a/container-search/src/main/java/com/yahoo/prelude/Pong.java +++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude; -import com.yahoo.fs4.PongPacket; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.statistics.ElapsedTime; @@ -19,28 +18,19 @@ public class Pong { private String pingInfo=""; private final List errors = new ArrayList<>(1); - private final Optional pongPacket; private ElapsedTime elapsed = new ElapsedTime(); private final Optional activeDocuments; public Pong() { - this.pongPacket = Optional.empty(); this.activeDocuments = Optional.empty(); } public Pong(ErrorMessage error) { errors.add(error); - this.pongPacket = Optional.empty(); - this.activeDocuments = Optional.empty(); - } - - public Pong(PongPacket pongPacket) { - this.pongPacket = Optional.of(pongPacket); this.activeDocuments = Optional.empty(); } public Pong(long activeDocuments) { - this.pongPacket = Optional.empty(); this.activeDocuments = Optional.of(activeDocuments); } @@ -52,21 +42,14 @@ public class Pong { return errors.get(i); } - public int getErrorSize() { - return errors.size(); - } - /** Returns the number of active documents in the backend responding in this Pong, if available */ public Optional activeDocuments() { - if (activeDocuments.isPresent()) return activeDocuments; - if ( ! pongPacket.isPresent()) return Optional.empty(); - return pongPacket.get().getActiveDocuments(); + return activeDocuments; } /** Returns the number of nodes which responded to this Pong, if available */ public Optional activeNodes() { - if ( ! pongPacket.isPresent()) return Optional.empty(); - return pongPacket.get().getActiveNodes(); + return Optional.empty(); } public List getErrors() { @@ -78,16 +61,6 @@ public class Pong { return ! errors.isEmpty(); } - /** Sets information about the ping used to produce this. This is included when returning the tostring of this. */ - public void setPingInfo(String pingInfo) { - if (pingInfo==null) - pingInfo=""; - this.pingInfo=pingInfo; - } - - /** Returns information about the ping use, or "" (never null) if none */ - public String getPingInfo() { return pingInfo; } - public ElapsedTime getElapsedTime() { return elapsed; } diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java deleted file mode 100644 index c075a0f842b..00000000000 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.cluster; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.yahoo.component.provider.Freezable; -import com.yahoo.container.handler.VipStatus; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.result.ErrorMessage; - -/** - * Monitors of a cluster of remote nodes. The monitor uses an internal thread - * for node monitoring. - * - * @author bratseth - * @author Steinar Knutsen - */ -public class ClusterMonitor implements Runnable, Freezable { - - // The ping thread wil start using the system, but we cannot be guaranteed that all components - // in the system is up. As a workaround for not being able to find out when the system - // is ready to be used, we wait some time before starting the ping thread - private static final int pingThreadInitialDelayMs = 3000; - - private final MonitorConfiguration configuration; - - private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName()); - - private final ClusterSearcher nodeManager; - - private final Optional vipStatus; - - /** A map from Node to corresponding MonitoredNode */ - private final Map nodeMonitors = new java.util.IdentityHashMap<>(); - - private ScheduledFuture future; - - private boolean isFrozen = false; - - ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional vipStatus) { - configuration = new MonitorConfiguration(monitorConfig); - nodeManager = manager; - this.vipStatus = vipStatus; - log.fine("checkInterval is " + configuration.getCheckInterval() + " ms"); - } - - /** Returns the configuration of this cluster monitor */ - MonitorConfiguration getConfiguration() { - return configuration; - } - - void startPingThread() { - if ( ! isFrozen()) - throw new IllegalStateException("Do not start the monitoring thread before the set of " + - "nodes to monitor is complete/the ClusterMonitor is frozen."); - future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS); - } - - /** - * Adds a new node for monitoring. - */ - void add(VespaBackEndSearcher node) { - if (isFrozen()) - throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen."); - nodeMonitors.put(node, new NodeMonitor(node)); - updateVipStatus(); - } - - /** Called from ClusterSearcher/NodeManager when a node failed */ - void failed(VespaBackEndSearcher node, ErrorMessage error) { - NodeMonitor monitor = nodeMonitors.get(node); - boolean wasWorking = monitor.isWorking(); - monitor.failed(error); - if (wasWorking && !monitor.isWorking()) { - log.info("Failed monitoring node '" + node + "' due to '" + error); - nodeManager.failed(node); - } - updateVipStatus(); - } - - /** Called when a node responded */ - void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) { - NodeMonitor monitor = nodeMonitors.get(node); - boolean wasFailing = !monitor.isWorking(); - monitor.responded(hasSearchNodesOnline); - if (wasFailing && monitor.isWorking()) { - log.info("Failed node '" + node + "' started working again."); - nodeManager.working(node); - } - updateVipStatus(); - } - - private void updateVipStatus() { - if ( ! vipStatus.isPresent()) return; - if ( ! hasInformationAboutAllNodes()) return; - - if (hasWorkingNodesWithDocumentsOnline()) { - vipStatus.get().addToRotation(nodeManager.getId().stringValue()); - } else { - vipStatus.get().removeFromRotation(nodeManager.getId().stringValue()); - } - } - - private boolean hasInformationAboutAllNodes() { - for (NodeMonitor monitor : nodeMonitors.values()) { - if ( ! monitor.statusIsKnown()) - return false; - } - return true; - } - - private boolean hasWorkingNodesWithDocumentsOnline() { - for (NodeMonitor node : nodeMonitors.values()) { - if (node.isWorking() && node.searchNodesOnline()) - return true; - } - return false; - } - - /** - * Ping all nodes which needs pinging to discover state changes - */ - private void ping() throws InterruptedException { - for (NodeMonitor monitor : nodeMonitors.values()) { - nodeManager.ping(monitor.getNode()); - } - } - - @Override - public void run() { - log.finest("Activating ping"); - try { - ping(); - } catch (Exception e) { - log.log(Level.WARNING, "Error in monitor thread", e); - } - } - - public void shutdown() { - if (future != null) { - future.cancel(true); - } - } - - @Override - public void freeze() { - isFrozen = true; - - } - - @Override - public boolean isFrozen() { - return isFrozen; - } - -} diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 4ffcc0a4330..b0d9c2b0002 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -2,20 +2,13 @@ package com.yahoo.prelude.cluster; import com.yahoo.cloud.config.ClusterInfoConfig; -import com.yahoo.collections.Tuple2; import com.yahoo.component.ComponentId; -import com.yahoo.component.chain.Chain; import com.yahoo.component.chain.dependencies.After; -import com.yahoo.concurrent.Receiver; -import com.yahoo.concurrent.Receiver.MessageState; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; -import com.yahoo.fs4.mplex.Backend; import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; import com.yahoo.prelude.IndexFacts; -import com.yahoo.prelude.Ping; -import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.prelude.fastsearch.FS4ResourcePool; @@ -46,11 +39,7 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.logging.Logger; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -64,10 +53,6 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { - private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName()); - - private final ClusterMonitor monitor; - private final Value cacheHitRatio; private final String clusterModelName; @@ -78,8 +63,6 @@ public class ClusterSearcher extends Searcher { // Mapping from rank profile names to document types containing them private final Map> rankProfiles = new HashMap<>(); - private final FS4ResourcePool fs4ResourcePool; - private final long maxQueryTimeout; // in milliseconds private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; @@ -88,7 +71,6 @@ public class ClusterSearcher extends Searcher { private VespaBackEndSearcher server = null; - /** * Creates a new ClusterSearcher. */ @@ -96,7 +78,6 @@ public class ClusterSearcher extends Searcher { QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, - QrMonitorConfig monitorConfig, DispatchConfig dispatchConfig, ClusterInfoConfig clusterInfoConfig, Statistics manager, @@ -104,13 +85,8 @@ public class ClusterSearcher extends Searcher { FS4ResourcePool fs4ResourcePool, VipStatus vipStatus) { super(id); - this.fs4ResourcePool = fs4ResourcePool; - - Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric); - monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead - ? new ClusterMonitor(this, monitorConfig, Optional.empty()) - : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus)); + Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric); int searchClusterIndex = clusterConfig.clusterId(); clusterModelName = clusterConfig.clusterName(); @@ -148,9 +124,8 @@ public class ClusterSearcher extends Searcher { for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) { try { if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) { - Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex)); - FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams, - documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex); + FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams, + documentDbConfig, dispatcher, dispatcherIndex); addBackendSearcher(searcher); } } catch (UnknownHostException e) { @@ -162,8 +137,6 @@ public class ClusterSearcher extends Searcher { if ( server == null ) { throw new IllegalStateException("ClusterSearcher should have a top level dispatch."); } - monitor.freeze(); - monitor.startPingThread(); } private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) { @@ -189,15 +162,14 @@ public class ClusterSearcher extends Searcher { } private static FastSearcher searchDispatch(int searchclusterIndex, - FS4ResourcePool fs4ResourcePool, + String serverId, SummaryParameters docSumParams, DocumentdbInfoConfig documentdbInfoConfig, - Backend backend, Dispatcher dispatcher, int dispatcherIndex) { ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex); - return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); + return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); } private static VdsStreamingSearcher vdsCluster(String serverId, @@ -210,8 +182,7 @@ public class ClusterSearcher extends Searcher { throw new IllegalArgumentException("Search clusters in streaming search shall only contain a single searchdefinition : " + searchClusterConfig.searchdef()); } ClusterParams clusterParams = makeClusterParams(searchclusterIndex, 0); - VdsStreamingSearcher searcher = (VdsStreamingSearcher) VespaBackEndSearcher - .getSearcher("com.yahoo.vespa.streamingvisitors.VdsStreamingSearcher"); + VdsStreamingSearcher searcher = new VdsStreamingSearcher(); searcher.setSearchClusterConfigId(searchClusterConfig.rankprofiles().configid()); searcher.setDocumentType(searchClusterConfig.searchdef(0)); searcher.setStorageClusterRouteSpec(searchClusterConfig.storagecluster().routespec()); @@ -222,25 +193,14 @@ public class ClusterSearcher extends Searcher { /** Do not use, for internal testing purposes only. **/ ClusterSearcher(Set documentTypes) { this.documentTypes = documentTypes; - monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus())); cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy", Statistics.nullImplementation, new Value.Parameters()); clusterModelName = "testScenario"; - fs4ResourcePool = null; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; } - private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) { - return fs4ResourcePool.getBackend(disp.host(), disp.port()); - } - - ClusterMonitor getMonitor() { - return monitor; - } - void addBackendSearcher(VespaBackEndSearcher searcher) { - monitor.add(searcher); server = searcher; } @@ -479,77 +439,6 @@ public class ClusterSearcher extends Searcher { cacheHitRatio.put(0.0); } - /** NodeManager method, called from ClusterMonitor. */ - void working(VespaBackEndSearcher node) { - server = node; - } - - /** Called from ClusterMonitor. */ - void failed(VespaBackEndSearcher node) { - server = null; - } - - /** - * Pinging a node, called from ClusterMonitor. - */ - void ping(VespaBackEndSearcher node) throws InterruptedException { - log.fine("Sending ping to: " + node); - Pinger pinger = new Pinger(node); - - getExecutor().execute(pinger); - Pong pong = pinger.getPong(); // handles timeout - if (pong == null) { - monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); - } else if (pong.badResponse()) { - monitor.failed(node, pong.getError(0)); - } else { - monitor.responded(node, backendCanServeDocuments(pong)); - } - } - - private boolean backendCanServeDocuments(Pong pong) { - if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true - return pong.activeNodes().get() > 0; - } - @Override - public void deconstruct() { - monitor.shutdown(); - } - - ExecutorService getExecutor() { - return fs4ResourcePool.getExecutor(); - } - - ScheduledExecutorService getScheduledExecutor() { - return fs4ResourcePool.getScheduledExecutor(); - } - - private class Pinger implements Runnable { - - private final Searcher searcher; - private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout()); - private final Receiver pong = new Receiver<>(); - - Pinger(final Searcher searcher) { - this.searcher = searcher; - } - - @Override - public void run() { - pong.put(createExecution().ping(pingChallenge)); - } - - private Execution createExecution() { - return new Execution(new Chain<>(searcher), - new Execution.Context(null, null, null, null, null)); - } - - public Pong getPong() throws InterruptedException { - Tuple2 reply = pong.get(pingChallenge.getTimeout() + 150); - return (reply.first != MessageState.VALID) ? null : reply.second; - } - - } - + public void deconstruct() { } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java index 524e842eacd..de07839e3e3 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java @@ -10,8 +10,6 @@ package com.yahoo.prelude.fastsearch; -import java.nio.ByteBuffer; - import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java index 4f52ef91725..329a9caaf91 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java @@ -6,7 +6,6 @@ import com.yahoo.log.LogLevel; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java index 6b1445229ec..f6f8006d2d2 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java @@ -1,9 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; - -import java.nio.ByteBuffer; - import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java deleted file mode 100644 index 59bc781c8b2..00000000000 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.fs4.Packet; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; - -import java.io.IOException; -import java.util.Iterator; - -import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; - -/** - * {@link FillInvoker} implementation for FS4 nodes and fdispatch - * - * @author ollivir - */ -public class FS4FillInvoker extends FillInvoker { - private final VespaBackEndSearcher searcher; - private FS4Channel channel; - - private int expectedFillResults = 0; - - // fdispatch code path - FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { - this.searcher = searcher; - this.channel = backend.openChannel(); - channel.setQuery(query); - } - - @Override - protected void sendFillRequest(Result result, String summaryClass) { - - if (countUnfilledFastHits(result, summaryClass) > 0) { - try { - expectedFillResults = requestSummaries(result, summaryClass); - } catch (InvalidChannelException e) { - result.hits() - .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError( - "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); - } - } else { - expectedFillResults = 0; - } - } - - - @Override - protected void getFillResults(Result result, String summaryClass) { - if (expectedFillResults == 0) { - return; - } - - Packet[] receivedPackets; - try { - receivedPackets = getSummaryResponses(result); - } catch (InvalidChannelException e1) { - result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); - return; - } catch (ChannelTimeoutException e1) { - result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); - return; - } - - if (receivedPackets.length == 0) { - result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); - return; - } - - int skippedHits; - try { - FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass); - skippedHits = fillHitsResult.skippedHits; - if (fillHitsResult.error != null) { - result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); - return; - } - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); - return; - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError( - "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); - return; - } - - if (skippedHits > 0) - result.hits().addError( - ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); - result.analyzeHits(); - - if (channel.getQuery().getTraceLevel() >= 3) { - int hitNumber = 0; - for (Iterator i = hitIterator(result); i.hasNext();) { - com.yahoo.search.result.Hit hit = i.next(); - if (!(hit instanceof FastHit)) - continue; - FastHit fastHit = (FastHit) hit; - - String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend"); - if (!fastHit.isFilled(summaryClass)) - traceMsg += ". Error, hit, not filled"; - channel.getQuery().trace(traceMsg, false, 3); - } - } - } - - @Override - public void release() { - if (channel != null) { - channel.close(); - channel = null; - } - } - - private int countUnfilledFastHits(Result result, String summaryClass) { - int count = 0; - for (Iterator i = hitIterator(result); i.hasNext();) { - Hit hit = i.next(); - if (hit instanceof FastHit && !hit.isFilled(summaryClass)) - count++; - } - return count; - } - - private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException { - - boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery()); - if (result.getQuery().getTraceLevel() >= 3) - result.getQuery().trace((summaryNeedsQuery ? "FS4: Resending " : "Not resending ") + "query during document summary fetching", 3); - - GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); - int compressionLimit = result.getQuery().properties().getInteger(FS4SearchInvoker.PACKET_COMPRESSION_LIMIT, 0); - docsumsPacket.setCompressionLimit(compressionLimit); - if (compressionLimit != 0) { - docsumsPacket.setCompressionType(result.getQuery().properties().getString(FS4SearchInvoker.PACKET_COMPRESSION_TYPE, "lz4")); - } - - boolean couldSend = channel.sendPacket(docsumsPacket); - if (!couldSend) - throw new IOException("Could not successfully send GetDocSumsPacket."); - - return docsumsPacket.getNumDocsums() + 1; - } - - private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException { - if(expectedFillResults == 0) { - return new Packet[0]; - } - BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults); - - return convertBasicPackets(receivedPackets); - } - - private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { - // trying to cast a BasicPacket[] to Packet[] will compile, - // but lead to a runtime error. At least that's what I got - // from testing and reading the specification. I'm just happy - // if someone tells me what's the proper Java way of doing - // this. -SK - Packet[] packets = new Packet[basicPackets.length]; - - for (int i = 0; i < basicPackets.length; i++) { - packets[i] = (Packet) basicPackets[i]; - } - return packets; - } - - private String getName() { - return searcher.getName(); - } -} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java deleted file mode 100644 index 2abaf341c58..00000000000 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch; - -import com.yahoo.prelude.Pong; -import com.yahoo.search.cluster.ClusterMonitor; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.PingFactory; -import com.yahoo.search.dispatch.searchcluster.Pinger; - -import java.util.concurrent.Callable; - -/** - * FS4PingFactory constructs {@link Pinger} objects that communicate with - * content nodes or dispatchers over the fnet/FS4 protocol - * - * @author ollivir - */ -public class FS4PingFactory implements PingFactory { - private final FS4ResourcePool fs4ResourcePool; - - public FS4PingFactory(FS4ResourcePool fs4ResourcePool) { - this.fs4ResourcePool = fs4ResourcePool; - } - - @Override - public Callable createPinger(Node node, ClusterMonitor monitor) { - return new Pinger(node, monitor, fs4ResourcePool); - } -} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index f85a4019b78..ed9eb72d7dd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -5,14 +5,6 @@ import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.QrConfig; -import com.yahoo.container.search.Fs4Config; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.ConnectionPool; -import com.yahoo.fs4.mplex.ListenerPool; - -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -22,7 +14,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance. + * All users will get the same pool instance. * * @author baldersheim */ @@ -32,22 +24,18 @@ public class FS4ResourcePool extends AbstractComponent { private static final AtomicInteger instanceCounter = new AtomicInteger(0); private final String serverId; private final int instanceId; - private final ListenerPool listeners; - private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections - private final Map connectionPoolMap = new HashMap<>(); private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; @Inject - public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) { - this(config.discriminator(), fs4Config.numlistenerthreads()); + public FS4ResourcePool(QrConfig config) { + this(config.discriminator()); } - public FS4ResourcePool(String serverId, int listenerThreads) { + public FS4ResourcePool(String serverId) { this.serverId = serverId; instanceId = instanceCounter.getAndIncrement(); String name = "FS4-" + instanceId; - listeners = new ListenerPool(name, listenerThreads); executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name)); scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled")); } @@ -57,28 +45,10 @@ public class FS4ResourcePool extends AbstractComponent { public ExecutorService getExecutor() { return executor; } public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } - public Backend getBackend(String host, int port) { - String key = host + ":" + port; - synchronized (connectionPoolMap) { - Backend pool = connectionPoolMap.get(key); - if (pool == null) { - pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer)); - connectionPoolMap.put(key, pool); - } - return pool; - } - } - @Override public void deconstruct() { logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'."); super.deconstruct(); - listeners.close(); - timer.cancel(); - for (Backend backend : connectionPoolMap.values()) { - backend.shutdown(); - backend.close(); - } executor.shutdown(); scheduledExecutor.shutdown(); try { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java deleted file mode 100644 index f3867288b29..00000000000 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch; - -import com.yahoo.data.access.simple.Value; -import com.yahoo.data.access.slime.SlimeAdapter; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.DocumentInfo; -import com.yahoo.fs4.FS4Properties; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.QueryPacketData; -import com.yahoo.fs4.QueryResultPacket; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.log.LogLevel; -import com.yahoo.prelude.ConfigurationException; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.InvokerResult; -import com.yahoo.search.dispatch.LeanHit; -import com.yahoo.search.dispatch.ResponseMonitor; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.query.Sorting; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Relevance; -import com.yahoo.search.searchchain.Execution; -import com.yahoo.searchlib.aggregation.Grouping; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.vespa.objects.BufferSerializer; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.logging.Logger; - -/** - * {@link SearchInvoker} implementation for FS4 nodes and fdispatch - * - * @author ollivir - */ -public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor { - static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); - static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); - - private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); - - private final VespaBackEndSearcher searcher; - private FS4Channel channel; - - private ErrorMessage pendingSearchError = null; - private Query query = null; - private QueryPacket queryPacket = null; - - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Optional node) { - super(node); - this.searcher = searcher; - this.channel = channel; - - channel.setQuery(query); - channel.setResponseMonitor(this); - } - - @Override - protected void sendSearchRequest(Query query) throws IOException { - log.finest("sending query packet"); - - this.query = query; - createQueryPacket(searcher.getServerId(), query); - - try { - boolean couldSend = channel.sendPacket(queryPacket); - if (!couldSend) { - setPendingError("Could not reach '" + getName() + "'"); - } - } catch (InvalidChannelException e) { - setPendingError("Invalid channel " + getName()); - } catch (IllegalStateException e) { - setPendingError("Illegal state in FS4: " + e.getMessage()); - } - } - - private void setPendingError(String message) { - pendingSearchError = ErrorMessage.createBackendCommunicationError(message); - responseAvailable(); - } - - @Override - protected InvokerResult getSearchResult(Execution execution) throws IOException { - if (pendingSearchError != null) { - return errorResult(query, pendingSearchError); - } - BasicPacket[] basicPackets; - - try { - basicPackets = channel.receivePackets(query.getTimeLeft(), 1); - } catch (ChannelTimeoutException e) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } catch (InvalidChannelException e) { - return errorResult(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); - } - - if (basicPackets.length == 0) { - return errorResult(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); - } - - log.finest(() -> "got packets " + basicPackets.length + " packets"); - - basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); - QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - - log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); - - if (query.getPresentation().getSummary() == null) - query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); - - InvokerResult result = new InvokerResult(query, resultPacket.getDocumentCount()); - - addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result.getResult()); - addUnfilledHits(result.getLeanHits(), resultPacket.getDocuments(), queryPacket.getQueryPacketData()); - - return result; - } - - private QueryPacket createQueryPacket(String serverId, Query query) { - this.queryPacket = QueryPacket.create(serverId, query); - int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); - queryPacket.setCompressionLimit(compressionLimit); - if (compressionLimit != 0) { - queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); - } - log.fine(() -> "made QueryPacket: " + queryPacket); - - return queryPacket; - } - - private void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) { - result.setTotalHitCount(resultPacket.getTotalDocumentCount()); - - addBackendTrace(query, resultPacket); - - // Grouping - if (resultPacket.getGroupData() != null) { - byte[] data = resultPacket.getGroupData(); - ArrayList list = new ArrayList<>(); - BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(ByteBuffer.wrap(data))); - int cnt = buf.getInt(null); - for (int i = 0; i < cnt; i++) { - Grouping g = new Grouping(); - g.deserialize(buf); - list.add(g); - } - GroupingListHit hit = new GroupingListHit(list, searcher.getDocsumDefinitionSet(query)); - hit.setQuery(result.getQuery()); - hit.setSource(getName()); - hit.setQueryPacketData(queryPacketData); - result.hits().add(hit); - } - - if (resultPacket.getCoverageFeature()) { - result.setCoverage(new Coverage(resultPacket.getCoverageDocs(), resultPacket.getActiveDocs(), resultPacket.getNodesReplied()) - .setSoonActive(resultPacket.getSoonActiveDocs()) - .setDegradedReason(resultPacket.getDegradedReason()) - .setNodesTried(resultPacket.getNodesQueried())); - } - } - - private void addBackendTrace(Query query, QueryResultPacket resultPacket) { - if (resultPacket.propsArray == null) return; - Value.ArrayValue traces = new Value.ArrayValue(); - for (FS4Properties properties : resultPacket.propsArray) { - if ( ! properties.getName().equals("trace")) continue; - for (FS4Properties.Entry entry : properties.getEntries()) { - traces.add(new SlimeAdapter(BinaryFormat.decode(entry.getValue()).get())); - } - } - query.trace(traces, query.getTraceLevel()); - } - - /** - * Creates unfilled hits from a List of DocumentInfo instances. - */ - private void addUnfilledHits(List result, List documents, QueryPacketData queryPacketData) { - Optional channelDistributionKey = distributionKey(); - - for (DocumentInfo document : documents) { - byte [] sortData = document.getSortData(); - LeanHit hit = (sortData == null) - ? new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getMetric()) - : new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getSortData()); - if (queryPacketData != null) { - hit.setQueryPacketData(queryPacketData); - } - result.add(hit); - } - } - - @Override - public void release() { - if (channel != null) { - channel.close(); - channel = null; - } - } - - private String getName() { - return searcher.getName(); - } - - @Override - public void responseAvailable(FS4Channel from) { - responseAvailable(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java index f7f2d08d713..244fad4efde 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java @@ -3,7 +3,6 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.ObjectTraverser; import com.yahoo.document.GlobalId; -import com.yahoo.fs4.QueryPacketData; import com.yahoo.net.URI; import com.yahoo.search.query.Sorting; import com.yahoo.search.result.Hit; @@ -40,9 +39,6 @@ public class FastHit extends Hit { /** The global id of this document in the backend node which produced it */ private byte [] globalId; - //TODO Remove with fs4 - private transient QueryPacketData queryPacketData = null; - private transient byte[] sortData = null; // TODO I supect this one can be dropped. private transient Sorting sortDataSorting = null; @@ -147,27 +143,6 @@ public class FastHit extends Hit { /** Sets the index of the node this hit originated at */ public void setDistributionKey(int distributionKey) { this.distributionKey = distributionKey; } - /** - * Add the binary data common for the query packet to a Vespa backend and a - * summary fetch packet to a Vespa backend. This method can only be called - * once for a single hit. - * - * @param queryPacketData binary data from a query packet resulting in this hit - * @throws IllegalStateException if the method is called more than once - * @throws NullPointerException if trying to set query packet data to null - */ - public void setQueryPacketData(QueryPacketData queryPacketData) { - if (this.queryPacketData != null) - throw new IllegalStateException("Query packet data already set to " - + this.queryPacketData + ", tried to set it to " + queryPacketData); - if (queryPacketData == null) - throw new NullPointerException("Query packet data reference can not be set to null."); - this.queryPacketData = queryPacketData; - } - - /** Returns a serial encoding of the query which produced this hit, ot null if not available. */ - public QueryPacketData getQueryPacketData() { return queryPacketData; } - public void setSortData(byte[] data, Sorting sorting) { this.sortData = data; this.sortDataSorting = sorting; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 6b0041a9e86..b0b3a7800e9 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,23 +1,14 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.PingPacket; -import com.yahoo.fs4.PongPacket; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; import com.yahoo.prelude.querytransform.QueryRewrite; -import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -46,21 +37,13 @@ import static com.yahoo.container.util.Util.quote; // catch and unwrap into a results with an error in high level methods. -Jon public class FastSearcher extends VespaBackEndSearcher { - /** If this is turned on this will make search queries directly to the local search node when possible */ - private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct"); - /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; - private final Backend dispatchBackend; - private final FS4ResourcePool fs4ResourcePool; - /** * Creates a Fastsearcher. * - * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to - * over the fs4 protocol - * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when + * @param serverId the resource pool used to create direct connections to the local search nodes when * bypassing the dispatch node * @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol. * Eventually we will move everything to this protocol and never use dispatch nodes. @@ -70,13 +53,11 @@ public class FastSearcher extends VespaBackEndSearcher { * @param clusterParams the cluster number, and other cluster backend parameters * @param documentdbInfoConfig document database parameters */ - public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher, + public FastSearcher(String serverId, Dispatcher dispatcher, SummaryParameters docSumParams, ClusterParams clusterParams, DocumentdbInfoConfig documentdbInfoConfig) { - init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig); - this.dispatchBackend = dispatchBackend; + init(serverId, docSumParams, clusterParams, documentdbInfoConfig); this.dispatcher = dispatcher; - this.fs4ResourcePool = fs4ResourcePool; } /** @@ -84,58 +65,7 @@ public class FastSearcher extends VespaBackEndSearcher { */ @Override public Pong ping(Ping ping, Execution execution) { - return ping(ping, dispatchBackend, getName()); - } - - public static Pong ping(Ping ping, Backend backend, String name) { - FS4Channel channel = backend.openPingChannel(); - - // If you want to change this code, you need to understand - // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and - // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage) - try { - PingPacket pingPacket = new PingPacket(); - try { - boolean couldSend = channel.sendPacket(pingPacket); - if ( ! couldSend) { - return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name)); - } - } catch (InvalidChannelException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name)); - } catch (IllegalStateException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); - } catch (IOException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage())); - } - - // We should only get a single packet - BasicPacket[] packets; - - try { - packets = channel.receivePackets(ping.getTimeout(), 1); - } catch (ChannelTimeoutException e) { - return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name)); - } catch (InvalidChannelException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name)); - } - - if (packets.length == 0) { - return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back")); - } - - try { - packets[0].ensureInstanceOf(PongPacket.class, name); - } catch (TimeoutException e) { - return new Pong(ErrorMessage.createTimeout(e.getMessage())); - } catch (IOException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage())); - } - return new Pong((PongPacket)packets[0]); - } finally { - if (channel != null) { - channel.close(); - } - } + throw new IllegalStateException("This ping should not have been called."); } @Override @@ -217,18 +147,7 @@ public class FastSearcher extends VespaBackEndSearcher { * on the same host. */ private SearchInvoker getSearchInvoker(Query query) { - Optional invoker = dispatcher.getSearchInvoker(query, this); - if (invoker.isPresent()) { - return invoker.get(); - } - - Optional direct = getDirectNode(query); - if(direct.isPresent()) { - var node = direct.get(); - Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); - return new FS4SearchInvoker(this, query, backend.openChannel(), direct); - } - return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); + return dispatcher.getSearchInvoker(query, this).get(); } /** @@ -237,47 +156,17 @@ public class FastSearcher extends VespaBackEndSearcher { * content nodes. */ private FillInvoker getFillInvoker(Result result) { - Query query = result.getQuery(); - Optional invoker = dispatcher.getFillInvoker(result, this); - if (invoker.isPresent()) { - return invoker.get(); - } - - Optional direct = getDirectNode(query); - if (direct.isPresent()) { - var node = direct.get(); - Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); - return new FS4FillInvoker(this, query, backend); - } - return new FS4FillInvoker(this, query, dispatchBackend); + return dispatcher.getFillInvoker(result, this).get(); } - /** - * If the query can be directed to a single local content node, returns that node. Otherwise, - * returns an empty value. - */ - private Optional getDirectNode(Query query) { - if (!query.properties().getBoolean(dispatchDirect, true)) - return Optional.empty(); - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) - return Optional.empty(); - - Optional directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); - if (!directDispatchRecipient.isPresent()) - return Optional.empty(); - // Dispatch directly to the single, local search node - Node local = directDispatchRecipient.get(); - query.trace(false, 2, "Dispatching directly to ", local); - return Optional.of(local); - } private static Optional quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } public String toString() { - return "fast searcher (" + getName() + ") " + dispatchBackend; + return "fast searcher (" + getName() + ") "; } protected boolean isLoggingFine() { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java index e0d569c6ae1..740b9592efc 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java @@ -3,7 +3,6 @@ package com.yahoo.prelude.fastsearch; import java.util.List; -import com.yahoo.fs4.QueryPacketData; import com.yahoo.search.result.Hit; import com.yahoo.searchlib.aggregation.Grouping; @@ -27,15 +26,4 @@ public class GroupingListHit extends Hit { private final List groupingList; private final DocsumDefinitionSet defs; - private QueryPacketData queryPacketData; - - public void setQueryPacketData(QueryPacketData queryPacketData) { - this.queryPacketData = queryPacketData; - } - - /** Returns encoded query data from the query used to create this, or null if none present */ - public QueryPacketData getQueryPacketData() { - return queryPacketData; - } - } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java index f690d9d4da4..396a84a28bd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java @@ -5,9 +5,6 @@ */ package com.yahoo.prelude.fastsearch; - -import java.nio.ByteBuffer; - import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java index a02d9813793..bec39393359 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java @@ -5,9 +5,6 @@ */ package com.yahoo.prelude.fastsearch; - -import java.nio.ByteBuffer; - import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java index bf77c517d50..388c96b453d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java @@ -5,9 +5,6 @@ */ package com.yahoo.prelude.fastsearch; -import java.nio.ByteBuffer; - -import com.yahoo.io.SlowInflate; import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java index 5e3d0babe98..b94c902693a 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java @@ -5,9 +5,6 @@ */ package com.yahoo.prelude.fastsearch; - -import java.nio.ByteBuffer; - import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java index 408cbbbb62d..4df12bd82bd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java @@ -5,10 +5,6 @@ */ package com.yahoo.prelude.fastsearch; - -import java.nio.ByteBuffer; - -import com.yahoo.text.Utf8; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 430ad015493..8f4b49ac71e 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -3,7 +3,6 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.collections.TinyIdentitySet; import com.yahoo.fs4.DocsumPacket; -import com.yahoo.fs4.Packet; import com.yahoo.prelude.query.Item; import com.yahoo.prelude.query.NullItem; import com.yahoo.prelude.query.textualrepresentation.TextualQueryRepresentation; @@ -20,8 +19,6 @@ import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; import com.yahoo.searchlib.aggregation.Grouping; -import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; @@ -51,7 +48,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { private String defaultDocsumClass = null; /** Returns an iterator which returns all hits below this result **/ - static Iterator hitIterator(Result result) { + private static Iterator hitIterator(Result result) { return result.hits().unorderedDeepIterator(); } @@ -230,7 +227,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if ((query.getTraceLevel() 0) { @@ -344,7 +341,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * @return the number of hits that we did not return data for, and an optional error message. * when things are working normally we return 0. */ - public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException { + protected FillHitsResult fillHits(Result result, DocsumPacket[] packets, String summaryClass) { int skippedHits = 0; String lastError = null; int packetIndex = 0; @@ -354,8 +351,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) { FastHit fastHit = (FastHit) hit; - packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName()); - DocsumPacket docsum = (DocsumPacket) packets[packetIndex]; + DocsumPacket docsum = packets[packetIndex]; packetIndex++; FillHitResult fr = fillHit(fastHit, docsum, summaryClass); @@ -384,7 +380,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return decodeSummary(summaryClass, hit, docsumdata, db.getDocsumDefinitionSet()); } - private String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { + private static String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { String error = docsumSet.lazyDecode(summaryClass, docsumdata, hit); if (error == null) { hit.setFilled(summaryClass); @@ -392,28 +388,6 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return error; } - @SuppressWarnings("rawtypes") - public static VespaBackEndSearcher getSearcher(String s) { - try { - Class c = Class.forName(s); - if (VespaBackEndSearcher.class.isAssignableFrom(c)) { - Constructor[] constructors = c.getConstructors(); - for (Constructor constructor : constructors) { - Class[] parameters = constructor.getParameterTypes(); - if (parameters.length == 0) { - return (VespaBackEndSearcher) constructor.newInstance(); - } - } - throw new RuntimeException("Failed initializing " + s); - - } else { - throw new RuntimeException(s + " is not com.yahoo.prelude.fastsearch.VespaBackEndSearcher"); - } - } catch (Exception e) { - throw new RuntimeException("Failure loading class " + s + ", exception :" + e); - } - } - protected boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java index d768dda2657..00bdc474119 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java @@ -1,8 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Class converting data (historically XML-encoded) from a document summary field. - * This has only been used to represent geographical positions. - */ + package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.Inspector; @@ -11,6 +8,8 @@ import com.yahoo.prelude.hitfield.XMLString; import com.yahoo.search.result.PositionsData; /** + * Class converting data (historically XML-encoded) from a document summary field. + * This has only been used to represent geographical positions. * @author Steinar Knutsen */ public class XMLField extends DocsumField { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index a5c6f3650e0..58f73ea52cc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -4,8 +4,6 @@ package com.yahoo.search.dispatch; import com.yahoo.component.AbstractComponent; import com.yahoo.container.handler.VipStatus; import com.yahoo.jdisc.Metric; -import com.yahoo.prelude.fastsearch.FS4PingFactory; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; @@ -53,9 +51,6 @@ public class Dispatcher extends AbstractComponent { private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3; - /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ - public static final CompoundName dispatchInternal = CompoundName.fromComponents(DISPATCH, INTERNAL); - /** If enabled, search queries will use protobuf rpc */ public static final CompoundName dispatchProtobuf = CompoundName.fromComponents(DISPATCH, PROTOBUF); @@ -64,7 +59,6 @@ public class Dispatcher extends AbstractComponent { private final LoadBalancer loadBalancer; private final boolean multilevelDispatch; - private final boolean internalDispatchByDefault; private final InvokerFactory invokerFactory; @@ -86,15 +80,13 @@ public class Dispatcher extends AbstractComponent { public static Dispatcher create(String clusterId, DispatchConfig dispatchConfig, - FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus, Metric metric) { var searchCluster = new SearchCluster(clusterId, dispatchConfig, containerClusterSize, vipStatus); var rpcFactory = new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster, !dispatchConfig.useFdispatchByDefault()); - var pingFactory = dispatchConfig.useFdispatchByDefault()? new FS4PingFactory(fs4ResourcePool) : rpcFactory; - return new Dispatcher(searchCluster, dispatchConfig, rpcFactory, pingFactory, metric); + return new Dispatcher(searchCluster, dispatchConfig, rpcFactory, rpcFactory, metric); } public Dispatcher(SearchCluster searchCluster, @@ -105,9 +97,8 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = searchCluster; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); - this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); - this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); this.invokerFactory = invokerFactory; + this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); this.metric = metric; this.metricContext = metric.createContext(null); @@ -125,15 +116,11 @@ public class Dispatcher extends AbstractComponent { } public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher) { - Optional invoker = invokerFactory.createFillInvoker(searcher, result); - if (invoker.isPresent()) { - return invoker; - } - return Optional.empty(); + return invokerFactory.createFillInvoker(searcher, result); } public Optional getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { + if (multilevelDispatch) { emitDispatchMetric(Optional.empty()); return Optional.empty(); } @@ -148,7 +135,6 @@ public class Dispatcher extends AbstractComponent { query.setOffset(0); } emitDispatchMetric(invoker); - query.properties().set(dispatchInternal, true); return invoker; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java index cd8228624c5..ce4891d7c25 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerResult.java @@ -39,9 +39,6 @@ public class InvokerResult { if (hit.hasSortData()) { fh.setSortData(hit.getSortData(), sorting); } - if (hit.getQueryPacketData() != null) { - fh.setQueryPacketData(hit.getQueryPacketData()); - } fh.setQuery(query); fh.setFillable(); fh.setCached(false); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java b/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java index 63302bee8c1..0749d8476c2 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LeanHit.java @@ -1,6 +1,5 @@ package com.yahoo.search.dispatch; -import com.yahoo.fs4.QueryPacketData; import java.util.Arrays; @@ -10,8 +9,7 @@ public class LeanHit implements Comparable { private final byte [] sortData; private final int partId; private final int distributionKey; - //TODO Remove when FS4 is gone - private QueryPacketData queryPacketData; + public LeanHit(byte [] gid, int partId, int distributionKey, double relevance) { this.gid = gid; this.relevance = Double.isNaN(relevance) ? Double.NEGATIVE_INFINITY : relevance; @@ -33,12 +31,6 @@ public class LeanHit implements Comparable { public int getPartId() { return partId; } public int getDistributionKey() { return distributionKey; } - QueryPacketData getQueryPacketData() { return queryPacketData; } - - public void setQueryPacketData(QueryPacketData queryPacketData) { - this.queryPacketData = queryPacketData; - } - @Override public int compareTo(LeanHit o) { int res = (sortData != null) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 6146751f35f..d9a76965c3e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -45,15 +45,12 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { Query query = result.getQuery(); boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); + boolean useProtoBuf = query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf); + boolean useDispatchDotSummaries = query.properties().getBoolean(dispatchSummaries, false); - if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf)) { - return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery)); - } - if (query.properties().getBoolean(dispatchSummaries, true) && ! summaryNeedsQuery) { - return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); - } else { - return Optional.empty(); - } + return ((useDispatchDotSummaries || !useProtoBuf) && ! summaryNeedsQuery) + ? Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))) + : Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery)); } // for testing diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java deleted file mode 100644 index dea6f741bb0..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.searchcluster; - -import com.yahoo.prelude.Ping; -import com.yahoo.prelude.Pong; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; -import com.yahoo.prelude.fastsearch.FastSearcher; -import com.yahoo.search.cluster.ClusterMonitor; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.yolean.Exceptions; - -import java.util.concurrent.Callable; - -/** - * @author bratseth - * @author ollivir - */ -public class Pinger implements Callable { - private final Node node; - private final ClusterMonitor clusterMonitor; - private final FS4ResourcePool fs4ResourcePool; - - public Pinger(Node node, ClusterMonitor clusterMonitor, FS4ResourcePool fs4ResourcePool) { - this.node = node; - this.clusterMonitor = clusterMonitor; - this.fs4ResourcePool = fs4ResourcePool; - } - - public Pong call() { - try { - Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), - fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); - return pong; - } catch (RuntimeException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " - + Exceptions.toMessageString(e))); - } - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 3657e0b5c76..85cc16ade6f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -185,18 +185,12 @@ public class SearchCluster implements NodeManager { return covered; } - /** - * Returns the nodes of this cluster as an immutable map indexed by host. - * One host may contain multiple nodes (on different ports), so this is a multi-map. - */ - public ImmutableMultimap nodesByHost() { return nodesByHost; } - /** * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), * or empty if we should not dispatch directly. */ public Optional directDispatchTarget() { - if ( ! directDispatchTarget.isPresent()) return Optional.empty(); + if ( directDispatchTarget.isEmpty()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage Group localSearchGroup = groups.get(directDispatchTarget.get().group()); @@ -229,24 +223,26 @@ public class SearchCluster implements NodeManager { private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { // update VIP status if we direct dispatch to this group and coverage status changed - if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { - if (sufficientCoverage) { - vipStatus.addToRotation(clusterId); - } else { - vipStatus.removeFromRotation(clusterId); - } - } + boolean isInRotation = vipStatus.isInRotation(); + boolean hasChanged = sufficientCoverage != group.hasSufficientCoverage(); + boolean isDirectDispatchGroupAndChange = usesDirectDispatchTo(group) && hasChanged; group.setHasSufficientCoverage(sufficientCoverage); + if ((!isInRotation || isDirectDispatchGroupAndChange) && sufficientCoverage) { + // We will set this cluster in rotation if + // - not already in rotation and one group has sufficient coverage. + vipStatus.addToRotation(clusterId); + } else if (isDirectDispatchGroupAndChange) { + // We will take it out of rotation if the group is mandatory (direct dispatch to this group) + vipStatus.removeFromRotation(clusterId); + } } private boolean usesDirectDispatchTo(Node node) { - if ( ! directDispatchTarget.isPresent()) return false; - return directDispatchTarget.get().equals(node); + return directDispatchTarget.isPresent() && directDispatchTarget.get().equals(node); } private boolean usesDirectDispatchTo(Group group) { - if ( ! directDispatchTarget.isPresent()) return false; - return directDispatchTarget.get().group() == group.id(); + return directDispatchTarget.isPresent() && directDispatchTarget.get().group() == group.id(); } /** Used by the cluster monitor to manage node status */ @@ -300,15 +296,21 @@ public class SearchCluster implements NodeManager { sumOfActiveDocuments += activeDocumentsInGroup[i]; } + boolean anyGroupsSufficientCoverage = false; for (int i = 0; i < numGroups; i++) { Group group = orderedGroups.get(i); long activeDocuments = activeDocumentsInGroup[i]; long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); - boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, - averageDocumentsInOtherGroups); + boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, averageDocumentsInOtherGroups); + anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage; updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups); } + if ( ! anyGroupsSufficientCoverage && (sumOfActiveDocuments == 0)) { + // If no groups have sufficient coverage (0 might be sufficient) + // and there are no documents in any groups, then we are down. + vipStatus.removeFromRotation(clusterId); + } } private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) { @@ -381,7 +383,7 @@ public class SearchCluster implements NodeManager { private void trackGroupCoverageChanges(int index, Group group, boolean fullCoverage, long averageDocuments) { boolean changed = group.isFullCoverageStatusChanged(fullCoverage); - if(changed) { + if (changed) { int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup(); if (fullCoverage) { log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", index, diff --git a/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java b/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java index cfed5ed00ad..e8f4d566028 100644 --- a/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java +++ b/container-search/src/main/java/com/yahoo/search/grouping/vespa/HitConverter.java @@ -54,8 +54,6 @@ class HitConverter implements ResultBuilder.HitConverter { throw new NullPointerException("Hit has no context."); hit.setSource(hitContext.getSource()); hit.setQuery(hitContext.getQuery()); - if (hitContext.getQueryPacketData() != null) - hit.setQueryPacketData(hitContext.getQueryPacketData()); return hit; } diff --git a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java index c1217f7acc2..4750bac551c 100644 --- a/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java +++ b/container-search/src/main/java/com/yahoo/vespa/streamingvisitors/VdsStreamingSearcher.java @@ -5,7 +5,6 @@ import com.yahoo.document.DocumentId; import com.yahoo.document.select.parser.ParseException; import com.yahoo.document.select.parser.TokenMgrException; import com.yahoo.fs4.DocsumPacket; -import com.yahoo.fs4.Packet; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.prelude.Ping; @@ -27,7 +26,6 @@ import com.yahoo.vdslib.SearchResult; import com.yahoo.vdslib.VisitorStatistics; import com.yahoo.vespa.streamingvisitors.tracing.TraceDescription; -import java.io.IOException; import java.math.BigInteger; import java.util.List; import java.util.Map; @@ -42,14 +40,13 @@ import java.util.logging.Logger; * @author baldersheim * @author Ulf Carlin */ -@SuppressWarnings("deprecation") public class VdsStreamingSearcher extends VespaBackEndSearcher { private static final CompoundName streamingUserid=new CompoundName("streaming.userid"); private static final CompoundName streamingGroupname=new CompoundName("streaming.groupname"); private static final CompoundName streamingSelection=new CompoundName("streaming.selection"); - public static final String STREAMING_STATISTICS = "streaming.statistics"; + static final String STREAMING_STATISTICS = "streaming.statistics"; private final VisitorFactory visitorFactory; private final TracingOptions tracingOptions; private static final Logger log = Logger.getLogger(VdsStreamingSearcher.class.getName()); @@ -84,13 +81,12 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { public VdsStreamingSearcher() { this(new VdsVisitorFactory()); } - - public VdsStreamingSearcher(VisitorFactory visitorFactory) { + VdsStreamingSearcher(VisitorFactory visitorFactory) { this.visitorFactory = visitorFactory; tracingOptions = TracingOptions.DEFAULT; } - public VdsStreamingSearcher(VisitorFactory visitorFactory, TracingOptions tracingOptions) { + VdsStreamingSearcher(VisitorFactory visitorFactory, TracingOptions tracingOptions) { this.visitorFactory = visitorFactory; this.tracingOptions = tracingOptions; } @@ -220,7 +216,7 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { query.trace(visitor.getStatistics().toString(), false, 2); query.getContext(true).setProperty(STREAMING_STATISTICS, visitor.getStatistics()); - Packet[] summaryPackets = new Packet [hits.size()]; + DocsumPacket[] summaryPackets = new DocsumPacket [hits.size()]; int index = 0; boolean skippedEarlierResult = false; @@ -256,19 +252,11 @@ public class VdsStreamingSearcher extends VespaBackEndSearcher { result.hits().add(groupHit); } - int skippedHits; - try { - FillHitsResult fillHitsResult = fillHits(result, summaryPackets, query.getPresentation().getSummary()); - skippedHits = fillHitsResult.skippedHits; - if (fillHitsResult.error != null) { - result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); - return result; - } - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); + FillHitsResult fillHitsResult = fillHits(result, summaryPackets, query.getPresentation().getSummary()); + int skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); return result; - } catch (IOException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields")); } if (skippedHits == 0) { diff --git a/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java b/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java deleted file mode 100644 index 20d9b61c177..00000000000 --- a/container-search/src/test/java/com/yahoo/fs4/PacketQueryTracerTestCase.java +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.io.StringWriter; -import java.nio.ByteBuffer; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; -import com.yahoo.search.Query; - -/** - * Tests hex dumping of packets. - * - * @author Steinar Knutsen - */ -public class PacketQueryTracerTestCase { - - FS4Channel channel; - BasicPacket packet; - PacketListener tracer; - - static class MockChannel extends FS4Channel { - - @Override - public void setQuery(Query query) { - super.setQuery(query); - } - - @Override - public Query getQuery() { - return super.getQuery(); - } - - @Override - public Integer getChannelId() { - return 1; - } - - @Override - public void close() { - } - - @Override - public boolean sendPacket(BasicPacket packet) { - return true; - } - - @Override - public BasicPacket[] receivePackets(long timeout, int packetCount) { - return null; - } - - @Override - public BasicPacket nextPacket(long timeout) { - return null; - } - - @Override - protected void addPacket(BasicPacket packet) { - } - - @Override - public boolean isValid() { - return true; - } - - @Override - public String toString() { - return "MockChannel"; - } - } - - @Before - public void setUp() throws Exception { - channel = new MockChannel(); - channel.setQuery(new Query("/?query=a&tracelevel=11")); - packet = new PingPacket(); - tracer = new PacketQueryTracer(); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public final void testPacketSent() throws IOException { - byte[] simulatedPacket = new byte[] { 1, 2, 3 }; - tracer.packetReceived(channel, packet, ByteBuffer.wrap(simulatedPacket)); - StringWriter w = new StringWriter(); - channel.getQuery().getContext(false).render(w); - assertTrue(w.getBuffer().toString().indexOf("PingPacket: 010203") != -1); - } - - @Test - public final void testPacketReceived() throws IOException { - byte[] simulatedPacket = new byte[] { 1, 2, 3 }; - tracer.packetReceived(channel, packet, ByteBuffer.wrap(simulatedPacket)); - StringWriter w = new StringWriter(); - channel.getQuery().getContext(false).render(w); - assertTrue(w.getBuffer().toString().indexOf("PingPacket: 010203") != -1); - } - -} diff --git a/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java b/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java deleted file mode 100644 index 8696ca08d2b..00000000000 --- a/container-search/src/test/java/com/yahoo/fs4/mplex/BackendTestCase.java +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.container.QrConfig; -import com.yahoo.container.search.Fs4Config; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.PingPacket; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.mplex.Backend.BackendStatistics; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; -import com.yahoo.search.Query; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.logging.Logger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Test networking code for talking to dispatch. - * - * @author Steinar Knutsen - */ -public class BackendTestCase { - private static final long TIMEOUT = 30000; - - public static class MockDispatch implements Runnable { - - public final ServerSocket socket; - public volatile Socket connection; - volatile int channelId; - - public byte[] packetData = new byte[] {0,0,0,104, - 0,0,0,217-256, - 0,0,0,1, - 0,0,0,0, - 0,0,0,2, - 0,0,0,0,0,0,0,5, - 0x40,0x39,0,0,0,0,0,0, - 0,0,0,111, - 0,0,0,97, - 0,0,0,3, 1,1,1,1,1,1,1,1,1,1,1,1, 0x40,0x37,0,0,0,0,0,23, 0,0,0,7, 0,0,0,36, - 0,0,0,4, 2,2,2,2,2,2,2,2,2,2,2,2, 0x40,0x35,0,0,0,0,0,21, 0,0,0,8, 0,0,0,37}; - - MockDispatch(ServerSocket socket) { - this.socket = socket; - } - - @Override - public void run() { - try { - connection = socket.accept(); - } catch (IOException e) { - e.printStackTrace(); - return; - } - requestRespond(); - } - - void requestRespond() { - byte[] length = new byte[4]; - try { - connection.getInputStream().read(length); - } catch (IOException e) { - e.printStackTrace(); - return; - } - int actual = ByteBuffer.wrap(length).getInt(); - - int read = 0; - int i = 0; - while (read != -1 && i < actual) { - try { - read = connection.getInputStream().read(); - ++i; - } catch (IOException e) { - e.printStackTrace(); - return; - } - } - ByteBuffer reply = ByteBuffer.wrap(packetData); - if (channelId != -1) { - reply.putInt(8, channelId); - } - try { - connection.getOutputStream().write(packetData); - } catch (IOException e) { - e.printStackTrace(); - } - } - public void setNoChannel() { - channelId = -1; - } - - } - - public static class MockServer { - public InetSocketAddress host; - public Thread worker; - public MockDispatch dispatch; - - public MockServer() throws IOException { - ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); - host = (InetSocketAddress) socket.getLocalSocketAddress(); - dispatch = new MockDispatch(socket); - worker = new Thread(dispatch); - worker.start(); - } - - } - - private Backend backend; - private MockServer server; - private Logger logger; - private boolean initUseParent; - private FS4ResourcePool listeners; - - public static final byte[] PONG = new byte[] { 0, 0, 0, 32, 0, 0, 0, 221 - 256, - 0,0,0,1, 0, 0, 0, 42, 0, 0, 0, 127, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, - 0, 0, 1 }; - - public void setUp() throws Exception { - logger = Logger.getLogger(Backend.class.getName()); - initUseParent = logger.getUseParentHandlers(); - logger.setUseParentHandlers(false); - listeners = new FS4ResourcePool(new Fs4Config(new Fs4Config.Builder()), new QrConfig(new QrConfig.Builder())); - - server = new MockServer(); - backend = listeners.getBackend(server.host.getHostString(), server.host.getPort()); - } - - public void tearDown() throws Exception { - listeners.deconstruct(); - server.dispatch.socket.close(); - if (server.dispatch.connection !=null) server.dispatch.connection.close(); - if (server.worker!=null) server.worker.join(); - if (logger !=null) logger.setUseParentHandlers(initUseParent); - } - - private BasicPacket [] read(FS4Channel channel) throws InvalidChannelException { - try { - return channel.receivePackets(TIMEOUT, 1); - } catch (ChannelTimeoutException e) { - fail("Could not get packets from simulated backend."); - } - return new BasicPacket[1]; - } - - @Test - public void testAll() throws Exception { - setUp(); - doTestBackend(); - tearDown(); - - setUp(); - doTestPinging(); - tearDown(); - - setUp(); - doRequireStatistics(); - tearDown(); - } - - private void doTestBackend() throws IOException, InvalidChannelException { - FS4Channel channel = backend.openChannel(); - Query q = new Query("/?query=a"); - int channelId = channel.getChannelId(); - server.dispatch.channelId = channelId; - - assertTrue(backend.sendPacket(QueryPacket.create("container.0", q), channelId)); - BasicPacket[] b = read(channel); - assertEquals(1, b.length); - assertEquals(217, b[0].getCode()); - channel.close(); - } - - private void doTestPinging() throws IOException, InvalidChannelException { - FS4Channel channel = backend.openPingChannel(); - server.dispatch.setNoChannel(); - server.dispatch.packetData = PONG; - - assertTrue(channel.sendPacket(new PingPacket())); - BasicPacket[] b = read(channel); - assertEquals(1, b.length); - assertEquals(221, b[0].getCode()); - channel.close(); - } - - private void doRequireStatistics() throws IOException, InvalidChannelException { - FS4Channel channel = backend.openPingChannel(); - server.dispatch.channelId = -1; - server.dispatch.packetData = PONG; - - assertTrue(channel.sendPacket(new PingPacket())); - read(channel); - BackendStatistics stats = backend.getStatistics(); - assertEquals(1, stats.totalConnections()); - } - -} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java deleted file mode 100644 index 527a2736fee..00000000000 --- a/container-search/src/test/java/com/yahoo/fs4/test/GetDocSumsPacketTestCase.java +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.test; - -import com.yahoo.fs4.BufferTooSmallException; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.query.SessionId; -import com.yahoo.search.result.Hit; -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -/** - * Tests the GetDocsumsPacket - * - * @author Bjorn Borud - */ -public class GetDocSumsPacketTestCase { - - private static final byte IGNORE = 69; - - @Test - public void testDefaultDocsumClass() { - Query query = new Query("/?query=chain"); - assertNull(query.getPresentation().getSummary()); - } - - @Test - public void testEncodingWithQuery() throws BufferTooSmallException { - Hit hit = new FastHit(); - assertPacket(true, hit, new byte[] {0, 0, 0, 53, 0, 0, 0, -37, 0, 0, 8, 21, 0, 0, 0, 0, IGNORE, IGNORE, IGNORE, - IGNORE, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 0x03, 0, 0, 0, 7, - 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 1, 0, 0, 0, 6, 4, 0, 3, 102, 111, 111}); - } - - @Test - public void testEncodingWithoutQuery() throws BufferTooSmallException { - Hit hit = new FastHit(); - assertPacket(false, hit, new byte[] { 0, 0, 0, 39, 0, 0, 0, -37, 0, 0, 8, 17, 0, 0, 0, 0, IGNORE, IGNORE, IGNORE, - IGNORE, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 0x03, 0, 0, 0, 7, 100, 101, 102, 97, 117, 108, 116 - }); - } - - @Test - public void requireThatSessionIdIsEncodedAsPropertyWhenUsingSearchSession() throws BufferTooSmallException { - Result result = new Result(new Query("?query=foo&groupingSessionCache=false")); - SessionId sessionId = result.getQuery().getSessionId("node-0"); - result.getQuery().getRanking().setQueryCache(true); - FastHit hit = new FastHit(); - result.hits().add(hit); - ByteBuffer answer = ByteBuffer.allocate(1024); - //assertEquals(0, sessionId.asUtf8String().getByteLength()); - answer.put(new byte[] { 0, 0, 0, (byte)(103+sessionId.asUtf8String().getByteLength()), 0, 0, 0, -37, 0, 0, 24, 17, 0, 0, 0, 0, - // query timeout - IGNORE, IGNORE, IGNORE, IGNORE, - // "default" - rank profile - 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', 0, 0, 0, 0x03, - // "default" - summaryclass - 0, 0, 0, 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', - // 2 property entries - 0, 0, 0, 2, - // rank: sessionId => qrserver.0.XXXXXXXXXXXXX.0 - 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 1, 0, 0, 0, 9, 's', 'e', 's', 's', 'i', 'o', 'n', 'I', 'd'}); - answer.putInt(sessionId.asUtf8String().getByteLength()); - answer.put(sessionId.asUtf8String().getBytes()); - answer.put(new byte [] { - // caches: features => true - 0, 0, 0, 6, 'c', 'a', 'c', 'h', 'e', 's', - 0, 0, 0, 1, 0, 0, 0, 5, 'q', 'u', 'e', 'r', 'y', 0, 0, 0, 4, 't', 'r', 'u', 'e'}); - byte [] expected = new byte [answer.position()]; - answer.flip(); - answer.get(expected); - assertPacket(false, result, expected); - } - - private static void assertPacket(boolean sendQuery, Hit hit, byte[] expected) throws BufferTooSmallException { - Result result = new Result(new Query("?query=foo")); - result.hits().add(hit); - assertPacket(sendQuery, result, expected); - } - - private static void assertPacket(boolean sendQuery, Result result, byte[] expected) throws BufferTooSmallException { - GetDocSumsPacket packet = GetDocSumsPacket.create(result, "default", sendQuery); - ByteBuffer buf = ByteBuffer.allocate(1024); - packet.encode(buf); - buf.flip(); - - byte[] actual = new byte[buf.remaining()]; - buf.get(actual); - // assertEquals(Arrays.toString(expected), Arrays.toString(actual)); - - assertEquals("Equal length", expected.length, actual.length); - for (int i = 0; i < expected.length; ++i) { - if (expected[i] == IGNORE) { - actual[i] = IGNORE; - } - } - - assertArrayEquals(expected, actual); - } -} diff --git a/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java deleted file mode 100644 index 022feb09b2e..00000000000 --- a/container-search/src/test/java/com/yahoo/fs4/test/HexByteIteratorTestCase.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.test; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.HexByteIterator; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test of HexByteIterator - * - * @author Tony Vaagenes - */ -public class HexByteIteratorTestCase { - - @Test - public void testHexByteIterator() { - int[] numbers = { 0x00, 0x01, 0xDE, 0xAD, 0xBE, 0xEF, 0xFF }; - - HexByteIterator i = new HexByteIterator( - ByteBuffer.wrap(toBytes(numbers))); - - assertEquals("00", i.next()); - assertEquals("01", i.next()); - assertEquals("DE", i.next()); - assertEquals("AD", i.next()); - assertEquals("BE", i.next()); - assertEquals("EF", i.next()); - assertEquals("FF", i.next()); - assertTrue(!i.hasNext()); - } - - private byte[] toBytes(int[] ints) { - byte[] bytes = new byte[ints.length]; - for (int i=0; i entries) { - RankProperties properties = createRankPropertiesWithTensors(entries); - assertEquals(entries.size(), properties.asMap().size()); - - Map decodedProperties = decode(type, encode(properties)); - assertEquals(entries.size(), properties.asMap().size()); - assertEquals(entries.size(), decodedProperties.size()); - for (Entry entry : entries) { - assertEquals(entry.tensor, decodedProperties.get(entry.normalizedKey)); - } - } - - private static void assertTensorEncodingAndDecoding(TensorType type, String key, String normalizedKey, Tensor tensor) { - assertTensorEncodingAndDecoding(type, Arrays.asList(new Entry(key, normalizedKey, tensor))); - } - - private static RankProperties createRankPropertiesWithTensors(List entries) { - RankFeatures features = new RankFeatures(); - for (Entry entry : entries) { - features.put(entry.key, entry.tensor); - } - RankProperties properties = new RankProperties(); - features.prepare(properties); - return properties; - } - - private static byte[] encode(RankProperties properties) { - ByteBuffer buffer = ByteBuffer.allocate(512); - properties.encode(buffer, true); - byte[] result = new byte[buffer.position()]; - buffer.rewind(); - buffer.get(result); - return result; - } - - private static Map decode(TensorType type, byte[] encodedProperties) { - GrowableByteBuffer buffer = GrowableByteBuffer.wrap(encodedProperties); - byte[] mapNameBytes = new byte[buffer.getInt()]; - buffer.get(mapNameBytes); - int numEntries = buffer.getInt(); - Map result = new HashMap<>(); - for (int i = 0; i < numEntries; ++i) { - byte[] keyBytes = new byte[buffer.getInt()]; - buffer.get(keyBytes); - String key = Utf8.toString(keyBytes); - byte[] value = new byte[buffer.getInt()]; - buffer.get(value); - if (key.contains(".type")) { - result.put(key, Utf8.toString(value)); - } else { - result.put(key, TypedBinaryFormat.decode(Optional.of(type), GrowableByteBuffer.wrap(value))); - } - } - return result; - } - -} diff --git a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java index 86553a86add..9ea7276583b 100644 --- a/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/cluster/ClusterSearcherTestCase.java @@ -7,8 +7,6 @@ import com.yahoo.container.QrConfig; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; import com.yahoo.container.protect.Error; -import com.yahoo.container.search.Fs4Config; -import com.yahoo.fs4.QueryPacket; import com.yahoo.prelude.IndexFacts; import com.yahoo.prelude.IndexModel; import com.yahoo.prelude.SearchDefinition; @@ -48,12 +46,12 @@ import static org.junit.Assert.assertTrue; * @author bratseth */ public class ClusterSearcherTestCase { + private static final double DELTA = 0.0000000000000001; @Test public void testNoBackends() { ClusterSearcher cluster = new ClusterSearcher(new LinkedHashSet<>(Arrays.asList("dummy"))); try { - cluster.getMonitor().getConfiguration().setRequestTimeout(100); Execution execution = new Execution(cluster, Execution.Context.createContextStub()); Query query = new Query("query=hello"); query.setHits(10); @@ -146,7 +144,7 @@ public class ClusterSearcherTestCase { private final String type3 = "type3"; private final Map> results = new LinkedHashMap<>(); private final boolean expectAttributePrefetch; - public static final String ATTRIBUTE_PREFETCH = "attributeprefetch"; + static final String ATTRIBUTE_PREFETCH = "attributeprefetch"; private String getId(String type, int i) { return "id:ns:" + type + "::" + i; @@ -196,7 +194,7 @@ public class ClusterSearcherTestCase { createHit(getId(type3, 2), 5))); } - public MyMockSearcher(boolean expectAttributePrefetch) { + MyMockSearcher(boolean expectAttributePrefetch) { this.expectAttributePrefetch = expectAttributePrefetch; init(); } @@ -263,8 +261,7 @@ public class ClusterSearcherTestCase { } private Execution createExecution(List docTypesList, boolean expectAttributePrefetch) { - Set documentTypes = new LinkedHashSet<>(); - documentTypes.addAll(docTypesList); + Set documentTypes = new LinkedHashSet<>(docTypesList); ClusterSearcher cluster = new ClusterSearcher(documentTypes); try { cluster.addBackendSearcher(new MyMockSearcher( @@ -277,6 +274,7 @@ public class ClusterSearcherTestCase { } } + @Test public void testThatSingleDocumentTypeCanBeSearched() { { // Explicit 1 type in restrict set Execution execution = createExecution(); @@ -285,9 +283,9 @@ public class ClusterSearcherTestCase { assertEquals(3, result.getTotalHitCount()); List hits = result.hits().asList(); assertEquals(3, hits.size()); - assertEquals(9.0, hits.get(0).getRelevance().getScore()); - assertEquals(6.0, hits.get(1).getRelevance().getScore()); - assertEquals(3.0, hits.get(2).getRelevance().getScore()); + assertEquals(9.0, hits.get(0).getRelevance().getScore(), DELTA); + assertEquals(6.0, hits.get(1).getRelevance().getScore(), DELTA); + assertEquals(3.0, hits.get(2).getRelevance().getScore(), DELTA); } { // Only 1 registered type in cluster searcher, empty restrict set // NB ! Empty restrict sets does not exist below the cluster searcher. @@ -299,10 +297,11 @@ public class ClusterSearcherTestCase { assertEquals(3, result.getTotalHitCount()); List hits = result.hits().asList(); assertEquals(3, hits.size()); - assertEquals(9.0, hits.get(0).getRelevance().getScore()); + assertEquals(9.0, hits.get(0).getRelevance().getScore(), DELTA); } } + @Test public void testThatSubsetOfDocumentTypesCanBeSearched() { Execution execution = createExecution(); Query query = new Query("?query=hello&restrict=type1,type3"); @@ -311,14 +310,15 @@ public class ClusterSearcherTestCase { assertEquals(6, result.getTotalHitCount()); List hits = result.hits().asList(); assertEquals(6, hits.size()); - assertEquals(11.0, hits.get(0).getRelevance().getScore()); - assertEquals(9.0, hits.get(1).getRelevance().getScore()); - assertEquals(8.0, hits.get(2).getRelevance().getScore()); - assertEquals(6.0, hits.get(3).getRelevance().getScore()); - assertEquals(5.0, hits.get(4).getRelevance().getScore()); - assertEquals(3.0, hits.get(5).getRelevance().getScore()); + assertEquals(11.0, hits.get(0).getRelevance().getScore(), DELTA); + assertEquals(9.0, hits.get(1).getRelevance().getScore(), DELTA); + assertEquals(8.0, hits.get(2).getRelevance().getScore(), DELTA); + assertEquals(6.0, hits.get(3).getRelevance().getScore(), DELTA); + assertEquals(5.0, hits.get(4).getRelevance().getScore(), DELTA); + assertEquals(3.0, hits.get(5).getRelevance().getScore(), DELTA); } + @Test public void testThatMultipleDocumentTypesCanBeSearchedAndFilled() { Execution execution = createExecution(); Query query = new Query("?query=hello"); @@ -327,15 +327,15 @@ public class ClusterSearcherTestCase { assertEquals(9, result.getTotalHitCount()); List hits = result.hits().asList(); assertEquals(9, hits.size()); - assertEquals(11.0, hits.get(0).getRelevance().getScore()); - assertEquals(10.0, hits.get(1).getRelevance().getScore()); - assertEquals(9.0, hits.get(2).getRelevance().getScore()); - assertEquals(8.0, hits.get(3).getRelevance().getScore()); - assertEquals(7.0, hits.get(4).getRelevance().getScore()); - assertEquals(6.0, hits.get(5).getRelevance().getScore()); - assertEquals(5.0, hits.get(6).getRelevance().getScore()); - assertEquals(4.0, hits.get(7).getRelevance().getScore()); - assertEquals(3.0, hits.get(8).getRelevance().getScore()); + assertEquals(11.0, hits.get(0).getRelevance().getScore(), DELTA); + assertEquals(10.0, hits.get(1).getRelevance().getScore(), DELTA); + assertEquals(9.0, hits.get(2).getRelevance().getScore(), DELTA); + assertEquals(8.0, hits.get(3).getRelevance().getScore(), DELTA); + assertEquals(7.0, hits.get(4).getRelevance().getScore(), DELTA); + assertEquals(6.0, hits.get(5).getRelevance().getScore(), DELTA); + assertEquals(5.0, hits.get(6).getRelevance().getScore(), DELTA); + assertEquals(4.0, hits.get(7).getRelevance().getScore(), DELTA); + assertEquals(3.0, hits.get(8).getRelevance().getScore(), DELTA); for (int i = 0; i < 9; ++i) { assertNull(hits.get(i).getField("score")); } @@ -390,7 +390,7 @@ public class ClusterSearcherTestCase { assertResult(9, Arrays.asList(5.0, 4.0), getResult(6, 2, ex)); assertResult(9, Arrays.asList(4.0, 3.0), getResult(7, 2, ex)); assertResult(9, Arrays.asList(3.0), getResult(8, 2, ex)); - assertResult(9, new ArrayList(), getResult(9, 2, ex)); + assertResult(9, new ArrayList<>(), getResult(9, 2, ex)); assertResult(9, Arrays.asList(11.0, 10.0, 9.0, 8.0, 7.0), getResult(0, 5, ex)); assertResult(9, Arrays.asList(6.0, 5.0, 4.0, 3.0), getResult(5, 5, ex)); @@ -425,11 +425,7 @@ public class ClusterSearcherTestCase { final String yahoo = "www.yahoo.com"; try { - if (null != InetAddress.getByName(yahoo)) { - canFindYahoo = true; - } else { - canFindYahoo = false; - } + canFindYahoo = (null != InetAddress.getByName(yahoo)); } catch (Exception e) { canFindYahoo = false; } @@ -542,12 +538,11 @@ public class ClusterSearcherTestCase { qrSearchersConfig.build(), clusterConfig.build(), documentDbConfig.build(), - new QrMonitorConfig.Builder().build(), new DispatchConfig.Builder().build(), createClusterInfoConfig(), Statistics.nullImplementation, new MockMetric(), - new FS4ResourcePool(new Fs4Config.Builder().build(), new QrConfig.Builder().build()), + new FS4ResourcePool(new QrConfig.Builder().build()), new VipStatus()); } @@ -590,7 +585,7 @@ public class ClusterSearcherTestCase { @Test public void testThatQueryTimeoutIsCappedWithSpecifiedMax() { - QueryTimeoutFixture f = new QueryTimeoutFixture(Double.valueOf(70), null); + QueryTimeoutFixture f = new QueryTimeoutFixture(70.0, null); f.query.setTimeout(70001); f.search(); assertEquals(70000, f.query.getTimeout()); @@ -616,7 +611,7 @@ public class ClusterSearcherTestCase { @Test public void testThatQueryCacheIsDisabledIfTimeoutIsLargerThanConfiguredMax() { - QueryTimeoutFixture f = new QueryTimeoutFixture(null, Double.valueOf(5)); + QueryTimeoutFixture f = new QueryTimeoutFixture(null, 5.0); f.query.setTimeout(5001); f.query.getRanking().setQueryCache(true); f.search(); diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java deleted file mode 100644 index b9119528490..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/FS4SearchInvokerTestCase.java +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.prelude.fastsearch; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.MockSearchCluster; -import com.yahoo.search.dispatch.ResponseMonitor; -import com.yahoo.search.searchchain.Execution; -import org.hamcrest.Matchers; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.Optional; - -import static org.junit.Assert.assertThat; - -public class FS4SearchInvokerTestCase { - @SuppressWarnings("resource") - @Test - public void testThatConnectionErrorsAreReportedImmediately() throws IOException { - var query = new Query("?"); - query.setTimeout(1000); - - var searcher = mockSearcher(); - var cluster = new MockSearchCluster("?", 1, 1); - var fs4invoker = new FS4SearchInvoker(searcher, query, mockFailingChannel(), Optional.empty()); - var interleave = new InterleavedSearchInvoker(Collections.singleton(fs4invoker), cluster, null); - - long start = System.currentTimeMillis(); - interleave.search(query, null); - long elapsed = System.currentTimeMillis() - start; - - assertThat("Connection error should fail fast", elapsed, Matchers.lessThan(500L)); - } - - private static VespaBackEndSearcher mockSearcher() { - return new VespaBackEndSearcher() { - @Override - protected Result doSearch2(Query query, Execution execution) { - return null; - } - - @Override - protected void doPartialFill(Result result, String summaryClass) {} - }; - } - - private static FS4Channel mockFailingChannel() { - return new FS4Channel() { - @Override - public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { - // pretend there's a connection error - return false; - } - - @Override - public void setQuery(Query q) {} - - @Override - public void setResponseMonitor(ResponseMonitor monitor) {} - - @Override - public void close() {} - }; - } -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java deleted file mode 100644 index b0662a93f62..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/DirectSearchTestCase.java +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test; - -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Result; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * Tests that FastSearcher will bypass dispatch when the conditions are right - * - * @author bratseth - */ -public class DirectSearchTestCase { - - @Test - public void testDirectSearchEnabled() { - FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); - tester.search("?query=test&dispatch.direct=true"); - assertEquals("The FastSearcher has used the local search node connection", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testDirectSearchDisabled() { - FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); - tester.search("?query=test&dispatch.direct=false&dispatch.internal=false"); - assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testDirectSearchEnabledByDefault() { - FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); - tester.search("?query=test"); - assertEquals("The FastSearcher has used the local search node connection", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testNoDirectSearchWhenMoreSearchNodesThanContainers() { - FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); - tester.search("?query=test&dispatch.direct=true&dispatch.internal=false"); - assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testDirectSearchWhenMultipleGroupsAndEnoughContainers() { - FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); - tester.search("?query=test&dispatch.direct=true"); - assertEquals(1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testDirectSearchSummaryFetchGoToLocalNode() { - FastSearcherTester tester = new FastSearcherTester(2, "otherhost:9999:1", FastSearcherTester.selfHostname + ":9999:0"); - int localDistributionKey = tester.dispatcher().searchCluster().nodesByHost().get(FastSearcherTester.selfHostname).asList().get(0).key(); - assertEquals(1, localDistributionKey); - Result result = tester.search("?query=test&dispatch.direct=true"); - assertEquals(1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - FastHit hit = (FastHit)result.hits().get(0); - assertEquals(localDistributionKey, hit.getDistributionKey()); - } - - @Test - public void testNoDirectSearchWhenMultipleNodesPerGroup() { - FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:0"); - tester.search("?query=test&dispatch.direct=true&dispatch.internal=false"); - assertEquals(0, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testNoDirectSearchWhenLocalNodeIsDown() { - FastSearcherTester tester = new FastSearcherTester(2, FastSearcherTester.selfHostname + ":9999:0", "otherhost:9999:1"); - assertTrue(tester.vipStatus().isInRotation()); - tester.setResponding(FastSearcherTester.selfHostname, false); - assertFalse(tester.vipStatus().isInRotation()); - assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.setResponding(FastSearcherTester.selfHostname, true); - assertTrue(tester.vipStatus().isInRotation()); - assertEquals("2 ping requests, 0 search request", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("2 ping requests, 1 search request", 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - - @Test - public void testNoDirectDispatchWhenInsufficientCoverage() { - FastSearcherTester tester = new FastSearcherTester(3, - FastSearcherTester.selfHostname + ":9999:0", - "host1:9999:1", - "host2:9999:2"); - double k = 38.78955; // multiply all document counts by some number > 1 to test that we compute % correctly - - tester.setActiveDocuments(FastSearcherTester.selfHostname, (long) (96 * k)); - tester.setActiveDocuments("host1", (long) (100 * k)); - tester.setActiveDocuments("host2", (long) (100 * k)); - assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("Still 1 ping request, 0 search requests because the default coverage is 97%, and we only have 96% locally", - 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.waitForInRotationIs(false); - - tester.setActiveDocuments(FastSearcherTester.selfHostname, (long) (99 * k)); - assertEquals("2 ping request, 0 search requests", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("2 ping request, 1 search requests because we now have 99% locally", - 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.waitForInRotationIs(true); - - tester.setActiveDocuments("host1", (long) (104 * k)); - assertEquals("2 ping request, 1 search requests", 3, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("2 ping request, 2 search requests because 99/((104+100)/2) > 0.97", - 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.waitForInRotationIs(true); - - tester.setActiveDocuments("host2", (long) (102 * k)); - assertEquals("2 ping request, 2 search requests", 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("Still 2 ping request, 2 search requests because 99/((104+102)/2) < 0.97", - 4, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.waitForInRotationIs(false); - } - - @Test - public void testCoverageWithSingleGroup() { - FastSearcherTester tester = new FastSearcherTester(1, FastSearcherTester.selfHostname + ":9999:0"); - - tester.setActiveDocuments(FastSearcherTester.selfHostname, 100); - assertEquals("1 ping request, 0 search requests", 1, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - tester.search("?query=test&dispatch.direct=true&nocache"); - assertEquals("1 ping request, 1 search requests", 2, tester.requestCount(FastSearcherTester.selfHostname, 9999)); - } - -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index c6e87170f07..eb4d65693bb 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -3,64 +3,34 @@ package com.yahoo.prelude.fastsearch.test; import com.google.common.collect.ImmutableList; import com.yahoo.component.chain.Chain; -import com.yahoo.config.subscription.ConfigGetter; -import com.yahoo.container.QrConfig; -import com.yahoo.container.handler.VipStatus; import com.yahoo.container.protect.Error; -import com.yahoo.container.search.Fs4Config; -import com.yahoo.document.GlobalId; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.Packet; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.BackendTestCase; -import com.yahoo.fs4.test.QueryTestCase; import com.yahoo.language.simple.SimpleLinguistics; -import com.yahoo.prelude.Ping; -import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; -import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.FastSearcher; import com.yahoo.prelude.fastsearch.SummaryParameters; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockFSChannel; -import com.yahoo.processing.execution.Execution.Trace; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; -import com.yahoo.search.dispatch.rpc.MockRpcResourcePoolBuilder; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.AllOperation; import com.yahoo.search.grouping.request.EachOperation; import com.yahoo.search.grouping.request.GroupingOperation; -import com.yahoo.search.query.SessionId; import com.yahoo.search.rendering.RendererRegistry; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; -import com.yahoo.yolean.trace.TraceNode; -import com.yahoo.yolean.trace.TraceVisitor; import org.junit.Test; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; + /** * Tests the Fast searcher @@ -70,30 +40,12 @@ import static org.junit.Assert.assertTrue; public class FastSearcherTestCase { private final static DocumentdbInfoConfig documentdbInfoConfig = new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder()); - private MockBackend mockBackend; - @Test - public void testNoNormalizing() { - Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), - new FS4ResourcePool("container.0", 1), - MockDispatcher.create(Collections.emptyList()), - new SummaryParameters(null), - new ClusterParams("testhittype"), - documentdbInfoConfig); - - MockFSChannel.setEmptyDocsums(false); - - Result result = doSearch(fastSearcher, new Query("?query=ignored"), 0, 10); - - assertTrue(result.hits().get(0).getRelevance().getScore() > 1000); - } @Test public void testNullQuery() { Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), - new FS4ResourcePool("container.0", 1), + FastSearcher fastSearcher = new FastSearcher("container.0", MockDispatcher.create(Collections.emptyList()), new SummaryParameters(null), new ClusterParams("testhittype"), @@ -109,152 +61,6 @@ public class FastSearcherTestCase { assertEquals(Error.NULL_QUERY.code, message.getCode()); } - @Test - public void testDispatchDotSummaries() { - Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - DocumentdbInfoConfig documentdbConfigWithOneDb = - new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder().documentdb(new DocumentdbInfoConfig.Documentdb.Builder() - .name("testDb") - .summaryclass(new DocumentdbInfoConfig.Documentdb.Summaryclass.Builder().name("simple").id(7)) - .rankprofile(new DocumentdbInfoConfig.Documentdb.Rankprofile.Builder() - .name("simpler").hasRankFeatures(false).hasSummaryFeatures(false)))); - - List nodes = new ArrayList<>(); - nodes.add(new Node(0, "host1", 5000, 0)); - nodes.add(new Node(1, "host2", 5000, 0)); - - var mockFs4ResourcePool = new MockFS4ResourcePool(); - var mockRpcResourcePool = new MockRpcResourcePoolBuilder().connection(0).connection(1).build(); - - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), - mockFs4ResourcePool, - MockDispatcher.create(nodes, mockFs4ResourcePool, mockRpcResourcePool, 1, new VipStatus()), - new SummaryParameters(null), - new ClusterParams("testhittype"), - documentdbConfigWithOneDb); - - { // No direct.summaries - String query = "?query=sddocname:a&summary=simple&timeout=20s"; - Result result = doSearch(fastSearcher, new Query(query), 0, 10); - doFill(fastSearcher, result); - ErrorMessage error = result.hits().getError(); - assertNull("Since we don't route to the dispatcher we hit the mock backend, so no error", error); - } - - { // direct.summaries due to query cache - String query = "?query=sddocname:a&ranking.queryCache&timeout=20s"; - Result result = doSearch(fastSearcher, new Query(query), 0, 10); - doFill(fastSearcher, result); - ErrorMessage error = result.hits().getError(); - assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used", - "getDocsums(..) attempted for node X", error.getDetailedMessage().replaceAll("\\d", "X")); - } - - { // direct.summaries due to no summary features - String query = "?query=sddocname:a&dispatch.summaries&summary=simple&ranking=simpler&timeout=20s"; - Result result = doSearch(fastSearcher, new Query(query), 0, 10); - doFill(fastSearcher, result); - ErrorMessage error = result.hits().getError(); - assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used", - "getDocsums(..) attempted for node X", error.getDetailedMessage().replaceAll("\\d", "X")); - } - } - - @Test - public void testQueryWithRestrict() { - mockBackend = new MockBackend(); - DocumentdbInfoConfig documentdbConfigWithOneDb = - new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder().documentdb(new DocumentdbInfoConfig.Documentdb.Builder().name("testDb"))); - FastSearcher fastSearcher = new FastSearcher(mockBackend, - new FS4ResourcePool("container.0", 1), - MockDispatcher.create(Collections.emptyList()), - new SummaryParameters(null), - new ClusterParams("testhittype"), - documentdbConfigWithOneDb); - - Query query = new Query("?query=foo&model.restrict=testDb&groupingSessionCache=false"); - query.prepare(); - doSearch(fastSearcher, query, 0, 10); - - Packet receivedPacket = mockBackend.getChannel().getLastQueryPacket(); - byte[] encoded = QueryTestCase.packetToBytes(receivedPacket); - byte[] correct = new byte[] { - 0, 0, 0, 100, 0, 0, 0, -38, 0, 0, 0, 0, 0, 16, 0, 6, 0, 10, - QueryTestCase.ignored, QueryTestCase.ignored, QueryTestCase.ignored, QueryTestCase.ignored, // time left - 0, 0, 0x40, 0x03, 7, 100, 101, 102, 97, 117, 108, 116, 0, 0, 0, 1, 0, 0, 0, 5, 109, 97, 116, 99, 104, 0, 0, 0, 1, 0, 0, 0, 24, 100, 111, 99, 117, 109, 101, 110, 116, 100, 98, 46, 115, 101, 97, 114, 99, 104, 100, 111, 99, 116, 121, 112, 101, 0, 0, 0, 6, 116, 101, 115, 116, 68, 98, 0, 0, 0, 1, 0, 0, 0, 7, 68, 1, 0, 3, 102, 111, 111 - }; - QueryTestCase.assertEqualArrays(correct, encoded); - } - - @Test - public void testSearch() { - FastSearcher fastSearcher = createFastSearcher(); - - Result result = doSearch(fastSearcher, new Query("?query=ignored"), 0, 10); - - Execution execution = new Execution(chainedAsSearchChain(fastSearcher), Execution.Context.createContextStub()); - assertEquals(2, result.getHitCount()); - execution.fill(result); - assertCorrectHit1((FastHit)result.hits().get(0)); - assertCorrectTypes1((FastHit)result.hits().get(0)); - for (int idx = 0; idx < result.getHitCount(); idx++) { - assertTrue(!result.hits().get(idx).isCached()); - } - - // Repeat the request a couple of times, to verify whether the packet cache works - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 10); - assertEquals(2, result.getHitCount()); - execution.fill(result); - assertCorrectHit1((FastHit) result.hits().get(0)); - for (int i = 0; i < result.getHitCount(); i++) { - assertFalse(result.hits().get(i) + " should never be cached", - result.hits().get(i).isCached()); - } - - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 10); - assertEquals(2, result.getHitCount()); - execution.fill(result); - assertCorrectHit1((FastHit) result.hits().get(0)); - assertTrue("All hits are not cached", !result.isCached()); - for (int i = 0; i < result.getHitCount(); i++) { - assertTrue(!result.hits().get(i).isCached()); - } - - // Test that partial result sets can be retrieved from the cache - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 1); - assertEquals(1, result.getConcreteHitCount()); - execution.fill(result); - - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 2); - assertEquals(2, result.getConcreteHitCount()); - execution.fill(result); - // No hit should be cached - assertFalse(result.hits().get(0).isCached()); - assertFalse(result.hits().get(1).isCached()); - - // Still nothing cached - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 2); - assertEquals(2, result.getConcreteHitCount()); - execution.fill(result); - // both first and second should now be cached - assertFalse(result.hits().get(0).isCached()); - assertFalse(result.hits().get(1).isCached()); - - // Tests that the cache _hit_ is not returned if _another_ - // hit is requested - - result = doSearch(fastSearcher,new Query("?query=ignored"), 0, 1); - assertEquals(1, result.getConcreteHitCount()); - - result = doSearch(fastSearcher,new Query("?query=ignored"), 1, 1); - assertEquals(1, result.getConcreteHitCount()); - - for (int i = 0; i < result.getHitCount(); i++) { - assertFalse("Hit " + i + " should not be cached.", - result.hits().get(i).isCached()); - } - } - private Chain chainedAsSearchChain(Searcher topOfChain) { List searchers = new ArrayList<>(); searchers.add(topOfChain); @@ -272,79 +78,9 @@ public class FastSearcherTestCase { return new Execution(chainedAsSearchChain(searcher), context); } - private void doFill(Searcher searcher, Result result) { - createExecution(searcher).fill(result); - } - - @Test - public void testThatPropertiesAreReencoded() throws Exception { - FastSearcher fastSearcher = createFastSearcher(); - - Query query = new Query("?query=ignored&dispatch.summaries=false&groupingSessionCache=false"); - query.getRanking().setQueryCache(true); - Result result = doSearch(fastSearcher, query, 0, 10); - - Execution execution = new Execution(chainedAsSearchChain(fastSearcher), Execution.Context.createContextStub()); - assertEquals(2, result.getHitCount()); - execution.fill(result); - - BasicPacket receivedPacket = mockBackend.getChannel().getLastReceived(); - ByteBuffer buf = ByteBuffer.allocate(1000); - receivedPacket.encode(buf); - buf.flip(); - byte[] actual = new byte[buf.remaining()]; - buf.get(actual); - - SessionId sessionId = query.getSessionId(); - byte IGNORE = 69; - ByteBuffer answer = ByteBuffer.allocate(1024); - answer.put(new byte[] { 0, 0, 0, (byte)(141+sessionId.asUtf8String().getByteLength()), 0, 0, 0, -37, 0, 0, 16, 17, 0, 0, 0, 0, - // query timeout - IGNORE, IGNORE, IGNORE, IGNORE, - // "default" - rank profile - 7, 'd', 'e', 'f', 'a', 'u', 'l', 't', 0, 0, 0, 0x03, - // 3 property entries (rank, match, caches) - 0, 0, 0, 3, - // rank: sessionId => qrserver.0.XXXXXXXXXXXXX.0 - 0, 0, 0, 4, 'r', 'a', 'n', 'k', 0, 0, 0, 1, 0, 0, 0, 9, 's', 'e', 's', 's', 'i', 'o', 'n', 'I', 'd'}); - answer.putInt(sessionId.asUtf8String().getBytes().length); - answer.put(sessionId.asUtf8String().getBytes()); - answer.put(new byte [] { - // match: documentdb.searchdoctype => test - 0, 0, 0, 5, 'm', 'a', 't', 'c', 'h', 0, 0, 0, 1, 0, 0, 0, 24, 'd', 'o', 'c', 'u', 'm', 'e', 'n', 't', 'd', 'b', '.', 's', 'e', 'a', 'r', 'c', 'h', 'd', 'o', 'c', 't', 'y', 'p', 'e', 0, 0, 0, 4, 't', 'e', 's', 't', - // sessionId => qrserver.0.XXXXXXXXXXXXX.0 - 0, 0, 0, 6, 'c', 'a', 'c', 'h', 'e', 's', 0, 0, 0, 1, 0, 0, 0, 5, 'q', 'u', 'e', 'r', 'y', 0, 0, 0, 4, 't', 'r', 'u', 'e'}); - byte [] expected = new byte [answer.position()]; - answer.flip(); - answer.get(expected); - - for (int i = 0; i < expected.length; ++i) { - if (expected[i] == IGNORE) { - actual[i] = IGNORE; - } - } - assertArrayEquals(expected, actual); - } - - private FastSearcher createFastSearcher() { - mockBackend = new MockBackend(); - ConfigGetter getter = new ConfigGetter<>(DocumentdbInfoConfig.class); - DocumentdbInfoConfig config = getter.getConfig("file:src/test/java/com/yahoo/prelude/fastsearch/test/documentdb-info.cfg"); - - MockFSChannel.resetDocstamp(); - Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - return new FastSearcher(mockBackend, - new FS4ResourcePool("container.0", 1), - MockDispatcher.create(Collections.emptyList()), - new SummaryParameters(null), - new ClusterParams("testhittype"), - config); - } - @Test public void testSinglePassGroupingIsForcedWithSingleNodeGroups() { - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), - new FS4ResourcePool("container.0", 1), + FastSearcher fastSearcher = new FastSearcher("container.0", MockDispatcher.create(Collections.singletonList(new Node(0, "host0", 123, 0))), new SummaryParameters(null), new ClusterParams("testhittype"), @@ -368,8 +104,7 @@ public class FastSearcherTestCase { public void testSinglePassGroupingIsNotForcedWithSingleNodeGroups() { MockDispatcher dispatcher = MockDispatcher.create(ImmutableList.of(new Node(0, "host0", 123, 0), new Node(2, "host1", 123, 0))); - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), - new FS4ResourcePool("container.0", 1), + FastSearcher fastSearcher = new FastSearcher("container.0", dispatcher, new SummaryParameters(null), new ClusterParams("testhittype"), @@ -401,95 +136,4 @@ public class FastSearcherTestCase { assertForceSinglePassIs(expected, child); } - @Test - public void testPing() throws IOException, InterruptedException { - Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - BackendTestCase.MockServer server = new BackendTestCase.MockServer(); - FS4ResourcePool listeners = new FS4ResourcePool(new Fs4Config(new Fs4Config.Builder()), new QrConfig(new QrConfig.Builder())); - Backend backend = listeners.getBackend(server.host.getHostString(),server.host.getPort()); - FastSearcher fastSearcher = new FastSearcher(backend, - new FS4ResourcePool("container.0", 1), - MockDispatcher.create(Collections.emptyList()), - new SummaryParameters(null), - new ClusterParams("testhittype"), - documentdbInfoConfig); - server.dispatch.packetData = BackendTestCase.PONG; - server.dispatch.setNoChannel(); - Chain chain = new Chain<>(fastSearcher); - Execution e = new Execution(chain, Execution.Context.createContextStub()); - Pong pong = e.ping(new Ping()); - backend.shutdown(); - server.dispatch.socket.close(); - server.dispatch.connection.close(); - server.worker.join(); - pong.setPingInfo("blbl"); - assertEquals("Result of pinging using blbl", pong.toString()); - } - - private void assertCorrectTypes1(FastHit hit) { - assertEquals(String.class, hit.getField("TITLE").getClass()); - assertEquals(Integer.class, hit.getField("BYTES").getClass()); - } - - private void assertCorrectHit1(FastHit hit) { - assertEquals( - "StudyOfMadonna.com - Interviews, Articles, Reviews, Quotes, Essays and more..", - hit.getField("TITLE")); - assertEquals("352", hit.getField("WORDS").toString()); - assertEquals(2003., hit.getRelevance().getScore(), 0.01d); - assertEquals("index:testhittype/234/" + asHexString(hit.getGlobalId()), hit.getId().toString()); - assertEquals("9190", hit.getField("BYTES").toString()); - assertEquals("testhittype", hit.getSource()); - } - - private static String asHexString(GlobalId gid) { - StringBuilder sb = new StringBuilder(); - byte[] rawGid = gid.getRawId(); - for (byte b : rawGid) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) - sb.append('0'); - sb.append(hex); - } - return sb.toString(); - } - - @Test - public void null_summary_is_included_in_trace() { - String summary = null; - assertThat(getTraceString(summary), containsString("summary=[null]")); - } - - @Test - public void non_null_summary_is_included_in_trace() { - String summary = "all"; - assertThat(getTraceString(summary), containsString("summary='all'")); - } - - private String getTraceString(String summary) { - FastSearcher fastSearcher = createFastSearcher(); - - Query query = new Query("?query=ignored"); - query.getPresentation().setSummary(summary); - query.setTraceLevel(2); - - Result result = doSearch(fastSearcher, query, 0, 10); - doFill(fastSearcher, result); - - Trace trace = query.getContext(false).getTrace(); - final AtomicReference fillTraceString = new AtomicReference<>(); - - - trace.traceNode().accept(new TraceVisitor() { - @Override - public void visit(TraceNode traceNode) { - if (traceNode.payload() instanceof String && traceNode.payload().toString().contains("fill to dispatch")) - fillTraceString.set((String) traceNode.payload()); - - } - }); - - return fillTraceString.get(); - } - } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java deleted file mode 100644 index 6eab16045c2..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test; - -import com.google.common.util.concurrent.MoreExecutors; -import com.yahoo.container.QrSearchersConfig; -import com.yahoo.container.handler.VipStatus; -import com.yahoo.net.HostName; -import com.yahoo.prelude.fastsearch.ClusterParams; -import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; -import com.yahoo.prelude.fastsearch.FastSearcher; -import com.yahoo.prelude.fastsearch.SummaryParameters; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.MockRpcResourcePoolBuilder; -import com.yahoo.search.dispatch.rpc.RpcResourcePool; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.searchchain.Execution; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * @author bratseth - */ -class FastSearcherTester { - - public static final String selfHostname = HostName.getLocalhost(); - - private final MockFS4ResourcePool mockFS4ResourcePool; - private final RpcResourcePool mockRpcResourcePool; - private final FastSearcher fastSearcher; - private final MockDispatcher mockDispatcher; - private final VipStatus vipStatus; - - public FastSearcherTester(int containerClusterSize, Node searchNode) { - this(containerClusterSize, Collections.singletonList(searchNode)); - } - - public FastSearcherTester(int containerClusterSize, String... hostAndPortAndGroupStrings) { - this(containerClusterSize, toNodes(hostAndPortAndGroupStrings)); - } - - public FastSearcherTester(int containerClusterSize, List searchNodes) { - String clusterId = "a"; - - var b = new QrSearchersConfig.Builder(); - var searchClusterB = new QrSearchersConfig.Searchcluster.Builder(); - searchClusterB.name(clusterId); - b.searchcluster(searchClusterB); - vipStatus = new VipStatus(b.build()); - - mockFS4ResourcePool = new MockFS4ResourcePool(); - var builder = new MockRpcResourcePoolBuilder(); - searchNodes.forEach(node -> builder.connection(node.key())); - mockRpcResourcePool = builder.build(); - mockDispatcher = MockDispatcher.create(searchNodes, mockFS4ResourcePool, mockRpcResourcePool, containerClusterSize, vipStatus); - fastSearcher = new FastSearcher(new MockBackend(selfHostname, 0L, true), - mockFS4ResourcePool, - mockDispatcher, - new SummaryParameters(null), - new ClusterParams("testhittype"), - new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder())); - } - - private static List toNodes(String... hostAndPortAndGroupStrings) { - List nodes = new ArrayList<>(); - int key = 0; - for (String s : hostAndPortAndGroupStrings) { - String[] parts = s.split(":"); - nodes.add(new Node(key++, parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); - } - return nodes; - } - - public Result search(String query) { - Result result = fastSearcher.search(new Query(query), new Execution(Execution.Context.createContextStub())); - assertEquals(null, result.hits().getError()); - return result; - } - - /** Returns the number of times a backend for this hostname and port has been requested */ - public int requestCount(String hostname, int port) { - return mockFS4ResourcePool.requestCount(hostname, port); - } - - public MockDispatcher dispatcher() { return mockDispatcher; } - - /** Sets the response status of a node and ping it to update the monitor status */ - public void setResponding(String hostname, boolean responding) { - // Start/stop returning a failing backend - mockFS4ResourcePool.setResponding(hostname, responding); - - // Make the search cluster monitor notice right now in this thread - Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); - mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); - } - - /** Sets the response status of a node and ping it to update the monitor status */ - public void setActiveDocuments(String hostname, long activeDocuments) { - mockFS4ResourcePool.setActiveDocuments(hostname, activeDocuments); - - // Make the search cluster monitor notice right now in this thread - Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); - mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); - mockDispatcher.searchCluster().pingIterationCompleted(); - } - - public VipStatus vipStatus() { return vipStatus; } - - /** Retrying is needed because earlier pings from the monitoring thread may interfere with the testing thread */ - public void waitForInRotationIs(boolean expectedRotationStatus) { - int triesLeft = 9000; - while (vipStatus.isInRotation() != expectedRotationStatus && triesLeft > 0) { - triesLeft--; - try { Thread.sleep(10); } catch (InterruptedException e) {} - } - if (triesLeft == 0) - fail("Did not reach VIP in rotation status = " + expectedRotationStatus + " after trying for 90 seconds"); - } - -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java index ccb265b799b..afb9cf6f571 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java @@ -2,8 +2,6 @@ package com.yahoo.prelude.fastsearch.test; import com.yahoo.container.handler.VipStatus; -import com.yahoo.prelude.fastsearch.FS4PingFactory; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -15,23 +13,24 @@ import java.util.List; class MockDispatcher extends Dispatcher { public static MockDispatcher create(List nodes) { - var fs4ResourcePool = new FS4ResourcePool("container.0", 1); var rpcResourcePool = new RpcResourcePool(toDispatchConfig(nodes)); - return create(nodes, fs4ResourcePool, rpcResourcePool, 1, new VipStatus()); + return create(nodes, rpcResourcePool, 1, new VipStatus()); } - public static MockDispatcher create(List nodes, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool, + public static MockDispatcher create(List nodes, RpcResourcePool rpcResourcePool, int containerClusterSize, VipStatus vipStatus) { var dispatchConfig = toDispatchConfig(nodes); var searchCluster = new SearchCluster("a", dispatchConfig, containerClusterSize, vipStatus); - return new MockDispatcher(searchCluster, dispatchConfig, fs4ResourcePool, rpcResourcePool); + return new MockDispatcher(searchCluster, dispatchConfig, rpcResourcePool); } - private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, - RpcResourcePool rpcResourcePool) { - super(searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig.dispatchWithProtobuf()), - new FS4PingFactory(fs4ResourcePool), new MockMetric()); + private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcResourcePool rpcResourcePool) { + this(searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig.dispatchWithProtobuf())); + } + + private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) { + super(searchCluster, dispatchConfig, invokerFactory, invokerFactory, new MockMetric()); } private static DispatchConfig toDispatchConfig(List nodes) { diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java deleted file mode 100644 index d09e8856ee7..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/DispatchThread.java +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- -// -// -package com.yahoo.prelude.fastsearch.test.fs4mock; - - -import com.yahoo.prelude.ConfigurationException; - - -/** - * Thread-wrapper for MockFDispatch - * - * @author Bjorn Borud - */ -public class DispatchThread extends Thread { - int listenPort; - long replyDelay; - long byteDelay; - MockFDispatch dispatch; - Object barrier = new Object(); - - /** - * Instantiate MockFDispatch; if the wanted port is taken we - * bump the port number. Note that the delays are not - * accurate: in reality they will be significantly longer for - * low values. - * - * @param listenPort Wanted port number, note that this may be - * bumped if someone is already running something - * on this port, so it is a starting point for - * scanning only - * @param replyDelay how many milliseconds we should delay when - * replying - * @param byteDelay how many milliseconds we delay for each byte - * written - */ - - public DispatchThread(int listenPort, long replyDelay, long byteDelay) { - this.listenPort = listenPort; - this.replyDelay = replyDelay; - this.byteDelay = byteDelay; - dispatch = new MockFDispatch(listenPort, replyDelay, byteDelay); - dispatch.setBarrier(barrier); - } - - /** - * Run the MockFDispatch and anticipate multiple instances of - * same running. - */ - public void run() { - int maxTries = 20; - // the following section is here to make sure that this - // test is somewhat robust, ie. if someone is already - // listening to the port in question, we'd like to NOT - // fail, but keep probing until we find a port we can use. - boolean up = false; - - while ((!up) && (maxTries-- != 0)) { - try { - dispatch.run(); - up = true; - } catch (ConfigurationException e) { - listenPort++; - dispatch.setListenPort(listenPort); - } - } - } - - /** - * Wait until MockFDispatch is ready to accept connections - * or we time out and indicate which of the two outcomes it was. - * - * @return If we time out we return false. Else we - * return true - * - */ - public boolean waitOnBarrier(long timeout) throws InterruptedException { - long start = System.currentTimeMillis(); - - synchronized (barrier) { - barrier.wait(timeout); - } - long diff = System.currentTimeMillis() - start; - - return (diff < timeout); - } - - /** - * Return the port on which the MockFDispatch actually listens. - * use this instead of assuming where it is since, if more than - * one application tries to use the port we've assigned to it - * we might have to up the port number. - * - * @return port number of active MockFDispatch instance - * - */ - public int listenPort() { - return listenPort; - } -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java deleted file mode 100644 index d3fbf8f3645..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockBackend.java +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test.fs4mock; - -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.FS4Channel; - -/** - * @author bratseth - */ -public class MockBackend extends Backend { - - private String hostname; - private final long activeDocumentsInBackend; - private final boolean working; - - /** Created lazily as we want to have just one but it depends on the channel */ - private MockFSChannel channel = null; - - public MockBackend() { - this("", 0L, true); - } - - public MockBackend(String hostname, long activeDocumentsInBackend, boolean working) { - super(); - this.hostname = hostname; - this.activeDocumentsInBackend = activeDocumentsInBackend; - this.working = working; - } - - @Override - public FS4Channel openChannel() { - if (channel == null) - channel = working ? new MockFSChannel(activeDocumentsInBackend, this) - : new NonWorkingMockFSChannel(this); - return channel; - } - - @Override - public FS4Channel openPingChannel() { return openChannel(); } - - @Override - public String getHost() { return hostname; } - - /** Returns the channel in use or null if no channel has been used yet */ - public MockFSChannel getChannel() { return channel; } - - public void shutdown() {} - - @Override - public boolean probeConnection() { - return working; - } -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java deleted file mode 100644 index 6956f288d1a..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFDispatch.java +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test.fs4mock; - - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.HashSet; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.yahoo.prelude.ConfigurationException; -import com.yahoo.prelude.fastsearch.test.DocsumDefinitionTestCase; - - -/** - * A server which replies to any query with the same query result after - * a configurable delay, with a configurable slowness (delay between each byte). - * Connections are never timed out. - * - * @author bratseth - */ -public class MockFDispatch { - - private static int connectionCount = 0; - - private static Logger log = Logger.getLogger(MockFDispatch.class.getName()); - - /** The port we accept incoming requests at */ - private int listenPort = 0; - - private long replyDelay; - - private long byteDelay; - - private Object barrier; - - private static byte[] queryResultPacketData = new byte[] { - 0, 0, 0, 64, 0, 0, - 0, 214 - 256, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 5, 0, 0, 0, - 25, 0, 0, 0, 111, 0, 0, 0, 97, 0, 0, 0, 3, 0, 0, 0, 23, 0, 0, 0, 7, 0, 0, - 0, 36, 0, 0, 0, 4, 0, 0, 0, 21, 0, 0, 0, 8, 0, 0, 0, 37}; - - private static byte[] docsumData = DocsumDefinitionTestCase.makeDocsum(); - - private static byte[] docsumHeadPacketData = new byte[] { - 0, 0, 3, 39, 0, 0, - 0, 205 - 256, 0, 0, 0, 1, 0, 0, 0, 0}; - - private static byte[] eolPacketData = new byte[] { - 0, 0, 0, 8, 0, 0, 0, - 200 - 256, 0, 0, 0, 1 }; - - private Set connectionThreads = new HashSet<>(); - - public MockFDispatch(int listenPort, long replyDelay, long byteDelay) { - this.replyDelay = replyDelay; - this.byteDelay = byteDelay; - this.listenPort = listenPort; - } - - public void setBarrier(Object barrier) { - this.barrier = barrier; - } - - public void setListenPort(int listenPort) { - this.listenPort = listenPort; - } - - public void run() { - try { - ServerSocketChannel channel = createServerSocket(listenPort); - - channel.socket().setReuseAddress(true); - while (!Thread.currentThread().isInterrupted()) { - try { - // notify those waiting at the barrier that they - // can now proceed and talk to us - synchronized (barrier) { - if (barrier != null) { - barrier.notify(); - } - } - SocketChannel socketChannel = channel.accept(); - - connectionThreads.add(new ConnectionThread(socketChannel)); - } catch (ClosedByInterruptException e) {// We'll exit - } catch (ClosedChannelException e) { - return; - } catch (Exception e) { - log.log(Level.WARNING, "Unexpected error reading request", e); - } - } - channel.close(); - } catch (IOException e) { - throw new ConfigurationException("Socket channel failure", e); - } - } - - private ServerSocketChannel createServerSocket(int listenPort) - throws IOException { - ServerSocketChannel channel = ServerSocketChannel.open(); - ServerSocket socket = channel.socket(); - - socket.bind( - new InetSocketAddress(InetAddress.getLocalHost(), listenPort)); - String host = socket.getInetAddress().getHostName(); - - log.fine("Accepting dfispatch requests at " + host + ":" + listenPort); - return channel; - } - - public static void main(String[] args) { - log.setLevel(Level.FINE); - MockFDispatch m = new MockFDispatch(7890, Integer.parseInt(args[0]), - Integer.parseInt(args[1])); - - m.run(); - } - - private class ConnectionThread extends Thread { - - private ByteBuffer writeBuffer = ByteBuffer.allocate(2000); - - private ByteBuffer readBuffer = ByteBuffer.allocate(2000); - - private int connectionNr = 0; - - private SocketChannel channel; - - public ConnectionThread(SocketChannel channel) { - this.channel = channel; - fillBuffer(writeBuffer); - start(); - } - - private void fillBuffer(ByteBuffer buffer) { - buffer.clear(); - buffer.put(queryResultPacketData); - buffer.put(docsumHeadPacketData); - buffer.put(docsumData); - buffer.put(docsumHeadPacketData); - buffer.put(docsumData); - buffer.put(eolPacketData); - } - - public void run() { - connectionNr = connectionCount++; - log.fine("Opened connection " + connectionNr); - - try { - long lastRequest = System.currentTimeMillis(); - - while ((System.currentTimeMillis() - lastRequest) <= 5000 - && (!isInterrupted())) { - readBuffer.clear(); - channel.read(readBuffer); - lastRequest = System.currentTimeMillis(); - delay(replyDelay); - - if (byteDelay > 0) { - writeSlow(writeBuffer); - } else { - write(writeBuffer); - } - log.fine( - "Replied in " - + (System.currentTimeMillis() - lastRequest) - + " ms"); - } - - log.fine("Closing timed out connection " + connectionNr); - connectionCount--; - channel.close(); - } catch (IOException e) {} - } - - private void write(ByteBuffer writeBuffer) throws IOException { - writeBuffer.flip(); - channel.write(writeBuffer); - } - - private void writeSlow(ByteBuffer writeBuffer) throws IOException { - writeBuffer.flip(); - int dataSize = writeBuffer.limit(); - - for (int i = 0; i < dataSize; i++) { - writeBuffer.position(i); - writeBuffer.limit(i + 1); - channel.write(writeBuffer); - delay(byteDelay); - } - writeBuffer.limit(dataSize); - } - - private void delay(long delay) { - - try { - Thread.sleep(delay); - } catch (InterruptedException e) {} - } - - } - -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java deleted file mode 100644 index 0d756cbeff3..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test.fs4mock; - -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * @author bratseth - */ -public class MockFS4ResourcePool extends FS4ResourcePool { - - private final Map requestsPerBackend = new HashMap<>(); - private final Set nonRespondingBackends = new HashSet<>(); - private final Map activeDocumentsInBackend = new HashMap<>(); - private final long testingThreadId; - - public MockFS4ResourcePool() { - super("container.0", 1); - this.testingThreadId = Thread.currentThread().getId(); - } - - @Override - public Backend getBackend(String hostname, int port) { - countRequest(hostname + ":" + port); - if (nonRespondingBackends.contains(hostname)) - return new MockBackend(hostname, 0L, false); - else - return new MockBackend(hostname, activeDocumentsInBackend.getOrDefault(hostname, 0L), true); - } - - /** - * Returns the number of times a backend for this hostname and port has been requested - * from the thread creating this - */ - public int requestCount(String hostname, int port) { - return requestsPerBackend.getOrDefault(hostname + ":" + port, 0); - } - - /** Sets the number of active documents the given host will report to have in ping responses */ - public void setActiveDocuments(String hostname, long activeDocuments) { - activeDocumentsInBackend.put(hostname, activeDocuments); - } - - private void countRequest(String hostAndPort) { - // ignore requests from the ping thread to avoid timing issues - if (Thread.currentThread().getId() != testingThreadId) return; - - requestsPerBackend.put(hostAndPort, requestsPerBackend.getOrDefault(hostAndPort, 0) + 1); - } - - public void setResponding(String hostname, boolean responding) { - if (responding) - nonRespondingBackends.remove(hostname); - else - nonRespondingBackends.add(hostname); - } - -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java deleted file mode 100644 index db14a2894db..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFSChannel.java +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test.fs4mock; - -import com.yahoo.document.GlobalId; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.BufferTooSmallException; -import com.yahoo.fs4.DocumentInfo; -import com.yahoo.fs4.EolPacket; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.fs4.Packet; -import com.yahoo.fs4.PacketDecoder; -import com.yahoo.fs4.PingPacket; -import com.yahoo.fs4.PongPacket; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.QueryResultPacket; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.prelude.fastsearch.test.DocsumDefinitionTestCase; - -import java.nio.ByteBuffer; -import java.util.List; - -/** - * A channel which returns hardcoded packets of the same type as fdispatch - */ -public class MockFSChannel extends FS4Channel { - - /** The number of active documents this should report in ping reponses */ - private final long activeDocuments; - - MockFSChannel(Backend backend) { - this(0, backend); - } - - MockFSChannel(long activeDocuments, Backend backend) { - super(backend, 0); - this.activeDocuments = activeDocuments; - } - - private BasicPacket lastReceived = null; - - private static QueryPacket lastQueryPacket = null; - - /** Initial value of docstamp */ - private static int docstamp = 1088490666; - - private static boolean emptyDocsums = false; - - @Override - public synchronized boolean sendPacket(BasicPacket packet) { - try { - if (packet instanceof Packet) - packet.encode(ByteBuffer.allocate(65536), 0); - } catch (BufferTooSmallException e) { - throw new RuntimeException("Too small buffer to encode packet in mock backend."); - } - - if (packet instanceof QueryPacket) - lastQueryPacket = (QueryPacket) packet; - - lastReceived = packet; - notifyMonitor(); - return true; - } - - /** Change docstamp to invalidate cache */ - public static void resetDocstamp() { - docstamp = 1088490666; - } - - /** Flip sending (in)valid docsums */ - public static void setEmptyDocsums(boolean d) { - emptyDocsums = d; - } - - /** Returns the last query packet received or null if none */ - public QueryPacket getLastQueryPacket() { - return lastQueryPacket; - } - - public BasicPacket getLastReceived() { - return lastReceived; - } - - public BasicPacket[] receivePackets(long timeout, int packetCount) { - List packets = new java.util.ArrayList<>(); - - if (lastReceived instanceof QueryPacket) { - lastQueryPacket = (QueryPacket) lastReceived; - QueryResultPacket result = QueryResultPacket.create(); - - result.setDocstamp(docstamp); - result.setChannel(0); - result.setTotalDocumentCount(2); - result.setOffset(lastQueryPacket.getOffset()); - - if (lastQueryPacket.getOffset() == 0 - && lastQueryPacket.getLastOffset() >= 1) { - result.addDocument( - new DocumentInfo(DocsumDefinitionTestCase.createGlobalId(123), - 2003, 234, 0)); - } - if (lastQueryPacket.getOffset() <= 1 - && lastQueryPacket.getLastOffset() >= 2) { - result.addDocument( - new DocumentInfo(DocsumDefinitionTestCase.createGlobalId(456), - 1855, 234, 1)); - } - packets.add(result); - } - else if (lastReceived instanceof GetDocSumsPacket) { - addDocsums(packets, lastQueryPacket); - } - else if (lastReceived instanceof PingPacket) { - packets.add(new PongPacket(activeDocuments)); - } - while (packetCount >= 0 && packets.size() > packetCount) { - packets.remove(packets.size() - 1); - } - - return packets.toArray(new BasicPacket[packets.size()]); - } - - /** Adds the number of docsums requested in queryPacket.getHits() */ - private void addDocsums(List packets, QueryPacket queryPacket) { - int numHits = queryPacket.getHits(); - - if (lastReceived instanceof GetDocSumsPacket) { - numHits = ((GetDocSumsPacket) lastReceived).getNumDocsums(); - } - for (int i = 0; i < numHits; i++) { - ByteBuffer buffer; - - if (emptyDocsums) { - buffer = createEmptyDocsumPacketData(); - } else { - int[] docids = { - 123, 456, 789, 789, 789, 789, 789, 789, 789, - 789, 789, 789 }; - - buffer = createDocsumPacketData(docids[i], DocsumDefinitionTestCase.makeDocsum()); - } - buffer.position(0); - packets.add(PacketDecoder.decode(buffer)); - } - packets.add(EolPacket.create()); - } - - private ByteBuffer createEmptyDocsumPacketData() { - ByteBuffer buffer = ByteBuffer.allocate(16); - - buffer.limit(buffer.capacity()); - buffer.position(0); - buffer.putInt(12); // length - buffer.putInt(205); // a code for docsumpacket - buffer.putInt(0); // channel - buffer.putInt(0); // dummy location - return buffer; - } - - private ByteBuffer createDocsumPacketData(int docid, byte[] docsumData) { - ByteBuffer buffer = ByteBuffer.allocate(docsumData.length + 4 + 8 + GlobalId.LENGTH); - - buffer.limit(buffer.capacity()); - buffer.position(0); - buffer.putInt(docsumData.length + 8 + GlobalId.LENGTH); - buffer.putInt(205); // Docsum packet code - buffer.putInt(0); - byte[] rawGid = DocsumDefinitionTestCase.createGlobalId(docid).getRawId(); - buffer.put(rawGid); - buffer.put(docsumData); - return buffer; - } - - public void close() {} -} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java deleted file mode 100644 index c7425afd611..00000000000 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/NonWorkingMockFSChannel.java +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test.fs4mock; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.mplex.Backend; - -/** - * @author bratseth - */ -public class NonWorkingMockFSChannel extends MockFSChannel { - - public NonWorkingMockFSChannel(Backend backend) { - super(backend); - } - - @Override - public synchronized boolean sendPacket(BasicPacket bPacket) { - return false; - } - -} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 42a22f6f86b..7ee62ae9978 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -88,11 +88,6 @@ public class MockSearchCluster extends SearchCluster { } } - @Override - public ImmutableMultimap nodesByHost() { - return nodesByHost; - } - @Override public Optional directDispatchTarget() { return Optional.empty(); diff --git a/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java b/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java index 6ed8a209cc5..7bdf1916d85 100644 --- a/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/grouping/vespa/HitConverterTestCase.java @@ -3,7 +3,6 @@ package com.yahoo.search.grouping.vespa; import com.yahoo.document.DocumentId; import com.yahoo.document.GlobalId; -import com.yahoo.fs4.QueryPacketData; import com.yahoo.net.URI; import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; @@ -61,19 +60,6 @@ public class HitConverterTestCase { assertEquals(ctxHit.getSource(), hit.getSource()); } - @Test - public void requireThatHitTagIsCopiedFromGroupingListContext() { - QueryPacketData ctxTag = new QueryPacketData(); - GroupingListHit ctxHit = context(); - ctxHit.setQueryPacketData(ctxTag); - - HitConverter converter = new HitConverter(new MySearcher(), new Query()); - Hit hit = converter.toSearchHit("default", new FS4Hit(1, createGlobalId(2), 3).setContext(ctxHit)); - assertNotNull(hit); - assertTrue(hit instanceof FastHit); - assertSame(ctxTag, ((FastHit)hit).getQueryPacketData()); - } - @Test public void requireThatSummaryClassIsSet() { Searcher searcher = new MySearcher(); diff --git a/container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java b/container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java new file mode 100644 index 00000000000..8aff81a90db --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/query/test/RankFeaturesTestCase.java @@ -0,0 +1,140 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.query.test; + +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.search.query.ranking.RankFeatures; +import com.yahoo.search.query.ranking.RankProperties; +import com.yahoo.tensor.Tensor; +import com.yahoo.tensor.TensorType; +import com.yahoo.tensor.serialization.TypedBinaryFormat; +import com.yahoo.text.Utf8; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +/** + * @author geirst + */ +public class RankFeaturesTestCase { + + @Test + public void requireThatRankPropertiesTakesBothStringAndObject() { + RankProperties p = new RankProperties(); + p.put("string", "b"); + p.put("object", 7); + assertEquals("7", p.get("object").get(0)); + assertEquals("b", p.get("string").get(0)); + } + + @Test + public void requireThatRankFeaturesUsingDoubleAndDoubleToStringEncodeTheSameWay() { + RankFeatures withDouble = new RankFeatures(); + withDouble.put("query(myDouble)", 3.8); + assertEquals(3.8, withDouble.getDouble("query(myDouble)").getAsDouble(), 0.000001); + + RankFeatures withString = new RankFeatures(); + withString.put("query(myDouble)", String.valueOf(3.8)); + + RankProperties withDoubleP = new RankProperties(); + withDouble.prepare(withDoubleP); + RankProperties withStringP = new RankProperties(); + withString.prepare(withStringP); + + byte[] withDoubleEncoded = encode(withDoubleP); + byte[] withStringEncoded = encode(withStringP); + assertEquals(Arrays.toString(withStringEncoded), Arrays.toString(withDoubleEncoded)); + } + + @Test + public void requireThatSingleTensorIsBinaryEncoded() { + TensorType type = new TensorType.Builder().mapped("x").mapped("y").mapped("z").build(); + Tensor tensor = Tensor.from(type, "{ {x:a, y:b, z:c}:2.0, {x:a, y:b, z:c2}:3.0 }"); + assertTensorEncodingAndDecoding(type, "query(my_tensor)", "my_tensor", tensor); + assertTensorEncodingAndDecoding(type, "$my_tensor", "my_tensor", tensor); + } + + @Test + public void requireThatMultipleTensorsAreBinaryEncoded() { + TensorType type = new TensorType.Builder().mapped("x").mapped("y").mapped("z").build(); + Tensor tensor1 = Tensor.from(type, "{ {x:a, y:b, z:c}:2.0, {x:a, y:b, z:c2}:3.0 }"); + Tensor tensor2 = Tensor.from(type, "{ {x:a, y:b, z:c}:5.0 }"); + assertTensorEncodingAndDecoding(type, Arrays.asList( + new Entry("query(tensor1)", "tensor1", tensor1), + new Entry("$tensor2", "tensor2", tensor2))); + } + + private static class Entry { + final String key; + final String normalizedKey; + final Tensor tensor; + Entry(String key, String normalizedKey, Tensor tensor) { + this.key = key; + this.normalizedKey = normalizedKey; + this.tensor = tensor; + } + } + + private static void assertTensorEncodingAndDecoding(TensorType type, List entries) { + RankProperties properties = createRankPropertiesWithTensors(entries); + assertEquals(entries.size(), properties.asMap().size()); + + Map decodedProperties = decode(type, encode(properties)); + assertEquals(entries.size(), properties.asMap().size()); + assertEquals(entries.size(), decodedProperties.size()); + for (Entry entry : entries) { + assertEquals(entry.tensor, decodedProperties.get(entry.normalizedKey)); + } + } + + private static void assertTensorEncodingAndDecoding(TensorType type, String key, String normalizedKey, Tensor tensor) { + assertTensorEncodingAndDecoding(type, Arrays.asList(new Entry(key, normalizedKey, tensor))); + } + + private static RankProperties createRankPropertiesWithTensors(List entries) { + RankFeatures features = new RankFeatures(); + for (Entry entry : entries) { + features.put(entry.key, entry.tensor); + } + RankProperties properties = new RankProperties(); + features.prepare(properties); + return properties; + } + + private static byte[] encode(RankProperties properties) { + ByteBuffer buffer = ByteBuffer.allocate(512); + properties.encode(buffer, true); + byte[] result = new byte[buffer.position()]; + buffer.rewind(); + buffer.get(result); + return result; + } + + private static Map decode(TensorType type, byte[] encodedProperties) { + GrowableByteBuffer buffer = GrowableByteBuffer.wrap(encodedProperties); + byte[] mapNameBytes = new byte[buffer.getInt()]; + buffer.get(mapNameBytes); + int numEntries = buffer.getInt(); + Map result = new HashMap<>(); + for (int i = 0; i < numEntries; ++i) { + byte[] keyBytes = new byte[buffer.getInt()]; + buffer.get(keyBytes); + String key = Utf8.toString(keyBytes); + byte[] value = new byte[buffer.getInt()]; + buffer.get(value); + if (key.contains(".type")) { + result.put(key, Utf8.toString(value)); + } else { + result.put(key, TypedBinaryFormat.decode(Optional.of(type), GrowableByteBuffer.wrap(value))); + } + } + return result; + } + +} -- cgit v1.2.3