diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-03-28 18:15:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-28 18:15:59 +0200 |
commit | a01cca22369e4ba4b7467ecff2d29ea10fd5a9c8 (patch) | |
tree | 0f7e82a2980887cf40e2c6d0a037cf1f6a7d319a /container-search | |
parent | 1af2c39a9d497348505fa32e083c4dc5617b2ecd (diff) | |
parent | fe7aae93eeef099040ee17f1ee697b96088544c9 (diff) |
Merge pull request #8925 from vespa-engine/balder/provide-initial-buffer
Balder/provide initial buffer.
Diffstat (limited to 'container-search')
5 files changed, 45 insertions, 65 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java index 6f87e45af25..f87721dc503 100644 --- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java @@ -26,9 +26,7 @@ public abstract class BasicPacket { private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. - protected byte[] encodedBody; - - protected ByteBuffer encodingBuffer; + private byte[] encodedBody; /** The length of this packet in bytes or -1 if not known */ protected int length = -1; @@ -199,7 +197,7 @@ public abstract class BasicPacket { throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); } - protected void setEncodedBody(ByteBuffer b, int start, int length) { + private void setEncodedBody(ByteBuffer b, int start, int length) { encodedBody = new byte[length]; b.position(start); b.get(encodedBody); @@ -222,18 +220,7 @@ public abstract class BasicPacket { * * If this packet does not use a channel ID, the ID will be ignored. */ - public final void allocateAndEncode(int channelId) { - allocateAndEncode(channelId, DEFAULT_WRITE_BUFFER_SIZE); - } - - private void allocateAndEncode(int channelId, int initialSize) { - if (encodingBuffer != null) { - patchChannelId(encodingBuffer, channelId); - return; - } - - int size = initialSize; - ByteBuffer buffer = ByteBuffer.allocate(size); + private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { while (true) { try { if (hasChannelId()) { @@ -242,43 +229,25 @@ public abstract class BasicPacket { encode(buffer); } buffer.flip(); - encodingBuffer = buffer; break; } catch (BufferTooSmallException e) { - size *= 2; - buffer = ByteBuffer.allocate(size); + buffer = ByteBuffer.allocate(buffer.capacity()*2); } } + return buffer; } - // No channel ID for BasicPacket instances, so it's a NOP - protected void patchChannelId(ByteBuffer buf, int channelId) {} - /** * Return buffer containing the encoded form of this package and * remove internal reference to it. */ public final ByteBuffer grantEncodingBuffer(int channelId) { - if (encodingBuffer == null) { - allocateAndEncode(channelId); - } else { - patchChannelId(encodingBuffer, channelId); - } - ByteBuffer b = encodingBuffer; - encodingBuffer = null; - return b; + return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); } - public final ByteBuffer grantEncodingBuffer(int channelId, int initialSize) { - if (encodingBuffer == null) { - allocateAndEncode(channelId, initialSize); - } else { - patchChannelId(encodingBuffer, channelId); - } - ByteBuffer b = encodingBuffer; - encodingBuffer = null; - return b; + public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { + return allocateAndEncode(channelId, buffer); } /** Returns the code of this package */ diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java index 78d5083c25f..1e9deede59d 100644 --- a/container-search/src/main/java/com/yahoo/fs4/Packet.java +++ b/container-search/src/main/java/com/yahoo/fs4/Packet.java @@ -19,8 +19,6 @@ public abstract class Packet extends BasicPacket { */ protected int channel = -1; - private static final int CHANNEL_ID_OFFSET = 8; - /** * Fills this package from a byte buffer positioned at the first * byte of the package @@ -109,17 +107,6 @@ public abstract class Packet extends BasicPacket { return true; } - /** - * Only for use with encodingBuffer magic. - * - * This is only called from allocateAndEncode and grantEncodingBuffer, - * therefore an assumption about the packet starting at the beginning of the - * buffer is made. - */ - protected void patchChannelId(ByteBuffer buf, int channelId) { - buf.putInt(CHANNEL_ID_OFFSET, channelId); - } - public String toString() { return "packet with code " + getCode() + ", channelId=" + getChannel(); } diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java index 202ee94383f..12f8e9e387d 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -1,19 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4.mplex; - -import com.yahoo.fs4.*; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.PacketDumper; +import com.yahoo.fs4.PacketListener; +import com.yahoo.fs4.PacketNotificationsBroadcaster; +import com.yahoo.fs4.PacketQueryTracer; import com.yahoo.io.Connection; import com.yahoo.io.ConnectionFactory; import com.yahoo.io.Listener; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.concurrent.ConcurrentResourcePool; +import com.yahoo.yolean.concurrent.ResourceFactory; +import com.yahoo.yolean.concurrent.ResourcePool; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,6 +33,8 @@ import java.util.logging.Logger; */ public class Backend implements ConnectionFactory { + private static int DEFAULT_BUFFER_SIZE = 0x8000; + public static final class BackendStatistics { public final int activeConnections; @@ -61,6 +73,12 @@ public class Backend implements ConnectionFactory { private final ConnectionPool connectionPool; private final PacketDumper packetDumper; private final AtomicInteger connectionCount = new AtomicInteger(0); + private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { + @Override + public ByteBuffer create() { + return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + } + }); /** * For unit testing. do not use @@ -116,6 +134,10 @@ public class Backend implements ConnectionFactory { return connection; } + ConcurrentResourcePool<ByteBuffer> getBufferPool() { + return byteBufferResourcePool; + } + /** * Return a connection to the connection pool. If the * connection is not valid anymore we drop it, ie. do not @@ -416,10 +438,6 @@ public class Backend implements ConnectionFactory { } } - public void dumpPackets(final PacketDumper.PacketType packetType, final boolean on) throws IOException { - packetDumper.dumpPackets(packetType, on); - } - public String getHost() { return host; } diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java index 69267f4a6b2..7dcbefde9fa 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -36,7 +36,6 @@ public class FS4Connection implements Connection private static int idCounter = 1; private int idNumber; - private int maxInitialSize = 1024; // outbound data private ByteBuffer writeBuffer; @@ -69,7 +68,7 @@ public class FS4Connection implements Connection * Packet sending interface. */ public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { - ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), maxInitialSize); + ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); ByteBuffer viewForPacketListener = buffer.slice(); synchronized (this) { if (!(valid && channel.isOpen())) { @@ -79,9 +78,6 @@ public class FS4Connection implements Connection ", isOpen = " + channel.isOpen()); } - if (buffer.capacity() > maxInitialSize) { - maxInitialSize = buffer.limit(); - } if (writeBuffer == null) { writeBuffer = buffer; } else { @@ -131,6 +127,8 @@ public class FS4Connection implements Connection // buffer drained so we forget it and see what happens when we // go around. if indeed we go around if (!writeBuffer.hasRemaining()) { + writeBuffer.clear(); + backend.getBufferPool().free(writeBuffer); writeBuffer = null; } } while (bytesWritten > 0); diff --git a/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java index fc39c1d8fe0..911831e3a65 100644 --- a/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java +++ b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java @@ -192,6 +192,15 @@ public class QueryTestCase { } @Test + public void testBufferExpands() throws BufferTooSmallException { + Query query = new Query("/?query=chain&sortspec=%2Ba+-b&timeout=0"); + QueryPacket packet = QueryPacket.create("container.0", query); + + ByteBuffer buffer = packet.grantEncodingBuffer(0, ByteBuffer.allocate(2)); + assertEquals(64, buffer.capacity()); + } + + @Test public void testPhraseEqualsPhraseWithPhraseSegment() throws BufferTooSmallException { Query query = new Query(); PhraseItem p = new PhraseItem(); @@ -258,7 +267,6 @@ public class QueryTestCase { assertEqualArrays(correctBuffer,encoded); - packet.allocateAndEncode(0x07070707); buffer = packet.grantEncodingBuffer(0x09090909); correctBuffer = new byte[] {0,0,0,46,0,0,0,-38,9,9,9,9, // Header 0,0,0,6, // Features |