summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/BasicPacket.java')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/BasicPacket.java315
1 files changed, 0 insertions, 315 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
deleted file mode 100644
index f87721dc503..00000000000
--- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
+++ /dev/null
@@ -1,315 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.fs4;
-
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.log.LogLevel;
-import com.yahoo.prelude.fastsearch.TimeoutException;
-import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Factory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import java.util.logging.Logger;
-
-/**
- * Superclass of fs4 packets
- *
- * @author bratseth
- */
-public abstract class BasicPacket {
-
- private final Compressor compressor = new Compressor();
-
- private static Logger log = Logger.getLogger(QueryResultPacket.class.getName());
- private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024);
- public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags.
-
- private byte[] encodedBody;
-
- /** The length of this packet in bytes or -1 if not known */
- protected int length = -1;
-
- /**
- * A timestamp which can be set or inspected by clients of this class
- * but which is never updated by the class itself. This is mostly
- * a convenience for when you need to queue packets or retain them
- * in some structure where their validity is limited by a timeout
- * or similar.
- */
- private long timeStamp = -1;
-
- private int compressionLimit = 0;
-
- private CompressionType compressionType;
-
- /**
- * Sets the number of bytes the package must be before activating compression.
- * A value of 0 means no compression.
- *
- * @param limit smallest package size that triggers compression.
- */
- public void setCompressionLimit(int limit) { compressionLimit = limit; }
-
- public void setCompressionType(String type) {
- compressionType = CompressionType.valueOf(type);
- }
-
- /**
- * Fills this package from a byte buffer positioned at the first byte of the package
- *
- * @return this for convenience
- * @throws UnsupportedOperationException if not implemented in the subclass
- */
- public BasicPacket decode(ByteBuffer buffer) {
- length = buffer.getInt()+4; // Streamed packet length is the length-4
- int code = buffer.getInt();
-
- decodeAndDecompressBody(buffer, code, length - 2*4);
- return this;
- }
-
- protected void decodeAndDecompressBody(ByteBuffer buffer, int code, int packetLength) {
- byte compressionType = (byte)((code & ~CODE_MASK) >> 24);
- boolean isCompressed = compressionType != 0;
- codeDecodedHook(code & CODE_MASK);
- if (isCompressed) {
- int uncompressedSize = buffer.getInt();
- int compressedSize = packetLength - 4;
- int offset = 0;
- byte[] compressedData;
- if (buffer.hasArray()) {
- compressedData = buffer.array();
- offset = buffer.arrayOffset() + buffer.position();
- buffer.position(buffer.position() + compressedSize);
- } else {
- compressedData = new byte[compressedSize];
- buffer.get(compressedData);
- }
- byte[] body = compressor.decompress(CompressionType.valueOf(compressionType), compressedData, offset,
- uncompressedSize, Optional.of(compressedSize));
- ByteBuffer bodyBuffer = ByteBuffer.wrap(body);
- length += uncompressedSize - (compressedSize + 4);
- decodeBody(bodyBuffer);
- } else {
- decodeBody(buffer);
- }
- }
-
- /**
- * Decodes the body of this package from a byte buffer
- * positioned at the first byte of the package.
- *
- * @throws UnsupportedOperationException if not implemented in the subclass
- */
- public void decodeBody(ByteBuffer buffer) {
- throw new UnsupportedOperationException("Decoding of " + this + " is not implemented");
- }
-
- /**
- * Called when the packet code is decoded.
- * This default implementation just throws an exception if the code
- * is not the code of this packet. Packets which has several possible codes
- * will use this method to store the code.
- */
- protected void codeDecodedHook(int code) {
- if (code != getCode())
- throw new RuntimeException("Can not decode " + code + " into " + this);
- }
-
- /**
- * <p>Encodes this package onto the given buffer at the current position.
- * The position of the buffer after encoding is the byte following
- * the last encoded byte.</p>
- *
- * <p>This method will ensure that everything is written provided
- * sufficient capacity regardless of the buffer limit.
- * When returning, the limit is at the end of the package (qual to the
- * position).</p>
- *
- * @return this for convenience
- * @throws UnsupportedOperationException if not implemented in the subclass
- */
- public BasicPacket encode(ByteBuffer buffer) throws BufferTooSmallException {
- int oldLimit = buffer.limit();
- int startPosition = buffer.position();
-
- buffer.limit(buffer.capacity());
- try {
- buffer.putInt(4); // Real length written later, when we know it
- buffer.putInt(getCode());
-
- encodeAndCompressBody(buffer, startPosition);
- }
- catch (java.nio.BufferOverflowException e) {
- // reset buffer to expected state
- buffer.position(startPosition);
- buffer.limit(oldLimit);
- throw new BufferTooSmallException("Destination buffer too small while encoding packet");
- }
-
- return this;
- }
-
- protected void encodeAndCompressBody(ByteBuffer buffer, int startPosition) {
- int startOfBody = buffer.position();
- encodeBody(buffer);
- setEncodedBody(buffer, startOfBody, buffer.position() - startOfBody);
- length = buffer.position() - startPosition;
-
- if (compressionLimit != 0 && length-4 > compressionLimit) {
- byte[] compressedBody;
- compressionType = CompressionType.LZ4;
- LZ4Factory factory = LZ4Factory.fastestInstance();
- LZ4Compressor compressor = factory.fastCompressor();
- compressedBody = compressor.compress(encodedBody);
-
- log.log(LogLevel.DEBUG, "Uncompressed size: " + encodedBody.length + ", Compressed size: " + compressedBody.length);
- if (compressedBody.length + 4 < encodedBody.length) {
- buffer.position(startPosition);
- buffer.putInt(compressedBody.length + startOfBody - startPosition + 4 - 4); // +4 for compressed size
- buffer.putInt(getCompressedCode(compressionType));
- buffer.position(startOfBody);
- buffer.putInt(encodedBody.length);
- buffer.put(compressedBody);
- buffer.limit(buffer.position());
- return;
- }
- }
- buffer.putInt(startPosition, length - 4); // Encoded length 4 less than actual length
- buffer.limit(buffer.position());
- }
-
- private int getCompressedCode(CompressionType compression) {
- int code = compression.getCode();
- return getCode() | (code << 24);
- }
-
- /**
- * Encodes the body of this package onto the given buffer at the current position.
- * The position of the buffer after encoding is the byte following
- * the last encoded byte.
- *
- * @throws UnsupportedOperationException if not implemented in the subclass
- */
- protected void encodeBody(ByteBuffer buffer) {
- throw new UnsupportedOperationException("Encoding of " + this + " is not implemented");
- }
-
- private void setEncodedBody(ByteBuffer b, int start, int length) {
- encodedBody = new byte[length];
- b.position(start);
- b.get(encodedBody);
- }
-
- public boolean isEncoded() {
- return encodedBody != null;
- }
-
- /**
- * Just a place holder to make the APIs simpler.
- */
- public Packet encode(ByteBuffer buffer, int channel) throws BufferTooSmallException {
- throw new UnsupportedOperationException("This class does not support a channel ID");
- }
-
- /**
- * Allocate the needed buffers and encode the packet using the given
- * channel ID (if pertinent).
- *
- * If this packet does not use a channel ID, the ID will be ignored.
- */
- private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) {
- while (true) {
- try {
- if (hasChannelId()) {
- encode(buffer, channelId);
- } else {
- encode(buffer);
- }
- buffer.flip();
- break;
- }
- catch (BufferTooSmallException e) {
- buffer = ByteBuffer.allocate(buffer.capacity()*2);
- }
- }
- return buffer;
- }
-
- /**
- * Return buffer containing the encoded form of this package and
- * remove internal reference to it.
- */
- public final ByteBuffer grantEncodingBuffer(int channelId) {
- return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE));
- }
-
- public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) {
- return allocateAndEncode(channelId, buffer);
- }
-
- /** Returns the code of this package */
- public abstract int getCode();
-
- /**
- * Returns the length of this body (including header (8 bytes) and body),
- * or -1 if not known.
- * Note that the streamed packet format length is 4 bytes less than this length,
- * for unknown reasons.
- * The length is always known when decodeBody is called.
- */
- public int getLength() {
- return length;
- }
-
- /**
- * Set the timestamp field of the packet.
- *
- * A timestamp which can be set or inspected by clients of this class
- * but which is never updated by the class itself. This is mostly
- * a convenience for when you need to queue packets or retain them
- * in some structure where their validity is limited by a timeout
- * or similar.
- */
- public void setTimestamp (long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- /**
- * Get the timestamp field of this packet. Note that this is
- * <b>not</b> part of the FS4 protocol. @see #setTimestamp for
- * more information
- *
- */
- public long getTimestamp () {
- return timeStamp;
- }
-
- public String toString() {
- return "packet with code " + getCode();
- }
-
- /** Whether this is a packets which can encode a channel ID. */
- public boolean hasChannelId() {
- return false;
- }
-
- /**
- * Throws an IOException if the packet is not of the expected type
- */
- public void ensureInstanceOf(Class<? extends BasicPacket> type, String name) throws IOException {
- if ((type.isAssignableFrom(getClass()))) return;
-
- if (this instanceof ErrorPacket) {
- ErrorPacket errorPacket = (ErrorPacket) this;
- if (errorPacket.getErrorCode() == 8)
- throw new TimeoutException("Query timed out in " + name);
- else
- throw new IOException("Received error from backend in " + name + ": " + this);
- } else {
- throw new IOException("Received " + this + " when expecting " + type);
- }
- }
-}