From 250f7e16efa36fe2b5e24bf75fe3f0af5cb4fed8 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 18 Sep 2019 10:47:22 +0200 Subject: Revert "Revert "Revert "Revert "Balder/no more fs4 dispatching from fastsearcher"""" --- .../src/main/java/com/yahoo/fs4/BasicPacket.java | 315 --------------- .../com/yahoo/fs4/BufferTooSmallException.java | 17 - .../com/yahoo/fs4/ChannelTimeoutException.java | 23 -- .../src/main/java/com/yahoo/fs4/DocsumPacket.java | 33 +- .../src/main/java/com/yahoo/fs4/DocumentInfo.java | 74 ---- .../src/main/java/com/yahoo/fs4/EolPacket.java | 35 -- .../src/main/java/com/yahoo/fs4/ErrorPacket.java | 48 --- .../src/main/java/com/yahoo/fs4/FS4Properties.java | 60 --- .../main/java/com/yahoo/fs4/GetDocSumsPacket.java | 202 +-------- .../main/java/com/yahoo/fs4/HexByteIterator.java | 42 -- .../src/main/java/com/yahoo/fs4/Packet.java | 114 ------ .../src/main/java/com/yahoo/fs4/PacketDecoder.java | 200 --------- .../src/main/java/com/yahoo/fs4/PacketDumper.java | 135 ------- .../main/java/com/yahoo/fs4/PacketListener.java | 16 - .../yahoo/fs4/PacketNotificationsBroadcaster.java | 34 -- .../main/java/com/yahoo/fs4/PacketQueryTracer.java | 53 --- .../src/main/java/com/yahoo/fs4/PingPacket.java | 26 -- .../src/main/java/com/yahoo/fs4/PongPacket.java | 92 ----- .../src/main/java/com/yahoo/fs4/QueryPacket.java | 215 ---------- .../main/java/com/yahoo/fs4/QueryPacketData.java | 91 ----- .../main/java/com/yahoo/fs4/QueryResultPacket.java | 228 ----------- .../src/main/java/com/yahoo/fs4/mplex/Backend.java | 449 --------------------- .../java/com/yahoo/fs4/mplex/ConnectionPool.java | 90 ----- .../main/java/com/yahoo/fs4/mplex/FS4Channel.java | 255 ------------ .../java/com/yahoo/fs4/mplex/FS4Connection.java | 371 ----------------- .../yahoo/fs4/mplex/InvalidChannelException.java | 15 - .../java/com/yahoo/fs4/mplex/ListenerPool.java | 56 --- 27 files changed, 3 insertions(+), 3286 deletions(-) delete mode 100644 container-search/src/main/java/com/yahoo/fs4/BasicPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/EolPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/FS4Properties.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/Packet.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketDumper.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketListener.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PingPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/PongPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java delete mode 100644 container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java (limited to 'container-search/src/main/java/com/yahoo/fs4') diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java deleted file mode 100644 index f87721dc503..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java +++ /dev/null @@ -1,315 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.log.LogLevel; -import com.yahoo.prelude.fastsearch.TimeoutException; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.logging.Logger; - -/** - * Superclass of fs4 packets - * - * @author bratseth - */ -public abstract class BasicPacket { - - private final Compressor compressor = new Compressor(); - - private static Logger log = Logger.getLogger(QueryResultPacket.class.getName()); - private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); - public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. - - private byte[] encodedBody; - - /** The length of this packet in bytes or -1 if not known */ - protected int length = -1; - - /** - * A timestamp which can be set or inspected by clients of this class - * but which is never updated by the class itself. This is mostly - * a convenience for when you need to queue packets or retain them - * in some structure where their validity is limited by a timeout - * or similar. - */ - private long timeStamp = -1; - - private int compressionLimit = 0; - - private CompressionType compressionType; - - /** - * Sets the number of bytes the package must be before activating compression. - * A value of 0 means no compression. - * - * @param limit smallest package size that triggers compression. - */ - public void setCompressionLimit(int limit) { compressionLimit = limit; } - - public void setCompressionType(String type) { - compressionType = CompressionType.valueOf(type); - } - - /** - * Fills this package from a byte buffer positioned at the first byte of the package - * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket decode(ByteBuffer buffer) { - length = buffer.getInt()+4; // Streamed packet length is the length-4 - int code = buffer.getInt(); - - decodeAndDecompressBody(buffer, code, length - 2*4); - return this; - } - - protected void decodeAndDecompressBody(ByteBuffer buffer, int code, int packetLength) { - byte compressionType = (byte)((code & ~CODE_MASK) >> 24); - boolean isCompressed = compressionType != 0; - codeDecodedHook(code & CODE_MASK); - if (isCompressed) { - int uncompressedSize = buffer.getInt(); - int compressedSize = packetLength - 4; - int offset = 0; - byte[] compressedData; - if (buffer.hasArray()) { - compressedData = buffer.array(); - offset = buffer.arrayOffset() + buffer.position(); - buffer.position(buffer.position() + compressedSize); - } else { - compressedData = new byte[compressedSize]; - buffer.get(compressedData); - } - byte[] body = compressor.decompress(CompressionType.valueOf(compressionType), compressedData, offset, - uncompressedSize, Optional.of(compressedSize)); - ByteBuffer bodyBuffer = ByteBuffer.wrap(body); - length += uncompressedSize - (compressedSize + 4); - decodeBody(bodyBuffer); - } else { - decodeBody(buffer); - } - } - - /** - * Decodes the body of this package from a byte buffer - * positioned at the first byte of the package. - * - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public void decodeBody(ByteBuffer buffer) { - throw new UnsupportedOperationException("Decoding of " + this + " is not implemented"); - } - - /** - * Called when the packet code is decoded. - * This default implementation just throws an exception if the code - * is not the code of this packet. Packets which has several possible codes - * will use this method to store the code. - */ - protected void codeDecodedHook(int code) { - if (code != getCode()) - throw new RuntimeException("Can not decode " + code + " into " + this); - } - - /** - *

Encodes this package onto the given buffer at the current position. - * The position of the buffer after encoding is the byte following - * the last encoded byte.

- * - *

This method will ensure that everything is written provided - * sufficient capacity regardless of the buffer limit. - * When returning, the limit is at the end of the package (qual to the - * position).

- * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket encode(ByteBuffer buffer) throws BufferTooSmallException { - int oldLimit = buffer.limit(); - int startPosition = buffer.position(); - - buffer.limit(buffer.capacity()); - try { - buffer.putInt(4); // Real length written later, when we know it - buffer.putInt(getCode()); - - encodeAndCompressBody(buffer, startPosition); - } - catch (java.nio.BufferOverflowException e) { - // reset buffer to expected state - buffer.position(startPosition); - buffer.limit(oldLimit); - throw new BufferTooSmallException("Destination buffer too small while encoding packet"); - } - - return this; - } - - protected void encodeAndCompressBody(ByteBuffer buffer, int startPosition) { - int startOfBody = buffer.position(); - encodeBody(buffer); - setEncodedBody(buffer, startOfBody, buffer.position() - startOfBody); - length = buffer.position() - startPosition; - - if (compressionLimit != 0 && length-4 > compressionLimit) { - byte[] compressedBody; - compressionType = CompressionType.LZ4; - LZ4Factory factory = LZ4Factory.fastestInstance(); - LZ4Compressor compressor = factory.fastCompressor(); - compressedBody = compressor.compress(encodedBody); - - log.log(LogLevel.DEBUG, "Uncompressed size: " + encodedBody.length + ", Compressed size: " + compressedBody.length); - if (compressedBody.length + 4 < encodedBody.length) { - buffer.position(startPosition); - buffer.putInt(compressedBody.length + startOfBody - startPosition + 4 - 4); // +4 for compressed size - buffer.putInt(getCompressedCode(compressionType)); - buffer.position(startOfBody); - buffer.putInt(encodedBody.length); - buffer.put(compressedBody); - buffer.limit(buffer.position()); - return; - } - } - buffer.putInt(startPosition, length - 4); // Encoded length 4 less than actual length - buffer.limit(buffer.position()); - } - - private int getCompressedCode(CompressionType compression) { - int code = compression.getCode(); - return getCode() | (code << 24); - } - - /** - * Encodes the body of this package onto the given buffer at the current position. - * The position of the buffer after encoding is the byte following - * the last encoded byte. - * - * @throws UnsupportedOperationException if not implemented in the subclass - */ - protected void encodeBody(ByteBuffer buffer) { - throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); - } - - private void setEncodedBody(ByteBuffer b, int start, int length) { - encodedBody = new byte[length]; - b.position(start); - b.get(encodedBody); - } - - public boolean isEncoded() { - return encodedBody != null; - } - - /** - * Just a place holder to make the APIs simpler. - */ - public Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { - throw new UnsupportedOperationException("This class does not support a channel ID"); - } - - /** - * Allocate the needed buffers and encode the packet using the given - * channel ID (if pertinent). - * - * If this packet does not use a channel ID, the ID will be ignored. - */ - private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { - while (true) { - try { - if (hasChannelId()) { - encode(buffer, channelId); - } else { - encode(buffer); - } - buffer.flip(); - break; - } - catch (BufferTooSmallException e) { - buffer = ByteBuffer.allocate(buffer.capacity()*2); - } - } - return buffer; - } - - /** - * Return buffer containing the encoded form of this package and - * remove internal reference to it. - */ - public final ByteBuffer grantEncodingBuffer(int channelId) { - return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); - } - - public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { - return allocateAndEncode(channelId, buffer); - } - - /** Returns the code of this package */ - public abstract int getCode(); - - /** - * Returns the length of this body (including header (8 bytes) and body), - * or -1 if not known. - * Note that the streamed packet format length is 4 bytes less than this length, - * for unknown reasons. - * The length is always known when decodeBody is called. - */ - public int getLength() { - return length; - } - - /** - * Set the timestamp field of the packet. - * - * A timestamp which can be set or inspected by clients of this class - * but which is never updated by the class itself. This is mostly - * a convenience for when you need to queue packets or retain them - * in some structure where their validity is limited by a timeout - * or similar. - */ - public void setTimestamp (long timeStamp) { - this.timeStamp = timeStamp; - } - - /** - * Get the timestamp field of this packet. Note that this is - * not part of the FS4 protocol. @see #setTimestamp for - * more information - * - */ - public long getTimestamp () { - return timeStamp; - } - - public String toString() { - return "packet with code " + getCode(); - } - - /** Whether this is a packets which can encode a channel ID. */ - public boolean hasChannelId() { - return false; - } - - /** - * Throws an IOException if the packet is not of the expected type - */ - public void ensureInstanceOf(Class type, String name) throws IOException { - if ((type.isAssignableFrom(getClass()))) return; - - if (this instanceof ErrorPacket) { - ErrorPacket errorPacket = (ErrorPacket) this; - if (errorPacket.getErrorCode() == 8) - throw new TimeoutException("Query timed out in " + name); - else - throw new IOException("Received error from backend in " + name + ": " + this); - } else { - throw new IOException("Received " + this + " when expecting " + type); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java b/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java deleted file mode 100644 index 38f312e6c8d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/BufferTooSmallException.java +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- -// -// - -package com.yahoo.fs4; -/** - * Signal that the buffer used to hold a packet is too small - * - * @author Bjorn Borud - */ -@SuppressWarnings("serial") -public class BufferTooSmallException extends Exception { - public BufferTooSmallException (String message) { - super(message); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java b/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java deleted file mode 100644 index fe86e7273c0..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/ChannelTimeoutException.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- -// - -package com.yahoo.fs4; - - -/** - * Signal that a timeout occurred in the Channel2 communiction - * - * @author Bjorn Borud - * - */ -@SuppressWarnings("serial") -public class ChannelTimeoutException extends Exception -{ - public ChannelTimeoutException (String msg) { - super(msg); - } - - public ChannelTimeoutException () { - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java index ad679e0c53c..3105b645cd0 100644 --- a/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/DocsumPacket.java @@ -1,10 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; -import com.yahoo.document.GlobalId; - -import java.nio.ByteBuffer; - /** * An "extended query result" packet. This is the query result * packets used today, they allow more flexible sets of parameters @@ -12,15 +8,10 @@ import java.nio.ByteBuffer; * * @author bratseth */ -public class DocsumPacket extends Packet { - - private GlobalId globalId = new GlobalId(new byte[GlobalId.LENGTH]); +public class DocsumPacket { private byte[] data; - private DocsumPacket() { - } - /** * Constructor used by streaming search */ @@ -28,31 +19,11 @@ public class DocsumPacket extends Packet { data = buffer.clone(); } - public static DocsumPacket create() { - return new DocsumPacket(); - } - - public int getCode() { return 205; } - - /** - * Fills this packet from a byte buffer positioned at the - * first byte of the packet - */ - public void decodeBody(ByteBuffer buffer) { - byte[] rawGid = new byte[GlobalId.LENGTH]; - buffer.get(rawGid); - globalId = new GlobalId(rawGid); - data=new byte[getLength()-12-GlobalId.LENGTH]; - buffer.get(data); - } - - public GlobalId getGlobalId() { return globalId; } public byte[] getData() { return data; } public String toString() { - return "docsum packet [globalId: " + globalId.toString() + - ", size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; + return "docsum packet size: " + (data==null ? "(no data)" : data.length + " bytes") + " ]"; } } diff --git a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java deleted file mode 100644 index 8294ae5796d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.document.GlobalId; - -import java.nio.ByteBuffer; - -/** - * Meta attributes on documents (not the document summaries themselves). - * Used in query results and get docusum packages - * - * @author bratseth - */ -public class DocumentInfo implements Cloneable { - - private final byte [] globalId; - private final double metric; - private final int partId; - private final int distributionKey; - private final byte[] sortData; - - DocumentInfo(ByteBuffer buffer, QueryResultPacket owner, byte[] sortData) { - globalId = new byte[GlobalId.LENGTH]; - buffer.get(globalId); - metric = decodeMetric(buffer); - partId = owner.getMldFeature() ? buffer.getInt() : 0; - distributionKey = owner.getMldFeature() ? buffer.getInt() : 0; - this.sortData = sortData; - } - - public DocumentInfo(GlobalId globalId, int metric, int partId, int distributionKey) { - this.globalId = globalId.getRawId(); - this.metric = metric; - this.partId = partId; - this.distributionKey = distributionKey; - this.sortData = null; - } - - private double decodeMetric(ByteBuffer buffer) { - return buffer.getDouble(); - } - - public GlobalId getGlobalId() { return new GlobalId(globalId); } - public byte [] getRawGlobalId() { return globalId; } - - /** Raw rank score */ - public double getMetric() { return metric; } - - /** Partition this document resides on */ - public int getPartId() { return partId; } - - /** Unique key for the node this document resides on */ - public int getDistributionKey() { return distributionKey; } - - public byte[] getSortData() { - return sortData; - } - - public String toString() { - return "document info [globalId=" + new GlobalId(globalId).toString() + ", metric=" + metric + "]"; - } - - /** - * Implements the Cloneable interface - */ - public Object clone() { - try { - return super.clone(); - } - catch (CloneNotSupportedException e) { - throw new RuntimeException("Someone inserted a nonclonable superclass"); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/EolPacket.java b/container-search/src/main/java/com/yahoo/fs4/EolPacket.java deleted file mode 100644 index 1e907f67696..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/EolPacket.java +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * A EOL packet signaling end of transmission. - * This package has no body. - * - * @author bratseth - */ -public class EolPacket extends Packet { - - private EolPacket() { - } - - public static EolPacket create() { - return new EolPacket(); - } - - public int getCode() { return 200; } - - public void decodeBody(ByteBuffer buffer) { - // No body - } - - public void encodeBody(ByteBuffer buffer) { - // No body - } - - public String toString() { - return "EOL packet"; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java b/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java deleted file mode 100644 index f21663272d4..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/ErrorPacket.java +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.text.Utf8; - -/** - * - * An error packet signaling that an error occurred. - * - * @author Bjørn Borud - */ -public class ErrorPacket extends Packet { - private int errorCode; - private int errmsgLen; - private String message; - - private ErrorPacket() { - } - - public static ErrorPacket create() { - return new ErrorPacket(); - } - - public int getCode() { return 203; } - - public void decodeBody(ByteBuffer buffer) { - errorCode = buffer.getInt(); - errmsgLen = buffer.getInt(); - - byte[] tmp = new byte[errmsgLen]; - buffer.get(tmp); - - message = Utf8.toString(tmp); - } - - public int getErrorCode () { return errorCode; } - - public void encodeBody(ByteBuffer buffer) { - // No body - } - - public String toString() { - return (message + " (" + errorCode + ")"); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java b/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java deleted file mode 100644 index f5f1fca0801..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/FS4Properties.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.text.Utf8; - -import java.nio.ByteBuffer; - -public class FS4Properties { - private String name; - - static public class Entry { - public final String key; - private final byte [] val; - public Entry(byte[] k, byte[] v) { - key = Utf8.toString(k); - val = v; - } - public final byte [] getValue() { return val; } - }; - - private Entry[] entries; - - void decode(ByteBuffer buffer) { - int nameLen = buffer.getInt(); - byte[] utf8name = new byte[nameLen]; - buffer.get(utf8name); - this.setName(Utf8.toString(utf8name)); - - int n = buffer.getInt(); - setEntries(new Entry[n]); - for (int j = 0; j < n; j++) { - int keyLen = buffer.getInt(); - byte[] key = new byte[keyLen]; - buffer.get(key); - - int valLen = buffer.getInt(); - byte[] value = new byte[valLen]; - buffer.get(value); - - getEntries()[j] = new Entry(key, value); - } - } - - public Entry[] getEntries() { - return entries; - } - - public void setEntries(Entry[] entries) { - this.entries = entries; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java index 7353d0730a4..6a808e17b5c 100644 --- a/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/GetDocSumsPacket.java @@ -1,215 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4; -import com.yahoo.document.GlobalId; -import com.yahoo.log.LogLevel; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.query.Item; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.result.Hit; -import com.yahoo.text.Utf8; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.logging.Logger; - /** *

A packet for requesting a list of document summaries. * This packet can be encoded only.

* * @author bratseth */ -public class GetDocSumsPacket extends Packet { +public class GetDocSumsPacket { /** Session id key. Yep, putting this here is ugly as hell */ public static final String sessionIdKey = "sessionId"; - private static final Logger log = Logger.getLogger(GetDocSumsPacket.class.getName()); - private final Result result; - private final Query query; - private final String summaryClass; - private QueryPacketData queryPacketData = null; - private int flags = 0; - - /** - * True if we should send the query with this docsum, false otherwise. - * Sending the query is necessary if we need to return summary features or generate a dynamic summary - */ - private final boolean sendQuery; - - private GetDocSumsPacket(Result result, String summaryClass, boolean sendQuery) { - this.result = result; - this.query = result.getQuery(); - this.summaryClass = summaryClass; - this.sendQuery = sendQuery; - } - - /** - * Creates a get docsums packet for a certain result - */ - public static GetDocSumsPacket create(Result result, String summaryClass, boolean sendQuery) { - return new GetDocSumsPacket(result, summaryClass, sendQuery); - } - - /** - * features bits, as given in searchlib/src/searchlib/common/packets.h - * definition of enum getdocsums_features - */ - public static final int GDF_MLD = 0x00000001; - public static final int GDF_QUERYSTACK = 0x00000004; - public static final int GDF_RANKP_QFLAGS = 0x00000010; - public static final int GDF_LOCATION = 0x00000080; - public static final int GDF_RESCLASSNAME = 0x00000800; - public static final int GDF_PROPERTIES = 0x00001000; - public static final int GDF_FLAGS = 0x00002000; - - public void encodeBody(ByteBuffer buffer) { - setFieldsFromHits(); - - boolean useQueryCache = query.getRanking().getQueryCache(); - // If feature cache is used we need to include the sessionId as key. - if (useQueryCache) { // TODO: Move this decision (and the key) to ranking - query.getRanking().getProperties().put(sessionIdKey, query.getSessionId().toString()); - } - - // set the default features - long features = GDF_MLD; - if (sendQuery) - features |= GDF_QUERYSTACK; - features |= GDF_RANKP_QFLAGS; - - // do we want a specific result class? - if (summaryClass != null) - features |= GDF_RESCLASSNAME; - if (query.getRanking().getLocation() != null) - features |= GDF_LOCATION; - if (query.hasEncodableProperties()) - features |= GDF_PROPERTIES; - if (flags != 0) { - features |= GDF_FLAGS; - } - buffer.putInt((int)features); - buffer.putInt(0); //Unused, was docstamp - long timeLeft = query.getTimeLeft(); - buffer.putInt(Math.max(1, (int)timeLeft)); // Safety to avoid sending down 0 or negative number - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "Timeout from query(" + query.getTimeout() + "), sent to backend: " + timeLeft); - } - - if (queryPacketData != null) - encodeQueryFromPacketData(buffer, useQueryCache); - else - encodeQuery(buffer); - - if (flags != 0) - buffer.putInt(flags); - encodeDocIds(buffer); - } - - private void setFieldsFromHits() { - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit h = i.next(); - if (h instanceof FastHit) { - FastHit hit = (FastHit)h; - QueryPacketData tag = hit.getQueryPacketData(); - if (tag != null) { - this.queryPacketData = tag; - break; - } - } - } - } - - private void encodeQueryFromPacketData(ByteBuffer buffer, boolean reencodePropertyMaps) { - queryPacketData.encodeRankProfile(buffer); - queryPacketData.encodeQueryFlags(buffer); - - encodeSummaryClass(buffer); - - if (reencodePropertyMaps || ! sendQuery) // re-encode when we're not sending query, to avoid resending all encoded properties - query.encodeAsProperties(buffer, sendQuery); - else - queryPacketData.encodePropertyMaps(buffer); - - if (sendQuery) - queryPacketData.encodeQueryStack(buffer); - queryPacketData.encodeLocation(buffer); - } - - private void encodeSummaryClass(ByteBuffer buffer) { - if (summaryClass != null) { - byte[] tmp = Utf8.toBytes(summaryClass); - buffer.putInt(tmp.length); - buffer.put(tmp); - } - } - - private void encodeQuery(ByteBuffer buffer) { - Item.putString(query.getRanking().getProfile(), buffer); - buffer.putInt(QueryPacket.getQueryFlags(query)); - - encodeSummaryClass(buffer); - - query.encodeAsProperties(buffer, sendQuery); - - if (sendQuery) { - // The stack must be resubmitted to generate dynamic docsums - int itemCountPosition = buffer.position(); - buffer.putInt(0); - int dumpLengthPosition = buffer.position(); - buffer.putInt(0); - int count = query.encode(buffer); - buffer.putInt(itemCountPosition, count); - buffer.putInt(dumpLengthPosition, buffer.position() - dumpLengthPosition - 4); - } - - if (query.getRanking().getLocation() != null) { - int locationLengthPosition = buffer.position(); - buffer.putInt(0); - int locationLength = query.getRanking().getLocation().encode(buffer); - buffer.putInt(locationLengthPosition, locationLength); - } - } - - private void encodeDocIds(ByteBuffer buffer) { - byte[] emptyGid = new byte[GlobalId.LENGTH]; - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit hit = i.next(); - if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { - FastHit fastHit = (FastHit)hit; - buffer.put(fastHit.getGlobalId() != null ? fastHit.getRawGlobalId() : emptyGid); - buffer.putInt(fastHit.getPartId()); - buffer.putInt(0); //Unused, was docstamp - } - } - } - - public int getCode() { - return 219; - } - - public String toString() { - return "Get docsums x packet fetching " + getNumDocsums() + " docsums and packet length of " + getLength() + " bytes."; - } - - public int getNumDocsums() { - int num = 0; - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext(); ) { - Hit hit = i.next(); - if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { - num++; - } - } - return num; - } - - /** - * Return the document summary class we want the fdispatch - * to use when replying to us - */ - @SuppressWarnings("UnusedDeclaration") - public String getSummaryClass() { - return summaryClass; - } } diff --git a/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java b/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java deleted file mode 100644 index 78ba857c475..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/HexByteIterator.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -/** - * Provides sequential access to each byte of a buffer - * as a hexadecimal string of length 2. - * - * @author Tony Vaagenes - */ -public final class HexByteIterator implements Iterator { - private final ByteBuffer buffer; - - private String hexByte(byte b) { - final int unsignedValue = ((int)b) & 0xff; - String s = Integer.toHexString(unsignedValue).toUpperCase(); - - boolean singleChar = unsignedValue < 0x10; - if (singleChar) - return '0' + s; - else - return s; - } - - public boolean hasNext() { - return buffer.hasRemaining(); - } - - public String next() { - return hexByte(buffer.get()); - } - - public void remove() { - throw new UnsupportedOperationException(); - } - - public HexByteIterator(ByteBuffer buffer) { - this.buffer = buffer.slice(); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java deleted file mode 100644 index 1e9deede59d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/Packet.java +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.logging.Logger; - -/** - * Superclass of fs4 packets containing channel/query ID - * - * @author bratseth - */ -public abstract class Packet extends BasicPacket { - - private static Logger log = Logger.getLogger(Packet.class.getName()); - - /** - * The channel at which this packet will be sent or was received, - * or -1 when this is not known - */ - protected int channel = -1; - - /** - * Fills this package from a byte buffer positioned at the first - * byte of the package - * - * @return this Packet (as a BasicPacket) for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public BasicPacket decode(ByteBuffer buffer) { - int originalPos = buffer.position(); - length = buffer.getInt()+4; // Streamed packet length is the length-4 - int packetLength = length; - try { - int code = buffer.getInt(); - channel = buffer.getInt(); - - decodeAndDecompressBody(buffer, code, length - 3*4); - } - finally { - int targetPosition = (originalPos + packetLength); - if (buffer.position() != targetPosition) { - log.warning("Position in buffer is " + buffer.position() + " should be " + targetPosition); - buffer.position(targetPosition); - } - } - - return this; - } - - /** - *

Encodes this package onto the given buffer at the current - * position. The position of the buffer after encoding is the - * byte following the last encoded byte.

- * - *

This method will ensure that everything is written provided - * sufficient capacity regardless of the buffer limit. - * When returning, the limit is at the end of the package (qual to the - * position).

- * - * @return this for convenience - * @throws UnsupportedOperationException if not implemented in the subclass - */ - public final Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { - this.channel = channel; - int oldLimit = buffer.limit(); - int startPosition = buffer.position(); - - buffer.limit(buffer.capacity()); - try { - buffer.putInt(8); // Real length written later, when we know it - buffer.putInt(getCode()); - buffer.putInt(channel); - - encodeAndCompressBody(buffer, startPosition); - } - catch (java.nio.BufferOverflowException e) { - // reset buffer to expected state - buffer.position(startPosition); - buffer.limit(oldLimit); - throw new BufferTooSmallException("Destination buffer too small while encoding packet"); - } - return this; - } - - /** - * Get the channel id of the packet. In the FS4 transport protocol, - * there is the concept of a channel. This must not be confused - * with all the other channels we have floating around this code (aargh!). - *

- * The channel can be thought of as a way to pair up requests and - * responses in the FS4 protocol: A response always belongs to - * to a channel and it is the clients responsibility to not re-use - * channel ids within the same connection. - *

- * Summary: This "channel" means "session id" - * - * @return FS4 channel id - * - */ - public int getChannel() { return channel; } - - public void setChannel(int channel) { this.channel=channel; } - - - /** Informs that this packets needs a channel ID. */ - public boolean hasChannelId() { - return true; - } - - public String toString() { - return "packet with code " + getCode() + ", channelId=" + getChannel(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java b/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java deleted file mode 100644 index 3e673717d02..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketDecoder.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * Returns the correct package for a package byte stream - * - * @author bratseth - * @author Bj\u00f8rn Borud - */ -public class PacketDecoder { - - /** Represents a packet and the data used to construct it */ - public static class DecodedPacket { - public BasicPacket packet; - public ByteBuffer consumedBytes; - - DecodedPacket(BasicPacket packet, ByteBuffer consumedBytes) { - this.packet = packet; - this.consumedBytes = consumedBytes; - } - } - - private PacketDecoder() {} - - /** - * Returns the package starting at the current position in the buffer - * - * @throws IllegalArgumentException if an unknown package code is - * encountered - * @throws java.nio.BufferUnderflowException if the buffer contains too little - * data to decode the pcode. - */ - public static BasicPacket decode(ByteBuffer buffer) { - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - - switch (packetCode) { - case 200: - return EolPacket.create().decode(buffer); - - case 203: - return ErrorPacket.create().decode(buffer); - - case 205: - return DocsumPacket.create().decode(buffer); - - case 217: - return QueryResultPacket.create().decode(buffer); - - case 221: - return PongPacket.create().decode(buffer); - - default: - throw new IllegalArgumentException("No support for packet " + packetCode); - } - } - - /** Gives the packet along with the bytes consumed to construct it. */ - public static DecodedPacket decodePacket(ByteBuffer buffer) { - ByteBuffer dataUsed = buffer.slice(); - int start = buffer.position(); - - BasicPacket packet = decode(buffer); - dataUsed.limit(buffer.position() - start); - return new DecodedPacket(packet, dataUsed); - } - - /** Sniff channel ID for query result packets */ - public static int sniffChannel(ByteBuffer buffer) { - int remaining = buffer.remaining(); - if (remaining < 12) { - return 0; - } - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - switch (packetCode) { - case 202: - case 208: - case 214: - case 217: - return buffer.getInt(buffer.position()+8); - default: - return 0; - } - } - - /** - * Test whether the buffer contains (the start of) a pong packet. - * - * Returns false if there is not enough data to determine the - * answer. - */ - public static boolean isPongPacket(ByteBuffer buffer) { - - int remaining = buffer.remaining(); - if (remaining < 8) - return false; - int packetCode = buffer.getInt(buffer.position()+4); - packetCode &= BasicPacket.CODE_MASK; - if (packetCode == 221) - return true; - else - return false; - } - - /** - * Note that it assumes that the position of the ByteBuffer is at the - * start of a packet and that we have enough data to actually read - * an integer out of the buffer. - * - * @return Return the length of the fs4 packet. Returns -1 if length - * could not be determined because we had too little - * data in the buffer. - * - */ - public static int packetLength(ByteBuffer buffer) - { - if (buffer.remaining() < 4) { - return -1; - } - return (buffer.getInt(buffer.position()) + 4); - } - - /** - * Takes a buffer possibly containing a packet. - * - *

- * If we return a packet when we return: - *

    - *
  • the buffer is positioned at the beginning of the next - * packet when we return. - *
  • limit is unchanged - *
- * - *

- * If we return null there were no more packets - * there to decode and the following is true of the buffer - *

    - *
  • the buffer is compacted, ie. partial packet is - * moved to the start, or if no more data is available - * the buffer is cleared. - *
  • the position is set to the next byte after the valid - * data so the buffer is ready for reading. - *
- * - * If there are no packets to be returned the buffer is compacted - * (ie. content is moved to the start and read-pointer is positioned - * - * @return Returns the next available packet from the buffer or - * null if there are no more complete - * packets in the buffer at this time. - */ - public static DecodedPacket extractPacket(ByteBuffer buffer) - throws BufferTooSmallException - { - int remaining = buffer.remaining(); - - // if we are empty we can reset the buffer - if (remaining == 0) { - buffer.clear(); - return null; - } - - // if we can't figure out the size because we have less than - // 4 bytes we prepare the buffer for more data reading. - if (remaining < 4) { - buffer.compact(); - return null; - } - - int plen = packetLength(buffer); - - // -1 means that we do not have enough data to read the packet - // size yet - if (plen == -1) { - buffer.compact(); - return null; - } - - // if we haven't read an entire packet yet, we compact and return - // (same as above but separate code for clarity). note that this - // also occurs when there is no physical room for the packet, so - // clients of this API need to be aware of this and check for it - if (remaining < plen) { - - // if the read buffer is too small we must take drastic action - if (buffer.capacity() < plen) { - throw new BufferTooSmallException("Buffer too small to hold packet"); - } - - buffer.compact(); - return null; - } - - return PacketDecoder.decodePacket(buffer); - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java b/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java deleted file mode 100644 index 6b2c792837a..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketDumper.java +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; - -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.log.LogLevel; -import com.yahoo.search.Query; - -/** - * Responsible for dumping query & query result packets - * - * @author Tony Vaagenes - */ -public class PacketDumper implements PacketListener { - /** High level representation of packet types (e.g. query, result, ...) */ - public static enum PacketType { - query(QueryPacket.class), - result(QueryResultPacket.class); - - Class implementationType; - - PacketType(Class implementationType) { - this.implementationType = implementationType; - } - } - - private static Logger log = Logger.getLogger(PacketDumper.class.getSimpleName()); - - private volatile boolean disabled = true; - private final File logDirectory; - private final Map, DataOutputStream> dumpFiles = - new HashMap<>(); - private final String fileNamePattern; - - private void handlePacket(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm, String direction) { - //minimize overhead when disabled: - if (disabled) - return; - - try { - DataOutputStream stream = getOutputStream(packet); - if (stream != null) { - synchronized (stream) { - stream.writeChars(packet.getTimestamp() + " " + direction + " packet on channel " + channel.getChannelId()); - String indent = " "; - Query query = channel.getQuery(); - if (query != null) - stream.writeChars('\n' + indent + "Query: '" + query.getModel().getQueryString()); - hexDump(indent, stream, serializedForm); - - stream.writeChar('\n'); - stream.flush(); - } - } - } catch (IOException e) { - log.log(LogLevel.WARNING, "Could not log packet.", e); - } - } - - private void hexDump(String indent, DataOutputStream stream, ByteBuffer serializedForm) throws IOException { - HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); - - long count = 0; - final int maxNumCharacters = 80; - while (hexByteIterator.hasNext()) { - if (count++ % maxNumCharacters == 0) - stream.writeChar('\n'); - stream.writeChars(hexByteIterator.next()); - } - } - - private synchronized DataOutputStream getOutputStream(BasicPacket packet) { - return dumpFiles.get(packet.getClass()); - } - - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - handlePacket(channel, packet, serializedForm, "Sent"); - } - - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - handlePacket(channel, packet, serializedForm, "Received"); - } - - public synchronized void dumpPackets(PacketType packetType, boolean on) throws IOException { - OutputStream stream = dumpFiles.get(packetType.implementationType); - if (!on && stream != null) - closeFile(stream, packetType); - else if (on && stream == null) - openFile(packetType); - } - - private void openFile(PacketType packetType) throws FileNotFoundException { - if (!logDirectory.exists() || - logDirectory.mkdirs()) { - - throw new RuntimeException("PacketDumper: Could not create log directory " + logDirectory); - } - String fileName = fileNamePattern.replace("%s", packetType.toString()); - boolean append = true; - DataOutputStream outputStream = - new DataOutputStream( - new BufferedOutputStream( - new FileOutputStream(new File(logDirectory, fileName), append))); - dumpFiles.put(packetType.implementationType, outputStream); - - disabled = dumpFiles.isEmpty(); - } - - private void closeFile(OutputStream stream, PacketType packetType) throws IOException { - try { - synchronized (stream) { - stream.close(); - } - } finally { - dumpFiles.remove(packetType.implementationType); - disabled = dumpFiles.isEmpty(); - } - } - - public PacketDumper(File logDirectory, String fileNamePattern) { - this.logDirectory = logDirectory; - this.fileNamePattern = fileNamePattern; - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketListener.java b/container-search/src/main/java/com/yahoo/fs4/PacketListener.java deleted file mode 100644 index 113da03b420..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketListener.java +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; - -/** - * Interface for recieving notifications of packets sent or recieved. - * - * @author Tony Vaagenes - */ -public interface PacketListener { - void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); - void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm); -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java b/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java deleted file mode 100644 index 1be79031d1b..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketNotificationsBroadcaster.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; - -/** - * Broadcasts packet notifications to a list of listeners. - * - * @author Tony Vaagenes - */ -public class PacketNotificationsBroadcaster implements PacketListener { - - private final PacketListener[] listeners; - - public PacketNotificationsBroadcaster(PacketListener... listeners) { - this.listeners = listeners; - } - - @Override - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - if (channel == null) return; - for (PacketListener listener : listeners) - listener.packetSent(channel, packet, serializedForm); - } - - @Override - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - if (channel == null) return; - for (PacketListener listener : listeners) - listener.packetReceived(channel, packet, serializedForm); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java b/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java deleted file mode 100644 index b577ef31ad8..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PacketQueryTracer.java +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.search.Query; - -/** - * Adds packets to the query context - * - * @author Tony Vaagenes - */ -public class PacketQueryTracer implements PacketListener { - - private final static int traceLevel = 10; - - private void addTrace(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - Query query = channel.getQuery(); - if (query != null && query.getTraceLevel() >= traceLevel) { - StringBuilder traceString = new StringBuilder(); - traceString.append(packet.getClass().getSimpleName()).append(": "); - hexDump(serializedForm, traceString); - - final boolean includeQuery = true; - query.trace(traceString.toString(), includeQuery, traceLevel); - } - } - - private void hexDump(ByteBuffer serializedForm, StringBuilder traceString) { - HexByteIterator hexByteIterator = new HexByteIterator(serializedForm); - - long count = 0; - final int maxNumCharacters = 80; - while (hexByteIterator.hasNext()) { - if (++count % maxNumCharacters == 0) - traceString.append('\n'); - traceString.append(hexByteIterator.next()); - } - } - - @Override - public void packetSent(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - addTrace(channel, packet, serializedForm); - } - - @Override - public void packetReceived(FS4Channel channel, BasicPacket packet, ByteBuffer serializedForm) { - addTrace(channel, packet, serializedForm); - } - -} - diff --git a/container-search/src/main/java/com/yahoo/fs4/PingPacket.java b/container-search/src/main/java/com/yahoo/fs4/PingPacket.java deleted file mode 100644 index 7c1f8df1101..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PingPacket.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * A ping packet for FS4. This packet has no data. It maps to - * PCODE_MONITORQUERY the C++ implementation of the protocol. - * - * @author Steinar Knutsen - */ -public class PingPacket extends BasicPacket { - - public int getCode() { return 220; } - - public void encodeBody(ByteBuffer buffer) { - buffer.putInt(MQF_QFLAGS); - buffer.putInt(MQFLAG_REPORT_ACTIVEDOCS); - } - - /** feature bits, taken from searchlib/common/transport.h */ - static final int MQF_QFLAGS = 0x00000002; - - /** flag bits, taken from searchlib/common/transport.h */ - static final int MQFLAG_REPORT_ACTIVEDOCS = 0x00000020; -} diff --git a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java deleted file mode 100644 index 37aaf7067a9..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.util.Optional; - -/** - * A pong packet for FS4. It maps to PCODE_MONITORRESULTX - * in the C++ implementation of the protocol. - * - * @author Steinar Knutsen - */ -public class PongPacket extends BasicPacket { - - private int dispatchTimestamp; - - @SuppressWarnings("unused") - private int totalNodes; // configured nodes - private Optional activeNodes = Optional.empty(); // number of nodes that are up - @SuppressWarnings("unused") - private int totalPartitions; // configured partitions - private Optional activePartitions = Optional.empty(); // number of partitions that are up - - private Optional activeDocuments = Optional.empty(); // how many documents are searchable (sum) - - public PongPacket() { - } - - /** For testing */ - public PongPacket(long activeDocuments) { - this.activeDocuments = Optional.of(activeDocuments); - } - - private int code; - protected void codeDecodedHook(int code) { this.code = code; } - public int getCode() { return code; } - - public void decodeBody(ByteBuffer buffer) { - int features = buffer.getInt(); - buffer.getInt(); // Unused lowPartitionId - dispatchTimestamp = buffer.getInt(); - if ((features & MRF_MLD) != 0) { - totalNodes = buffer.getInt(); - activeNodes = Optional.of(buffer.getInt()); - totalPartitions = buffer.getInt(); - activePartitions = Optional.of(buffer.getInt()); - } - if ((features & MRF_RFLAGS) != 0) { - buffer.getInt(); // ignore rflags (historical field) - } - if ((features & MRF_ACTIVEDOCS) != 0) { - activeDocuments = Optional.of(Long.valueOf(buffer.getLong())); - } - } - - public static PongPacket create() { - return new PongPacket(); - } - - /** - * Return current docstamp for backend to make cache invalidation - * possible. - * */ - public int getDocstamp() { - return dispatchTimestamp; - } - - /** - * retrieve the reported number of active (searchable) documents - * in the monitored backend. - **/ - public Optional getActiveDocuments() { - return activeDocuments; - } - - public Optional getActiveNodes() { - return activeNodes; - } - - public Optional getActivePartitions() { - return activePartitions; - } - - /** feature bits, taken from searchlib/common/transport.h */ - static final int MRF_MLD = 0x00000001; - static final int MRF_RFLAGS = 0x00000008; - static final int MRF_ACTIVEDOCS = 0x00000010; - - /** packet codes, taken from searchlib/common/transport.h */ - static final int PCODE_MONITORRESULTX = 221; - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java deleted file mode 100644 index d1d8b9a0f77..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import com.yahoo.compress.IntegerCompressor; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.prelude.query.Item; -import com.yahoo.search.Query; -import com.yahoo.search.dispatch.Dispatcher; -import com.yahoo.search.grouping.vespa.GroupingExecutor; -import com.yahoo.search.query.Ranking; -import com.yahoo.searchlib.aggregation.Grouping; -import com.yahoo.text.Utf8String; -import com.yahoo.vespa.objects.BufferSerializer; - -import java.nio.ByteBuffer; -import java.util.List; - -/** - * An "extended query" packet. This is the query packets used today, - * they allow more flexible sets of parameters to be shipped with queries. - * This packet can be encoded only. - * - * @author bratseth - * @author Bjørn Borud - */ -public class QueryPacket extends Packet { - - private final String serverId; - private final Query query; - - private QueryPacketData queryPacketData; - - private QueryPacket(String serverId, Query query) { - this.serverId = serverId; - this.query = query; - } - - /** Returns the query from which this packet is populated */ - public Query getQuery() { - return query; - } - - /** - * Creates and returns a query packet - * - * @param query the query to convert to a packet - */ - public static QueryPacket create(String serverId, Query query) { - return new QueryPacket(serverId, query); - } - - - /** Returns the first offset requested */ - public int getOffset() { - return query.getOffset(); - } - - /** - * Returns the last offset requested (inclusively), that is - * getOffset() + getHits() - */ - public int getLastOffset() { - return getOffset() + getHits(); - } - - /** Returns the number of hits requested */ - public int getHits() { - return query.getHits(); - } - - public void encodeBody(ByteBuffer buffer) { - queryPacketData = new QueryPacketData(); - - boolean sendSessionKey = query.getGroupingSessionCache() || query.getRanking().getQueryCache(); - int featureFlag = getFeatureInt(sendSessionKey); - buffer.putInt(featureFlag); - - IntegerCompressor.putCompressedPositiveNumber(getOffset(), buffer); - IntegerCompressor.putCompressedPositiveNumber(getHits(), buffer); - buffer.putInt(Math.max(1, (int)query.getTimeLeft())); // Safety to avoid sending down 0 or negative number - buffer.putInt(getFlagInt()); - int startOfFieldToSave = buffer.position(); - Item.putString(query.getRanking().getProfile(), buffer); - queryPacketData.setRankProfile(buffer, startOfFieldToSave); - - if ( (featureFlag & QF_PROPERTIES) != 0) { - startOfFieldToSave = buffer.position(); - query.encodeAsProperties(buffer, true); - queryPacketData.setPropertyMaps(buffer, startOfFieldToSave); - } - - // Language not needed when sending query stacks - - if ((featureFlag & QF_SORTSPEC) != 0) { - int sortSpecLengthPosition=buffer.position(); - buffer.putInt(0); - int sortSpecLength = query.getRanking().getSorting().encode(buffer); - buffer.putInt(sortSpecLengthPosition, sortSpecLength); - } - - if ( (featureFlag & QF_GROUPSPEC) != 0) { - List groupingList = GroupingExecutor.getGroupingList(query); - BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); - gbuf.putInt(null, groupingList.size()); - for (Grouping g: groupingList){ - g.serialize(gbuf); - } - gbuf.getBuf().flip(); - byte[] blob = new byte [gbuf.getBuf().limit()]; - gbuf.getBuf().get(blob); - buffer.putInt(blob.length); - buffer.put(blob); - } - - if (sendSessionKey) { - Utf8String key = query.getSessionId(serverId).asUtf8String(); - buffer.putInt(key.getByteLength()); - buffer.put(key.getBytes()); - } - - if ((featureFlag & QF_LOCATION) != 0) { - startOfFieldToSave = buffer.position(); - int locationLengthPosition=buffer.position(); - buffer.putInt(0); - int locationLength= query.getRanking().getLocation().encode(buffer); - buffer.putInt(locationLengthPosition, locationLength); - queryPacketData.setLocation(buffer, startOfFieldToSave); - } - - startOfFieldToSave = buffer.position(); - int stackItemPosition=buffer.position(); - buffer.putInt(0); // Number of stack items written below - int stackLengthPosition = buffer.position(); - buffer.putInt(0); - int stackPosition = buffer.position(); - int stackItemCount=query.encode(buffer); - int stackLength = buffer.position() - stackPosition; - buffer.putInt(stackItemPosition,stackItemCount); - buffer.putInt(stackLengthPosition, stackLength); - queryPacketData.setQueryStack(buffer, startOfFieldToSave); - } - - /** - * feature bits, taken from searchlib/common/transport.h - */ - private static final int QF_PARSEDQUERY = 0x00000002; - private static final int QF_RANKP = 0x00000004; - private static final int QF_SORTSPEC = 0x00000080; - private static final int QF_LOCATION = 0x00000800; - private static final int QF_PROPERTIES = 0x00100000; - private static final int QF_GROUPSPEC = 0x00400000; - private static final int QF_SESSIONID = 0x00800000; - - private int getFeatureInt(boolean sendSessionId) { - int features = QF_PARSEDQUERY | QF_RANKP; // this bitmask means "parsed query" in query packet. - // And rank properties. Both are always present - - features |= (query.getRanking().getSorting() != null) ? QF_SORTSPEC : 0; - features |= (query.getRanking().getLocation() != null) ? QF_LOCATION : 0; - features |= (query.hasEncodableProperties()) ? QF_PROPERTIES : 0; - features |= GroupingExecutor.hasGroupingList(query) ? QF_GROUPSPEC : 0; - features |= (sendSessionId) ? QF_SESSIONID : 0; - - return features; - } - - /** - * query flag bits, taken from searchlib/common/transport.h - */ - private static final int QFLAG_EXTENDED_COVERAGE = 0x00000001; - private static final int QFLAG_COVERAGE_NODES = 0x00000002; - private static final int QFLAG_ESTIMATE = 0x00000080; - private static final int QFLAG_DROP_SORTDATA = 0x00004000; - private static final int QFLAG_NO_RESULTCACHE = 0x00010000; - private static final int QFLAG_DUMP_FEATURES = 0x00040000; - - private int getFlagInt() { - int flags = getQueryFlags(query); - queryPacketData.setQueryFlags(flags); - flags |= query.properties().getBoolean(Dispatcher.dispatchInternal, false) ? 0 : QFLAG_DROP_SORTDATA; - return flags; - } - - - public int getCode() { - return 218; - } - - public String toString() { - return "Query x packet [query: " + query + "]"; - } - - static int getQueryFlags(Query query) { - int flags = QFLAG_EXTENDED_COVERAGE | QFLAG_COVERAGE_NODES; - - flags |= query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE) ? QFLAG_ESTIMATE : 0; - flags |= query.getNoCache() ? QFLAG_NO_RESULTCACHE : 0; - flags |= query.properties().getBoolean(Ranking.RANKFEATURES, false) ? QFLAG_DUMP_FEATURES : 0; - return flags; - } - - /** - * Fetch a binary wrapper containing data from encoding process for use in - * creating a summary request. - * - * @return wrapper object suitable for creating a summary fetch packet - * @throws IllegalStateException if no wrapper has been generated - */ - public QueryPacketData getQueryPacketData() { - if (queryPacketData == null) - throw new IllegalStateException("Trying to fetch a hit tag without having encoded the packet first."); - return queryPacketData; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java deleted file mode 100644 index 673b9cc0c47..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryPacketData.java +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; - -/** - * Class for storing data which has to be constant between query and summary - * fetch for a Vespa hit. Used to avoid to tagging Vespa summary hits with - * the entire query as an immutable. - * - * @author Steinar Knutsen - */ -public final class QueryPacketData { - - private byte[] rankProfile = null; - private int queryFlags = 0; - private byte[] queryStack = null; - private byte[] location = null; - private byte[] propertyMaps = null; - - /** - * Given src.position() bigger than startOfField, allocate a fresh byte - * array, and copy the data from startOfField to src.position() into it. - * - * @param src - * the ByteBuffer to copy from - * @param startOfField - * the position of the buffer at which the field starts - * @return a copy of the data between startOfField and the buffer position - * before invokation - * @throws IllegalArgumentException - * if startOfField is somewhere after src.position() - */ - private byte[] copyField(final ByteBuffer src, final int startOfField) { - if (startOfField > src.position()) { - throw new IllegalArgumentException("startOfField after src.position()"); - } - final byte[] dst = new byte[src.position() - startOfField]; - - src.position(startOfField); - src.get(dst); - return dst; - } - - ByteBuffer encodeRankProfile(final ByteBuffer buffer) { - return buffer.put(rankProfile); - } - - void setRankProfile(final ByteBuffer src, final int startOfField) { - rankProfile = copyField(src, startOfField); - } - - ByteBuffer encodeQueryFlags(final ByteBuffer buffer) { - return buffer.putInt(queryFlags); - } - - void setQueryFlags(final int queryFlags) { - this.queryFlags = queryFlags; - } - - ByteBuffer encodeQueryStack(final ByteBuffer buffer) { - return buffer.put(queryStack); - } - - void setQueryStack(final ByteBuffer src, final int startOfField) { - queryStack = copyField(src, startOfField); - } - - ByteBuffer encodePropertyMaps(final ByteBuffer buffer) { - if (propertyMaps != null) { - buffer.put(propertyMaps); - } - return buffer; - } - - void setPropertyMaps(final ByteBuffer src, final int startOfField) { - propertyMaps = copyField(src, startOfField); - } - - void setLocation(final ByteBuffer src, final int startOfField) { - this.location = copyField(src, startOfField); - } - - ByteBuffer encodeLocation(final ByteBuffer buffer) { - if (location != null) { - buffer.put(location); - } - return buffer; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java deleted file mode 100644 index 6a27beefb5e..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4; - -import java.nio.ByteBuffer; -import java.nio.IntBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * A query result packet (code 217). This packet can be decoded only. - * - * @author bratseth - */ -public class QueryResultPacket extends Packet { - - /** The code of this type of package */ - private static final int code = 217; - - /** Whether mld data is included in this result */ - private boolean mldFeature = false; - - /** Whether sort data is included in this result */ - private boolean sortData = false; - - /** Whether coverage information is included in this result */ - private boolean coverageNodes = false; - private long coverageDocs = 0; - private long activeDocs = 0; - private long soonActiveDocs = 0; - private int degradedReason = 0; - private short nodesQueried = 0; - private short nodesReplied = 0; - - /** Whether the result contains grouping results **/ - private boolean groupDataFeature = false; - - /** Whether the result contains properties **/ - private boolean propsFeature = false; - - private long totalDocumentCount; - - private Number maxRank; - - private int docstamp; - - private byte[] groupData = null; - - private List documents=new ArrayList<>(10); - - public FS4Properties[] propsArray; - - private int offset; - - private QueryResultPacket() { } - - public static QueryResultPacket create() { - return new QueryResultPacket(); - } - - public void setDocstamp(int docstamp){ this.docstamp=docstamp; } - - public int getDocstamp() { return docstamp; } - - /** Returns whether this has the mysterious mld feature */ - public boolean getMldFeature() { return mldFeature; } - - public boolean getCoverageFeature() { return true; } - - public long getCoverageDocs() { return coverageDocs; } - - public long getActiveDocs() { return activeDocs; } - - public long getSoonActiveDocs() { return soonActiveDocs; } - - public int getDegradedReason() { return degradedReason; } - - public boolean getCoverageFull() { - return coverageDocs == activeDocs; - } - - - /** @return offset returned by backend */ - public int getOffset() { return offset; } - - /** Only for testing. */ - public void setOffset(int offset) { - this.offset = offset; - } - - @Override - public void decodeBody(ByteBuffer buffer) { - IntBuffer ints = buffer.asIntBuffer(); - decodeFeatures(ints); - offset = ints.get(); - int documentCount = ints.get(); - buffer.position(buffer.position() + ints.position() * 4); - totalDocumentCount = buffer.getLong(); - maxRank = decodeMaxRank(buffer); - ints = buffer.asIntBuffer(); - docstamp = ints.get(); - buffer.position(buffer.position() + ints.position() * 4); - // do not access "ints" below here! - - if (coverageNodes) { - nodesQueried = buffer.getShort(); - nodesReplied = buffer.getShort(); - } - - byte[][] documentSortData = null; - if (sortData && documentCount > 0) { - documentSortData = decodeSortData(buffer, documentCount); - } - - if (groupDataFeature) { - int len = buffer.getInt(); - groupData = new byte[len]; - buffer.get(groupData); - } - - coverageDocs = buffer.getLong(); - activeDocs = buffer.getLong(); - soonActiveDocs = buffer.getLong(); - degradedReason = buffer.getInt(); - - decodeDocuments(buffer, documentCount, documentSortData); - if (propsFeature) { - int numMaps = buffer.getInt(); - propsArray = new FS4Properties[numMaps]; - for (int i = 0; i < numMaps; i++) { - propsArray[i] = new FS4Properties(); - propsArray[i].decode(buffer); - } - } - } - - private byte[][] decodeSortData(ByteBuffer buffer, int documentCount) { - int[] indexes = new int[documentCount]; - indexes[0] = 0; - for (int i = 1; i < documentCount; i++) { - indexes[i] = buffer.getInt(); - } - - int sortDataLengthInBytes = buffer.getInt(); - byte[][] ret = new byte[indexes.length][]; - - for (int i = 0; i < indexes.length; i++) { - int end = i + 1 >= indexes.length ? sortDataLengthInBytes : indexes[i + 1]; - int len = end - indexes[i]; - ret[i] = new byte[len]; - buffer.get(ret[i], 0, len); - } - return ret; - } - - private Number decodeMaxRank(ByteBuffer buffer) { - return Double.valueOf(buffer.getDouble()); - } - - /** - * feature bits - */ - private static final int QRF_MLD = 0x00000001; - private static final int QRF_COVERAGE_NODES = 0x00000002; - private static final int QRF_SORTDATA = 0x00000010; - private static final int QRF_UNUSED_1 = 0x00000020; - private static final int QRF_UNUSED_2 = 0x00000040; - private static final int QRF_GROUPDATA = 0x00000200; - private static final int QRF_PROPERTIES = 0x00000400; - - /** Decodes the feature int of this package data into boolean feature fields */ - private void decodeFeatures(IntBuffer buffer) { - int features = buffer.get(); - mldFeature = (QRF_MLD & features) != 0; - sortData = (QRF_SORTDATA & features) != 0; - coverageNodes = (QRF_COVERAGE_NODES & features) != 0; - groupDataFeature = (QRF_GROUPDATA & features) != 0; - propsFeature = (QRF_PROPERTIES & features) != 0; - } - - private void decodeDocuments(ByteBuffer buffer, int documentCount, byte[][] documentSortData) { - for (int i = 0; i < documentCount; i++) { - byte[] sort = documentSortData == null ? null : documentSortData[i]; - documents.add(new DocumentInfo(buffer, this, sort)); - } - } - - public int getCode() { return code; } - - protected void codeDecodedHook(int code) { - if ( code != QueryResultPacket.code) - throw new RuntimeException("Programming error, packet " + getCode() + "Not expected."); - } - - public int getDocumentCount() { return documents.size(); } - - public String toString() { - return "Query result x packet [" + getDocumentCount() + " documents]"; - } - - /** Returns the opaque grouping results **/ - public byte[] getGroupData() { return groupData; } - - - /** Returns the total number of documents avalable for this query */ - public long getTotalDocumentCount() { return totalDocumentCount; } - - /** Only for testing. */ - public void setTotalDocumentCount(long totalDocumentCount) { - this.totalDocumentCount = totalDocumentCount; - } - - /** Returns a read-only list containing the DocumentInfo objects of this result */ - public List getDocuments() { - return Collections.unmodifiableList(documents); - } - - public void addDocument(DocumentInfo document) { - documents.add(document); - } - - // TODO: Handle new maxRank intelligently - public int getMaxRank() { return maxRank.intValue(); } - - public short getNodesQueried() { return nodesQueried; } - public short getNodesReplied() { return nodesReplied; } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java deleted file mode 100644 index 12f8e9e387d..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ /dev/null @@ -1,449 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.Packet; -import com.yahoo.fs4.PacketDumper; -import com.yahoo.fs4.PacketListener; -import com.yahoo.fs4.PacketNotificationsBroadcaster; -import com.yahoo.fs4.PacketQueryTracer; -import com.yahoo.io.Connection; -import com.yahoo.io.ConnectionFactory; -import com.yahoo.io.Listener; -import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.yolean.Exceptions; -import com.yahoo.yolean.concurrent.ConcurrentResourcePool; -import com.yahoo.yolean.concurrent.ResourceFactory; -import com.yahoo.yolean.concurrent.ResourcePool; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * @author Bjorn Borud - */ -public class Backend implements ConnectionFactory { - - private static int DEFAULT_BUFFER_SIZE = 0x8000; - - public static final class BackendStatistics { - - public final int activeConnections; - public final int passiveConnections; - - public BackendStatistics(int activeConnections, int passiveConnections) { - this.activeConnections = activeConnections; - this.passiveConnections = passiveConnections; - } - - @Override - public String toString() { - return activeConnections + "/" + totalConnections(); - } - - public int totalConnections() { - return activeConnections + passiveConnections; - } - } - - private static final Logger log = Logger.getLogger(Backend.class.getName()); - - private final ListenerPool listeners; - private final InetSocketAddress address; - private final String host; - private final int port; - private final Map activeChannels = new HashMap<>(); - private int channelId = 0; - private boolean shutdownInitiated = false; - - /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */ - private boolean areInSocketNotConnectableState = false; - - private final LinkedList pingChannels = new LinkedList<>(); - private final PacketListener packetListener; - private final ConnectionPool connectionPool; - private final PacketDumper packetDumper; - private final AtomicInteger connectionCount = new AtomicInteger(0); - private final ConcurrentResourcePool byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { - @Override - public ByteBuffer create() { - return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - } - }); - - /** - * For unit testing. do not use - */ - protected Backend() { - listeners = null; - host = null; - port = 0; - packetListener = null; - packetDumper = null; - address = null; - connectionPool = new ConnectionPool(); - } - - public Backend(String host, - int port, - String serverDiscriminator, - ListenerPool listenerPool, - ConnectionPool connectionPool) { - String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump"; - packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")), - fileNamePattern); - packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer()); - this.listeners = listenerPool; - this.host = host; - this.port = port; - address = new InetSocketAddress(host, port); - this.connectionPool = connectionPool; - } - - private void logWarning(String attemptDescription, Exception e) { - log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); - } - - private void logInfo(String attemptDescription, Exception e) { - log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); - } - - // ============================================================ - // ==== connection pool stuff - // ============================================================ - - /** - * Fetch a connection from the connection pool. If the pool - * is empty we create a connection. - */ - private FS4Connection getConnection() throws IOException { - FS4Connection connection = connectionPool.getConnection(); - if (connection == null) { - // if pool was empty create one: - connection = createConnection(); - } - return connection; - } - - ConcurrentResourcePool getBufferPool() { - return byteBufferResourcePool; - } - - /** - * Return a connection to the connection pool. If the - * connection is not valid anymore we drop it, ie. do not - * put it into the pool. - */ - public void returnConnection(FS4Connection connection) { - connectionPool.releaseConnection(connection); - } - - /** - * Create a new connection to the target for this backend. - */ - private FS4Connection createConnection() throws IOException { - SocketChannel socket = SocketChannel.open(); - try { - connectSocket(socket); - } catch (Exception e) { - // was warning, see VESPA-1922 - if ( ! areInSocketNotConnectableState) { - logInfo("connecting to", e); - } - areInSocketNotConnectableState = true; - socket.close(); - return null; - } - areInSocketNotConnectableState = false; - int listenerId = connectionCount.getAndIncrement()%listeners.size(); - Listener listener = listeners.get(listenerId); - FS4Connection connection = new FS4Connection(socket, listener, this, packetListener); - listener.registerConnection(connection); - - log.fine("Created new connection to " + host + ":" + port); - connectionPool.createdConnection(); - return connection; - } - - private void connectSocket(SocketChannel socket) throws IOException { - socket.configureBlocking(false); - - boolean connected = socket.connect(address); - - // wait for connection - if (!connected) { - long timeBarrier = System.currentTimeMillis() + 20L; - while (true) { - try { - Thread.sleep(5L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e); - } - // don't care whether it's spurious wakeup - connected = socket.finishConnect(); - if (connected || System.currentTimeMillis() > timeBarrier) { - break; - } - } - } - - // did we get a connection? - if ( !connected) { - throw new IllegalArgumentException("Could not create connection to dispatcher on " - + address.getHostName() + ":" + address.getPort()); - } - socket.socket(). - setTcpNoDelay(true); - } - - - //============================================================ - //==== channel management - //============================================================ - - /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */ - public FS4Channel openChannel() { - int cachedChannelId; - synchronized (this) { - if (channelId >= ((1 << 31) - 2)) { - channelId = 0; - } - cachedChannelId = channelId; - channelId += 2; - } - Integer id = cachedChannelId; - FS4Channel chan = new FS4Channel(this, id); - synchronized (activeChannels) { - activeChannels.put(id, chan); - } - return chan; - } - - public FS4Channel openPingChannel() { - FS4Channel chan = FS4Channel.createPingChannel(this); - synchronized (pingChannels) { - pingChannels.add(chan); - } - return chan; - } - - /** - * Get the remote address for this Backend. This method - * has package access only, because it is really only of - * importance to FS4Channel for writing slightly more sensible - * log messages. - * @return Returns the address (host, port) for this Backend. - */ - InetSocketAddress getAddress() { - return address; - } - - /** - * Get an active channel by id. - * - * @param id the (fs4) channel id - * @return returns the (fs4) channel associated with this id - * or null if the channel is not in the - * set of active channels. - */ - public FS4Channel getChannel(Integer id) { - synchronized (activeChannels) { - return activeChannels.get(id); - } - } - - /** - * Return the first channel in the queue waiting for pings or - * null if none. - */ - public FS4Channel getPingChannel() { - synchronized (pingChannels) { - return (pingChannels.isEmpty()) ? null : pingChannels.getFirst(); - } - } - - /** - * Get an active channel by id. This is a wrapper for the method - * that takes the id as an Integer. - * - * @param id The (fs4) channel id - * @return Returns the (fs4) channel associated with this id - * or null if the channel is not in the - * set of active channels. - */ - public FS4Channel getChannel(int id) { - return getChannel(Integer.valueOf(id)); - } - - /** - * Remove a channel. We do not want this method to be called - * directly by the client -- removal of channels should be done - * by calling the close() method of the channel. - * - * @param id The (fs4) channel id - * @return Removes and returns the (fs4) channel associated - * with this id or null if the channel is - * not in the set of active channels. - */ - protected FS4Channel removeChannel(Integer id) { - synchronized (activeChannels) { - return activeChannels.remove(id); - } - } - - /** - * Remove a ping channel. We do not want this method to be called - * directly by the client -- removal of channels should be done - * by calling the close() method of the channel. - * - * @return Removes and returns the (fs4) channel first in - * the queue of ping channels or null - * if there are no active ping channels. - */ - protected FS4Channel removePingChannel() { - synchronized (pingChannels) { - if (pingChannels.isEmpty()) - return null; - return pingChannels.removeFirst(); - } - } - //============================================================ - //==== packet sending and reception - //============================================================ - - protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException { - if (shutdownInitiated) { - log.fine("Tried to send packet after shutdown initiated. Ignored."); - return false; - } - - FS4Connection connection = null; - try { - connection = getConnection(); - if (connection == null) { - return false; - } - connection.sendPacket(packet, channelId); - } - finally { - if (connection != null) { - returnConnection(connection); - } - } - - return true; - } - - /** - * When a connection receives a packet, it uses this method to - * dispatch the packet to the right FS4Channel. If the corresponding - * FS4Channel does not exist the packet is dropped and a message is - * logged saying so. - */ - protected void receivePacket(BasicPacket packet) { - FS4Channel fs4; - if (packet.hasChannelId()) - fs4 = getChannel(((Packet)packet).getChannel()); - else - fs4 = getPingChannel(); - - // channel does not exist - if (fs4 == null) { - return; - } - try { - fs4.addPacket(packet); - } - catch (InterruptedException e) { - log.info("Interrupted during packet adding. Packet = " + packet.toString()); - Thread.currentThread().interrupt(); - } - catch (InvalidChannelException e) { - log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString() - + " Backend probably sent data pertaining an old request," - + " system may be overloaded."); - } - } - - /** - * Attempt to establish a connection without sending messages and then - * return it to the pool. The assumption is that if the probing is - * successful, the connection will be used soon after. There should be - * minimal overhead since the connection is cached. - */ - public boolean probeConnection() { - if (shutdownInitiated) { - return false; - } - - FS4Connection connection = null; - try { - connection = getConnection(); - } catch (IOException ignored) { - // connection is null - } finally { - if (connection != null) { - returnConnection(connection); - } - } - - return connection != null; - } - - /** - * This method should be used to ensure graceful shutdown of the backend. - */ - public void shutdown() { - log.fine("shutting down"); - if (shutdownInitiated) { - throw new IllegalStateException("Shutdown already in progress"); - } - shutdownInitiated = true; - } - - public void close() { - for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) { - try { - c.close(); - } catch (IOException e) { - logWarning("closing", e); - } - } - } - - /** - * Connection factory used by the Listener class. - */ - public Connection newConnection(SocketChannel channel, Listener listener) { - return new FS4Connection(channel, listener, this, packetListener); - } - - public String toString () { - return("Backend/" + host + ":" + port); - } - - public BackendStatistics getStatistics() { - synchronized (connectionPool) { //ensure consistent values - return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections()); - } - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java deleted file mode 100644 index e84adfbef2c..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; -import java.util.Iterator; -import java.util.Timer; -import java.util.TimerTask; - -import com.yahoo.log.LogLevel; -/** - * Pool of FS4 connections. - * - * @author Tony Vaagenes - */ -public class ConnectionPool { - - private final static int CLEANINGPERIOD = 1000; // Execute every second - private final Queue connections = new ConcurrentLinkedQueue<>(); - private final AtomicInteger activeConnections = new AtomicInteger(0); - private final AtomicInteger passiveConnections = new AtomicInteger(0); - private static final Logger log = Logger.getLogger(ConnectionPool.class.getName()); - - class PoolCleanerTask extends TimerTask { - private final ConnectionPool connectionPool; - public PoolCleanerTask(ConnectionPool connectionPool) { - this.connectionPool = connectionPool; - } - - public void run() { - try { - connectionPool.dropInvalidConnections(); - } catch (Exception e) { - log.log(LogLevel.WARNING, - "Caught exception in connection pool cleaner, ignoring.", - e); - } - } - } - - public ConnectionPool() { - } - - public ConnectionPool(Timer timer) { - timer.schedule(new PoolCleanerTask(this), CLEANINGPERIOD, CLEANINGPERIOD); - } - - private void dropInvalidConnections() { - for (Iterator i = connections.iterator(); i.hasNext();) { - FS4Connection connection = i.next(); - if (!connection.isValid()) { - i.remove(); - } - } - } - - private FS4Connection registerAsActiveIfNonZero(FS4Connection connection) { - activeConnections.incrementAndGet(); - passiveConnections.decrementAndGet(); - return connection; - } - - public FS4Connection getConnection() { - return registerAsActiveIfNonZero(connections.poll()); - } - - void releaseConnection(FS4Connection connection) { - assert(connection != null); - activeConnections.decrementAndGet(); - if (connection.isValid()) { - passiveConnections.incrementAndGet(); - connections.add(connection); - } - } - - void createdConnection() { - activeConnections.incrementAndGet(); - } - - int activeConnections() { - return activeConnections.get(); - } - - //unused connections in the pool - int passiveConnections() { - return passiveConnections.get(); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java deleted file mode 100644 index adfc63d02f7..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java +++ /dev/null @@ -1,255 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.concurrent.SystemTimer; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.Packet; -import com.yahoo.search.Query; -import com.yahoo.search.dispatch.ResponseMonitor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * This class is used to represent a "channel" in the FS4 protocol. - * A channel represents a session between a client and the fdispatch. - * Internally this class has a response queue used by the backend - * for queueing up FS4 packets that belong to this channel (or - * session, which might be a more appropriate name for it). - * Outbound packets are handed off to the FS4Connection. - * - * @author Bjorn Borud - */ -public class FS4Channel { - - private static Logger log = Logger.getLogger(FS4Channel.class.getName()); - - private Integer channelId; - private Backend backend; - volatile private BlockingQueue responseQueue; - private Query query; - private boolean isPingChannel = false; - private ResponseMonitor monitor; - - /** for unit testing. do not use */ - protected FS4Channel () { - } - - protected FS4Channel(Backend backend, Integer channelId) { - this.channelId = channelId; - this.backend = backend; - this.responseQueue = new LinkedBlockingQueue<>(); - } - - static public FS4Channel createPingChannel(Backend backend) { - FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0)); - pingChannel.isPingChannel = true; - return pingChannel; - } - - /** Set the query currently associated with this channel */ - public void setQuery(Query query) { - this.query = query; - } - - /** Get the query currently associated with this channel */ - public Query getQuery() { - return query; - } - - /** Returns the (fs4) channel id */ - public Integer getChannelId () { - return channelId; - } - - /** - * Closes the channel - */ - public void close () { - BlockingQueue q = responseQueue; - responseQueue = null; - query = null; - if (isPingChannel) { - backend.removePingChannel(); - } else { - backend.removeChannel(channelId); - } - if (q != null) { - q.clear(); - } - } - - /** - * Legacy interface. - */ - public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { - ensureValid(); - return backend.sendPacket(packet, channelId); - } - - /** - * Receives the given number of packets and returns them, OR - *
    - *
  • Returns a smaller number of packets if an error or eol packet is received - *
  • Throws a ChannelTimeoutException if timeout occurs before all packets - * are received. Packets received with the wrong channel id are ignored. - *
- * - * @param timeout the number of ms to attempt to get packets before throwing an exception - * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error - */ - public BasicPacket[] receivePackets(long timeout, int packetCount) - throws InvalidChannelException, ChannelTimeoutException { - ensureValid(); - - List packets = new ArrayList<>(12); - long startTime = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeout; - - try { - while (timeLeft >= 0) { - BasicPacket p = nextPacket(timeLeft); - if (p == null) throw new ChannelTimeoutException("Timed out"); - - if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) { - log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId()); - continue; - } - - packets.add(p); - if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) { - BasicPacket[] packetArray = new BasicPacket[packets.size()]; - packets.toArray(packetArray); - return packetArray; - } - - // doing this last might save us one system call for the last - // packet. - timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime); - } - } - catch (InvalidChannelException e) { - // nop. if we get this we want to return the default - // zero length packet array indicating that we have no - // valid response - log.info("FS4Channel was invalid. timeLeft=" - + timeLeft + ", timeout=" + timeout); - } - catch (InterruptedException e) { - log.info("FS4Channel was interrupted. timeLeft=" - + timeLeft + ", timeout=" + timeout); - Thread.currentThread().interrupt(); - } - - // default return, we only hit this if we timed out and - // did not get the end of the packet stream - throw new ChannelTimeoutException(); - } - - private static boolean hasEnoughPackets(int packetCount,List packets) { - if (packetCount<0) return false; - return packets.size()>=packetCount; - } - - /** - * Returns true if we will definitely receive more packets on this stream - * - * Shouldn't that be "_not_ receive more packets"? - */ - private static boolean isLastPacket (BasicPacket packet) { - if (packet instanceof com.yahoo.fs4.ErrorPacket) return true; - if (packet instanceof com.yahoo.fs4.EolPacket) return true; - if (packet instanceof com.yahoo.fs4.PongPacket) return true; - return false; - } - - /** - * Return the next available packet from the response queue. If there - * are no packets available we wait a maximum of timeout - * milliseconds before returning a null - * - * @param timeout Number of milliseconds to wait for a packet - * to become available. - * - * @return Returns the next available BasicPacket or - * null if we timed out. - */ - public BasicPacket nextPacket(long timeout) - throws InterruptedException, InvalidChannelException - { - return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS); - } - - /** - * Add incoming packet to the response queue. This is to be used - * by the listener for placing incoming packets in the response - * queue. - * - * @param packet BasicPacket to be placed in the response queue. - * - */ - protected void addPacket (BasicPacket packet) - throws InterruptedException, InvalidChannelException - { - ensureValidQ().put(packet); - notifyMonitor(); - } - - /** - * A valid FS4Channel is one that has not yet been closed. - * - * @return Returns true if the FS4Channel is valid. - */ - public boolean isValid () { - return responseQueue != null; - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private void ensureValid () throws InvalidChannelException { - if (isValid()) { - return; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private BlockingQueue ensureValidQ () throws InvalidChannelException { - BlockingQueue q = responseQueue; - if (q != null) { - return q; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - public String toString() { - return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); - } - - public void setResponseMonitor(ResponseMonitor monitor) { - this.monitor = monitor; - } - - protected void notifyMonitor() { - if(monitor != null) { - monitor.responseAvailable(this); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java deleted file mode 100644 index 7dcbefde9fa..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.fs4.mplex; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.BufferTooSmallException; -import com.yahoo.fs4.PacketDecoder; -import com.yahoo.fs4.PacketListener; -import com.yahoo.io.Connection; -import com.yahoo.io.Listener; -import com.yahoo.log.LogLevel; - -/** - * - * This class is used to represent a connection to an fdispatch - * - * @author Bjorn Borud - */ -public class FS4Connection implements Connection -{ - private static Logger log = Logger.getLogger(FS4Connection.class.getName()); - private Backend backend; - private Listener listener; - private SocketChannel channel; - - private boolean shouldWrite = false; - - private static int idCounter = 1; - private int idNumber; - - // outbound data - private ByteBuffer writeBuffer; - private LinkedList writeBufferList = new LinkedList<>(); - - // inbound data - private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024); - private ByteBuffer readBuffer = fixedReadBuffer; - - private volatile boolean valid = true; - - private final PacketListener packetListener; - - - /** - * Create an FS4 Connection. - */ - public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) { - this.backend = backend; - this.listener = listener; - this.channel = channel; - this.idNumber = idCounter++; - this.packetListener = packetListener; - - log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress()); - } - - - /** - * Packet sending interface. - */ - public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { - ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); - ByteBuffer viewForPacketListener = buffer.slice(); - synchronized (this) { - if (!(valid && channel.isOpen())) { - throw new IllegalStateException("Connection is not valid. " + - "Address = " + backend.getAddress() + - ", valid = " + valid + - ", isOpen = " + channel.isOpen()); - } - - if (writeBuffer == null) { - writeBuffer = buffer; - } else { - writeBufferList.addLast(buffer); - enableWrite(); - } - write(); - } - - if (packetListener != null) - packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener); - } - - - /** - * The write event handler. This can be called both from the client - * thread and from the IO thread, so it needs to be synchronized. It - * assumes that IO is nonblocking, and will attempt to keep writing - * data until the system won't accept more data. - * - */ - public synchronized void write () throws IOException { - if (! channel.isOpen()) { - throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress()); - } - - try { - int bytesWritten = 0; - boolean isFinished = false; - do { - // if writeBuffer is not set we need to fetch the next buffer - if (writeBuffer == null) { - - // if the list is empty, signal the selector we do not need - // to do any writing for a while yet and bail - if (writeBufferList.isEmpty()) { - disableWrite(); - isFinished = true; - break; - } - writeBuffer = writeBufferList.removeFirst(); - } - - // invariants: we have a writeBuffer - bytesWritten = channel.write(writeBuffer); - - // buffer drained so we forget it and see what happens when we - // go around. if indeed we go around - if (!writeBuffer.hasRemaining()) { - writeBuffer.clear(); - backend.getBufferPool().free(writeBuffer); - writeBuffer = null; - } - } while (bytesWritten > 0); - if (!isFinished) { - enableWrite(); - } - } catch (IOException e) { - log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() + - ". Closing channel", e); - try { - close(); - } catch (IOException ignored) {} - - throw e; - } - } - - - private void disableWrite() { - if (shouldWrite) { - listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false); - shouldWrite = false; - } - } - - - private void enableWrite() { - if (!shouldWrite) { - listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true); - shouldWrite = true; - } - } - - - - public void read () throws IOException { - if (! channel.isOpen()) { - throw new IOException("Channel not open in read(), address=" + backend.getAddress()); - } - - int bytesRead = 0; - - do { - try { - if (readBuffer == fixedReadBuffer) { - bytesRead = channel.read(readBuffer); - } else { - fixedReadBuffer.clear(); - if (readBuffer.remaining() < fixedReadBuffer.capacity()) { - fixedReadBuffer.limit(readBuffer.remaining()); - } - bytesRead = channel.read(fixedReadBuffer); - fixedReadBuffer.flip(); - readBuffer.put(fixedReadBuffer); - fixedReadBuffer.clear(); - } - } - catch (IOException e) { - // this is the "normal" way that connection closes. - log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+ - e.getClass().getName()+" / ", e); - bytesRead = -1; - } - - // end of file - if (bytesRead == -1) { - log.log(LogLevel.DEBUG, "Dispatch closed connection" - + " (id="+idNumber+", address=" + backend.getAddress() + ")"); - try { - close(); - } catch (Exception e) { - log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e); - } - } - - // no more read - if (bytesRead == 0) { - // buffer too small? - if (! readBuffer.hasRemaining()) { - log.fine("Buffer possibly too small, extending"); - readBuffer.flip(); - extendReadBuffer(readBuffer.capacity() * 2); - } - } - - } while (bytesRead > 0); - - readBuffer.flip(); - - // hand off packet extraction - extractPackets(readBuffer); - } - - private void extractPackets(ByteBuffer readBuffer) { - for (;;) { - PacketDecoder.DecodedPacket packet = null; - try { - FS4Channel receiver = null; - int queryId = PacketDecoder.sniffChannel(readBuffer); - if (queryId == 0) { - if (PacketDecoder.isPongPacket(readBuffer)) - receiver = backend.getPingChannel(); - } - else { - receiver = backend.getChannel(Integer.valueOf(queryId)); - } - packet = PacketDecoder.extractPacket(readBuffer); - - if (packet != null) - packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes); - } - catch (BufferTooSmallException e) { - log.fine("Unable to decode, extending readBuffer"); - extendReadBuffer(PacketDecoder.packetLength(readBuffer)); - return; - } - - // break out of loop if we did not get a packet out of the - // buffer so we can select and read some more - if (packet == null) { - - // if the buffer has been cleared, we can do a reset - // of the readBuffer - if ((readBuffer.position() == 0) - && (readBuffer.limit() == readBuffer.capacity())) - { - resetReadBuffer(); - } - break; - } - - backend.receivePacket(packet.packet); - } - } - - /** - * This is called when we close the connection to do any - * pending cleanup work. Closing a connection marks it as - * not valid. - */ - public void close () throws IOException { - valid = false; - channel.close(); - log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress()); - } - - /** - * Upon asynchronous connect completion this method is called by - * the Listener. - */ - public void connect() throws IOException { - throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". " - + "asynchronous connect in NIO is flawed!"); - } - - /** - * Since we are performing an asynchronous connect we are initially - * only interested in the OP_CONNECT event. - */ - public int selectOps () { - return SelectionKey.OP_READ; - } - - /** - * Return the underlying SocketChannel used by this connection. - */ - public SocketChannel socketChannel() { - return channel; - } - - - public String toString () { - return FS4Connection.class.getName() + "/" + channel; - } - - - //============================================================ - //==== readbuffer management - //============================================================ - - - /** - * Extend the readBuffer. Make a new buffer of the requested size - * copy the contents of the readBuffer into it and assign reference - * to readBuffer instance variable. - * - *

- * The readBuffer needs to be in "readable" (flipped) state before - * this is called and it will be in the "writeable" state when it - * returns. - */ - private void extendReadBuffer (int size) { - // we specifically check this because packetLength() can return -1 - // and someone might alter the code so that we do in fact get -1 - // ...which never happens as the code is now - // - if (size == -1) { - throw new RuntimeException("Invalid buffer size requested: -1"); - } - - // if we get a size that is smaller than the current - // readBuffer capacity we just double it. not sure how wise this - // might be. - // - if (size < readBuffer.capacity()) { - size = readBuffer.capacity() * 2; - } - - ByteBuffer tmp = ByteBuffer.allocate(size); - tmp.put(readBuffer); - log.fine("Extended readBuffer to " + size + " bytes" - + "from " + readBuffer.capacity() + " bytes"); - readBuffer = tmp; - } - - /** - * Clear the readBuffer, and if temporarily allocated bigger - * buffer is in use: ditch it and reset the reference to the - * fixed readBuffer. - */ - private void resetReadBuffer () { - fixedReadBuffer.clear(); - if (readBuffer == fixedReadBuffer) { - return; - } - log.fine("Resetting readbuffer"); - readBuffer = fixedReadBuffer; - } - - /** - * This method is used to determine whether the connection is still - * viable or not. All connections are initially valid, but they - * become invalid if we close the connection or something bad happens - * and the connection needs to be ditched. - */ - public boolean isValid() { - return valid; - } - -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java deleted file mode 100644 index 6176d069645..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- - -package com.yahoo.fs4.mplex; - -/** - * @author Bj\u00f8rn Borud - */ -@SuppressWarnings("serial") -public class InvalidChannelException extends Exception -{ - public InvalidChannelException (String message) { - super(message); - } -} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java deleted file mode 100644 index d76d270dcb7..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.io.FatalErrorHandler; -import com.yahoo.io.Listener; - -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Pool of com.yahoo.io.Listener instances for shared use by Vespa backend - * searchers. - * - * @author baldersheim - * @since 5.3.0 - */ -public final class ListenerPool { - private final static Logger logger = Logger.getLogger(ListenerPool.class.getName()); - private final List listeners; - - public ListenerPool(String name, int numListeners) { - listeners = new ArrayList<>(numListeners); - FatalErrorHandler fatalErrorHandler = new FatalErrorHandler(); - for (int i = 0; i < numListeners; i++) { - Listener listener = new Listener(name + "-" + i); - listener.setFatalErrorHandler(fatalErrorHandler); - listener.start(); - listeners.add(listener); - } - } - - public Listener get(int index) { - return listeners.get(index); - } - - public int size() { - return listeners.size(); - } - - public void close() { - for (Listener listener : listeners) { - listener.interrupt(); - } - try { - for (Listener listener : listeners) { - listener.join(); - } - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Got interrupted", e); - Thread.currentThread().interrupt(); - } - } - -} -- cgit v1.2.3