diff options
author | Olli Virtanen <ovirtanen@gmail.com> | 2019-03-29 12:36:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-29 12:36:04 +0100 |
commit | b0f1e3055d4fc395105277076eb87306700136bb (patch) | |
tree | 279f9422cdd8d632889ba63be24d9595413c1b3f /container-search/src/main/java/com/yahoo/fs4 | |
parent | 748ad31c704fbd53ec45b659002a72564dbe2c04 (diff) | |
parent | 399a5dc9b483ca40690b940ecebb6fea96cdbc7d (diff) |
Merge branch 'master' into ollivir/protobuf-ping-and-feature-flag
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4')
4 files changed, 36 insertions, 64 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java index 6f87e45af25..f87721dc503 100644 --- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java @@ -26,9 +26,7 @@ public abstract class BasicPacket { private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024); public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags. - protected byte[] encodedBody; - - protected ByteBuffer encodingBuffer; + private byte[] encodedBody; /** The length of this packet in bytes or -1 if not known */ protected int length = -1; @@ -199,7 +197,7 @@ public abstract class BasicPacket { throw new UnsupportedOperationException("Encoding of " + this + " is not implemented"); } - protected void setEncodedBody(ByteBuffer b, int start, int length) { + private void setEncodedBody(ByteBuffer b, int start, int length) { encodedBody = new byte[length]; b.position(start); b.get(encodedBody); @@ -222,18 +220,7 @@ public abstract class BasicPacket { * * If this packet does not use a channel ID, the ID will be ignored. */ - public final void allocateAndEncode(int channelId) { - allocateAndEncode(channelId, DEFAULT_WRITE_BUFFER_SIZE); - } - - private void allocateAndEncode(int channelId, int initialSize) { - if (encodingBuffer != null) { - patchChannelId(encodingBuffer, channelId); - return; - } - - int size = initialSize; - ByteBuffer buffer = ByteBuffer.allocate(size); + private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) { while (true) { try { if (hasChannelId()) { @@ -242,43 +229,25 @@ public abstract class BasicPacket { encode(buffer); } buffer.flip(); - encodingBuffer = buffer; break; } catch (BufferTooSmallException e) { - size *= 2; - buffer = ByteBuffer.allocate(size); + buffer = ByteBuffer.allocate(buffer.capacity()*2); } } + return buffer; } - // No channel ID for BasicPacket instances, so it's a NOP - protected void patchChannelId(ByteBuffer buf, int channelId) {} - /** * Return buffer containing the encoded form of this package and * remove internal reference to it. */ public final ByteBuffer grantEncodingBuffer(int channelId) { - if (encodingBuffer == null) { - allocateAndEncode(channelId); - } else { - patchChannelId(encodingBuffer, channelId); - } - ByteBuffer b = encodingBuffer; - encodingBuffer = null; - return b; + return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE)); } - public final ByteBuffer grantEncodingBuffer(int channelId, int initialSize) { - if (encodingBuffer == null) { - allocateAndEncode(channelId, initialSize); - } else { - patchChannelId(encodingBuffer, channelId); - } - ByteBuffer b = encodingBuffer; - encodingBuffer = null; - return b; + public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) { + return allocateAndEncode(channelId, buffer); } /** Returns the code of this package */ diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java index 78d5083c25f..1e9deede59d 100644 --- a/container-search/src/main/java/com/yahoo/fs4/Packet.java +++ b/container-search/src/main/java/com/yahoo/fs4/Packet.java @@ -19,8 +19,6 @@ public abstract class Packet extends BasicPacket { */ protected int channel = -1; - private static final int CHANNEL_ID_OFFSET = 8; - /** * Fills this package from a byte buffer positioned at the first * byte of the package @@ -109,17 +107,6 @@ public abstract class Packet extends BasicPacket { return true; } - /** - * Only for use with encodingBuffer magic. - * - * This is only called from allocateAndEncode and grantEncodingBuffer, - * therefore an assumption about the packet starting at the beginning of the - * buffer is made. - */ - protected void patchChannelId(ByteBuffer buf, int channelId) { - buf.putInt(CHANNEL_ID_OFFSET, channelId); - } - public String toString() { return "packet with code " + getCode() + ", channelId=" + getChannel(); } diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java index 202ee94383f..12f8e9e387d 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -1,19 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4.mplex; - -import com.yahoo.fs4.*; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.PacketDumper; +import com.yahoo.fs4.PacketListener; +import com.yahoo.fs4.PacketNotificationsBroadcaster; +import com.yahoo.fs4.PacketQueryTracer; import com.yahoo.io.Connection; import com.yahoo.io.ConnectionFactory; import com.yahoo.io.Listener; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.concurrent.ConcurrentResourcePool; +import com.yahoo.yolean.concurrent.ResourceFactory; +import com.yahoo.yolean.concurrent.ResourcePool; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,6 +33,8 @@ import java.util.logging.Logger; */ public class Backend implements ConnectionFactory { + private static int DEFAULT_BUFFER_SIZE = 0x8000; + public static final class BackendStatistics { public final int activeConnections; @@ -61,6 +73,12 @@ public class Backend implements ConnectionFactory { private final ConnectionPool connectionPool; private final PacketDumper packetDumper; private final AtomicInteger connectionCount = new AtomicInteger(0); + private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { + @Override + public ByteBuffer create() { + return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + } + }); /** * For unit testing. do not use @@ -116,6 +134,10 @@ public class Backend implements ConnectionFactory { return connection; } + ConcurrentResourcePool<ByteBuffer> getBufferPool() { + return byteBufferResourcePool; + } + /** * Return a connection to the connection pool. If the * connection is not valid anymore we drop it, ie. do not @@ -416,10 +438,6 @@ public class Backend implements ConnectionFactory { } } - public void dumpPackets(final PacketDumper.PacketType packetType, final boolean on) throws IOException { - packetDumper.dumpPackets(packetType, on); - } - public String getHost() { return host; } diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java index 69267f4a6b2..7dcbefde9fa 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -36,7 +36,6 @@ public class FS4Connection implements Connection private static int idCounter = 1; private int idNumber; - private int maxInitialSize = 1024; // outbound data private ByteBuffer writeBuffer; @@ -69,7 +68,7 @@ public class FS4Connection implements Connection * Packet sending interface. */ public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { - ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), maxInitialSize); + ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); ByteBuffer viewForPacketListener = buffer.slice(); synchronized (this) { if (!(valid && channel.isOpen())) { @@ -79,9 +78,6 @@ public class FS4Connection implements Connection ", isOpen = " + channel.isOpen()); } - if (buffer.capacity() > maxInitialSize) { - maxInitialSize = buffer.limit(); - } if (writeBuffer == null) { writeBuffer = buffer; } else { @@ -131,6 +127,8 @@ public class FS4Connection implements Connection // buffer drained so we forget it and see what happens when we // go around. if indeed we go around if (!writeBuffer.hasRemaining()) { + writeBuffer.clear(); + backend.getBufferPool().free(writeBuffer); writeBuffer = null; } } while (bytesWritten > 0); |