diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/BasicPacket.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/fs4/BasicPacket.java | 315 |
1 files changed, 315 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java new file mode 100644 index 00000000000..f87721dc503 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java @@ -0,0 +1,315 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.fastsearch.TimeoutException; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * Superclass of fs4 packets + * + * @author bratseth + */ +public abstract class BasicPacket { + + private final Compressor compressor = new Compressor(); + + private static Logger log = Logger.getLogger(QueryResultPacket.class.getName()); + private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); + public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. + + private byte[] encodedBody; + + /** The length of this packet in bytes or -1 if not known */ + protected int length = -1; + + /** + * A timestamp which can be set or inspected by clients of this class + * but which is never updated by the class itself. This is mostly + * a convenience for when you need to queue packets or retain them + * in some structure where their validity is limited by a timeout + * or similar. + */ + private long timeStamp = -1; + + private int compressionLimit = 0; + + private CompressionType compressionType; + + /** + * Sets the number of bytes the package must be before activating compression. + * A value of 0 means no compression. + * + * @param limit smallest package size that triggers compression. + */ + public void setCompressionLimit(int limit) { compressionLimit = limit; } + + public void setCompressionType(String type) { + compressionType = CompressionType.valueOf(type); + } + + /** + * Fills this package from a byte buffer positioned at the first byte of the package + * + * @return this for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public BasicPacket decode(ByteBuffer buffer) { + length = buffer.getInt()+4; // Streamed packet length is the length-4 + int code = buffer.getInt(); + + decodeAndDecompressBody(buffer, code, length - 2*4); + return this; + } + + protected void decodeAndDecompressBody(ByteBuffer buffer, int code, int packetLength) { + byte compressionType = (byte)((code & ~CODE_MASK) >> 24); + boolean isCompressed = compressionType != 0; + codeDecodedHook(code & CODE_MASK); + if (isCompressed) { + int uncompressedSize = buffer.getInt(); + int compressedSize = packetLength - 4; + int offset = 0; + byte[] compressedData; + if (buffer.hasArray()) { + compressedData = buffer.array(); + offset = buffer.arrayOffset() + buffer.position(); + buffer.position(buffer.position() + compressedSize); + } else { + compressedData = new byte[compressedSize]; + buffer.get(compressedData); + } + byte[] body = compressor.decompress(CompressionType.valueOf(compressionType), compressedData, offset, + uncompressedSize, Optional.of(compressedSize)); + ByteBuffer bodyBuffer = ByteBuffer.wrap(body); + length += uncompressedSize - (compressedSize + 4); + decodeBody(bodyBuffer); + } else { + decodeBody(buffer); + } + } + + /** + * Decodes the body of this package from a byte buffer + * positioned at the first byte of the package. + * + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public void decodeBody(ByteBuffer buffer) { + throw new UnsupportedOperationException("Decoding of " + this + " is not implemented"); + } + + /** + * Called when the packet code is decoded. + * This default implementation just throws an exception if the code + * is not the code of this packet. Packets which has several possible codes + * will use this method to store the code. + */ + protected void codeDecodedHook(int code) { + if (code != getCode()) + throw new RuntimeException("Can not decode " + code + " into " + this); + } + + /** + * <p>Encodes this package onto the given buffer at the current position. + * The position of the buffer after encoding is the byte following + * the last encoded byte.</p> + * + * <p>This method will ensure that everything is written provided + * sufficient capacity regardless of the buffer limit. + * When returning, the limit is at the end of the package (qual to the + * position).</p> + * + * @return this for convenience + * @throws UnsupportedOperationException if not implemented in the subclass + */ + public BasicPacket encode(ByteBuffer buffer) throws BufferTooSmallException { + int oldLimit = buffer.limit(); + int startPosition = buffer.position(); + + buffer.limit(buffer.capacity()); + try { + buffer.putInt(4); // Real length written later, when we know it + buffer.putInt(getCode()); + + encodeAndCompressBody(buffer, startPosition); + } + catch (java.nio.BufferOverflowException e) { + // reset buffer to expected state + buffer.position(startPosition); + buffer.limit(oldLimit); + throw new BufferTooSmallException("Destination buffer too small while encoding packet"); + } + + return this; + } + + protected void encodeAndCompressBody(ByteBuffer buffer, int startPosition) { + int startOfBody = buffer.position(); + encodeBody(buffer); + setEncodedBody(buffer, startOfBody, buffer.position() - startOfBody); + length = buffer.position() - startPosition; + + if (compressionLimit != 0 && length-4 > compressionLimit) { + byte[] compressedBody; + compressionType = CompressionType.LZ4; + LZ4Factory factory = LZ4Factory.fastestInstance(); + LZ4Compressor compressor = factory.fastCompressor(); + compressedBody = compressor.compress(encodedBody); + + log.log(LogLevel.DEBUG, "Uncompressed size: " + encodedBody.length + ", Compressed size: " + compressedBody.length); + if (compressedBody.length + 4 < encodedBody.length) { + buffer.position(startPosition); + buffer.putInt(compressedBody.length + startOfBody - startPosition + 4 - 4); // +4 for compressed size + buffer.putInt(getCompressedCode(compressionType)); + buffer.position(startOfBody); + buffer.putInt(encodedBody.length); + buffer.put(compressedBody); + buffer.limit(buffer.position()); + return; + } + } + buffer.putInt(startPosition, length - 4); // Encoded length 4 less than actual length + buffer.limit(buffer.position()); + } + + private int getCompressedCode(CompressionType compression) { + int code = compression.getCode(); + return getCode() | (code << 24); + } + + /** + * Encodes the body of this package onto the given buffer at the current position. + * The position of the buffer after encoding is the byte following + * the last encoded byte. + * + * @throws UnsupportedOperationException if not implemented in the subclass + */ + protected void encodeBody(ByteBuffer buffer) { + throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); + } + + private void setEncodedBody(ByteBuffer b, int start, int length) { + encodedBody = new byte[length]; + b.position(start); + b.get(encodedBody); + } + + public boolean isEncoded() { + return encodedBody != null; + } + + /** + * Just a place holder to make the APIs simpler. + */ + public Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException { + throw new UnsupportedOperationException("This class does not support a channel ID"); + } + + /** + * Allocate the needed buffers and encode the packet using the given + * channel ID (if pertinent). + * + * If this packet does not use a channel ID, the ID will be ignored. + */ + private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { + while (true) { + try { + if (hasChannelId()) { + encode(buffer, channelId); + } else { + encode(buffer); + } + buffer.flip(); + break; + } + catch (BufferTooSmallException e) { + buffer = ByteBuffer.allocate(buffer.capacity()*2); + } + } + return buffer; + } + + /** + * Return buffer containing the encoded form of this package and + * remove internal reference to it. + */ + public final ByteBuffer grantEncodingBuffer(int channelId) { + return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); + } + + public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { + return allocateAndEncode(channelId, buffer); + } + + /** Returns the code of this package */ + public abstract int getCode(); + + /** + * Returns the length of this body (including header (8 bytes) and body), + * or -1 if not known. + * Note that the streamed packet format length is 4 bytes less than this length, + * for unknown reasons. + * The length is always known when decodeBody is called. + */ + public int getLength() { + return length; + } + + /** + * Set the timestamp field of the packet. + * + * A timestamp which can be set or inspected by clients of this class + * but which is never updated by the class itself. This is mostly + * a convenience for when you need to queue packets or retain them + * in some structure where their validity is limited by a timeout + * or similar. + */ + public void setTimestamp (long timeStamp) { + this.timeStamp = timeStamp; + } + + /** + * Get the timestamp field of this packet. Note that this is + * <b>not</b> part of the FS4 protocol. @see #setTimestamp for + * more information + * + */ + public long getTimestamp () { + return timeStamp; + } + + public String toString() { + return "packet with code " + getCode(); + } + + /** Whether this is a packets which can encode a channel ID. */ + public boolean hasChannelId() { + return false; + } + + /** + * Throws an IOException if the packet is not of the expected type + */ + public void ensureInstanceOf(Class<? extends BasicPacket> type, String name) throws IOException { + if ((type.isAssignableFrom(getClass()))) return; + + if (this instanceof ErrorPacket) { + ErrorPacket errorPacket = (ErrorPacket) this; + if (errorPacket.getErrorCode() == 8) + throw new TimeoutException("Query timed out in " + name); + else + throw new IOException("Received error from backend in " + name + ": " + this); + } else { + throw new IOException("Received " + this + " when expecting " + type); + } + } +} |