summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-03-28 18:15:59 +0200
committerGitHub <noreply@github.com>2019-03-28 18:15:59 +0200
commita01cca22369e4ba4b7467ecff2d29ea10fd5a9c8 (patch)
tree0f7e82a2980887cf40e2c6d0a037cf1f6a7d319a /container-search
parent1af2c39a9d497348505fa32e083c4dc5617b2ecd (diff)
parentfe7aae93eeef099040ee17f1ee697b96088544c9 (diff)
Merge pull request #8925 from vespa-engine/balder/provide-initial-buffer
Balder/provide initial buffer.
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/BasicPacket.java47
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/Packet.java13
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java32
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java8
-rw-r--r--container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java10
5 files changed, 45 insertions, 65 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
index 6f87e45af25..f87721dc503 100644
--- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
+++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
@@ -26,9 +26,7 @@ public abstract class BasicPacket {
private static int DEFAULT_WRITE_BUFFER_SIZE = (10 * 1024);
public static final int CODE_MASK = 0x00ff_ffff; // Reserve upper byte for flags.
- protected byte[] encodedBody;
-
- protected ByteBuffer encodingBuffer;
+ private byte[] encodedBody;
/** The length of this packet in bytes or -1 if not known */
protected int length = -1;
@@ -199,7 +197,7 @@ public abstract class BasicPacket {
throw new UnsupportedOperationException("Encoding of " + this + " is not implemented");
}
- protected void setEncodedBody(ByteBuffer b, int start, int length) {
+ private void setEncodedBody(ByteBuffer b, int start, int length) {
encodedBody = new byte[length];
b.position(start);
b.get(encodedBody);
@@ -222,18 +220,7 @@ public abstract class BasicPacket {
*
* If this packet does not use a channel ID, the ID will be ignored.
*/
- public final void allocateAndEncode(int channelId) {
- allocateAndEncode(channelId, DEFAULT_WRITE_BUFFER_SIZE);
- }
-
- private void allocateAndEncode(int channelId, int initialSize) {
- if (encodingBuffer != null) {
- patchChannelId(encodingBuffer, channelId);
- return;
- }
-
- int size = initialSize;
- ByteBuffer buffer = ByteBuffer.allocate(size);
+ private ByteBuffer allocateAndEncode(int channelId, ByteBuffer buffer) {
while (true) {
try {
if (hasChannelId()) {
@@ -242,43 +229,25 @@ public abstract class BasicPacket {
encode(buffer);
}
buffer.flip();
- encodingBuffer = buffer;
break;
}
catch (BufferTooSmallException e) {
- size *= 2;
- buffer = ByteBuffer.allocate(size);
+ buffer = ByteBuffer.allocate(buffer.capacity()*2);
}
}
+ return buffer;
}
- // No channel ID for BasicPacket instances, so it's a NOP
- protected void patchChannelId(ByteBuffer buf, int channelId) {}
-
/**
* Return buffer containing the encoded form of this package and
* remove internal reference to it.
*/
public final ByteBuffer grantEncodingBuffer(int channelId) {
- if (encodingBuffer == null) {
- allocateAndEncode(channelId);
- } else {
- patchChannelId(encodingBuffer, channelId);
- }
- ByteBuffer b = encodingBuffer;
- encodingBuffer = null;
- return b;
+ return allocateAndEncode(channelId, ByteBuffer.allocate(DEFAULT_WRITE_BUFFER_SIZE));
}
- public final ByteBuffer grantEncodingBuffer(int channelId, int initialSize) {
- if (encodingBuffer == null) {
- allocateAndEncode(channelId, initialSize);
- } else {
- patchChannelId(encodingBuffer, channelId);
- }
- ByteBuffer b = encodingBuffer;
- encodingBuffer = null;
- return b;
+ public final ByteBuffer grantEncodingBuffer(int channelId, ByteBuffer buffer) {
+ return allocateAndEncode(channelId, buffer);
}
/** Returns the code of this package */
diff --git a/container-search/src/main/java/com/yahoo/fs4/Packet.java b/container-search/src/main/java/com/yahoo/fs4/Packet.java
index 78d5083c25f..1e9deede59d 100644
--- a/container-search/src/main/java/com/yahoo/fs4/Packet.java
+++ b/container-search/src/main/java/com/yahoo/fs4/Packet.java
@@ -19,8 +19,6 @@ public abstract class Packet extends BasicPacket {
*/
protected int channel = -1;
- private static final int CHANNEL_ID_OFFSET = 8;
-
/**
* Fills this package from a byte buffer positioned at the first
* byte of the package
@@ -109,17 +107,6 @@ public abstract class Packet extends BasicPacket {
return true;
}
- /**
- * Only for use with encodingBuffer magic.
- *
- * This is only called from allocateAndEncode and grantEncodingBuffer,
- * therefore an assumption about the packet starting at the beginning of the
- * buffer is made.
- */
- protected void patchChannelId(ByteBuffer buf, int channelId) {
- buf.putInt(CHANNEL_ID_OFFSET, channelId);
- }
-
public String toString() {
return "packet with code " + getCode() + ", channelId=" + getChannel();
}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
index 202ee94383f..12f8e9e387d 100644
--- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
@@ -1,19 +1,29 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.fs4.mplex;
-
-import com.yahoo.fs4.*;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.PacketDumper;
+import com.yahoo.fs4.PacketListener;
+import com.yahoo.fs4.PacketNotificationsBroadcaster;
+import com.yahoo.fs4.PacketQueryTracer;
import com.yahoo.io.Connection;
import com.yahoo.io.ConnectionFactory;
import com.yahoo.io.Listener;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.yolean.Exceptions;
+import com.yahoo.yolean.concurrent.ConcurrentResourcePool;
+import com.yahoo.yolean.concurrent.ResourceFactory;
+import com.yahoo.yolean.concurrent.ResourcePool;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.*;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,6 +33,8 @@ import java.util.logging.Logger;
*/
public class Backend implements ConnectionFactory {
+ private static int DEFAULT_BUFFER_SIZE = 0x8000;
+
public static final class BackendStatistics {
public final int activeConnections;
@@ -61,6 +73,12 @@ public class Backend implements ConnectionFactory {
private final ConnectionPool connectionPool;
private final PacketDumper packetDumper;
private final AtomicInteger connectionCount = new AtomicInteger(0);
+ private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() {
+ @Override
+ public ByteBuffer create() {
+ return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ }
+ });
/**
* For unit testing. do not use
@@ -116,6 +134,10 @@ public class Backend implements ConnectionFactory {
return connection;
}
+ ConcurrentResourcePool<ByteBuffer> getBufferPool() {
+ return byteBufferResourcePool;
+ }
+
/**
* Return a connection to the connection pool. If the
* connection is not valid anymore we drop it, ie. do not
@@ -416,10 +438,6 @@ public class Backend implements ConnectionFactory {
}
}
- public void dumpPackets(final PacketDumper.PacketType packetType, final boolean on) throws IOException {
- packetDumper.dumpPackets(packetType, on);
- }
-
public String getHost() {
return host;
}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java
index 69267f4a6b2..7dcbefde9fa 100644
--- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java
@@ -36,7 +36,6 @@ public class FS4Connection implements Connection
private static int idCounter = 1;
private int idNumber;
- private int maxInitialSize = 1024;
// outbound data
private ByteBuffer writeBuffer;
@@ -69,7 +68,7 @@ public class FS4Connection implements Connection
* Packet sending interface.
*/
public void sendPacket (BasicPacket packet, Integer channelId) throws IOException {
- ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), maxInitialSize);
+ ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc());
ByteBuffer viewForPacketListener = buffer.slice();
synchronized (this) {
if (!(valid && channel.isOpen())) {
@@ -79,9 +78,6 @@ public class FS4Connection implements Connection
", isOpen = " + channel.isOpen());
}
- if (buffer.capacity() > maxInitialSize) {
- maxInitialSize = buffer.limit();
- }
if (writeBuffer == null) {
writeBuffer = buffer;
} else {
@@ -131,6 +127,8 @@ public class FS4Connection implements Connection
// buffer drained so we forget it and see what happens when we
// go around. if indeed we go around
if (!writeBuffer.hasRemaining()) {
+ writeBuffer.clear();
+ backend.getBufferPool().free(writeBuffer);
writeBuffer = null;
}
} while (bytesWritten > 0);
diff --git a/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java
index fc39c1d8fe0..911831e3a65 100644
--- a/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java
+++ b/container-search/src/test/java/com/yahoo/fs4/test/QueryTestCase.java
@@ -192,6 +192,15 @@ public class QueryTestCase {
}
@Test
+ public void testBufferExpands() throws BufferTooSmallException {
+ Query query = new Query("/?query=chain&sortspec=%2Ba+-b&timeout=0");
+ QueryPacket packet = QueryPacket.create("container.0", query);
+
+ ByteBuffer buffer = packet.grantEncodingBuffer(0, ByteBuffer.allocate(2));
+ assertEquals(64, buffer.capacity());
+ }
+
+ @Test
public void testPhraseEqualsPhraseWithPhraseSegment() throws BufferTooSmallException {
Query query = new Query();
PhraseItem p = new PhraseItem();
@@ -258,7 +267,6 @@ public class QueryTestCase {
assertEqualArrays(correctBuffer,encoded);
- packet.allocateAndEncode(0x07070707);
buffer = packet.grantEncodingBuffer(0x09090909);
correctBuffer = new byte[] {0,0,0,46,0,0,0,-38,9,9,9,9, // Header
0,0,0,6, // Features