aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/fs4/mplex
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java449
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java90
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java255
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java371
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java15
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java56
6 files changed, 1236 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
new file mode 100644
index 00000000000..12f8e9e387d
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
@@ -0,0 +1,449 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.fs4.mplex;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.PacketDumper;
+import com.yahoo.fs4.PacketListener;
+import com.yahoo.fs4.PacketNotificationsBroadcaster;
+import com.yahoo.fs4.PacketQueryTracer;
+import com.yahoo.io.Connection;
+import com.yahoo.io.ConnectionFactory;
+import com.yahoo.io.Listener;
+import com.yahoo.vespa.defaults.Defaults;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.yolean.concurrent.ConcurrentResourcePool;
+import com.yahoo.yolean.concurrent.ResourceFactory;
+import com.yahoo.yolean.concurrent.ResourcePool;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author Bjorn Borud
+ */
+public class Backend implements ConnectionFactory {
+
+ private static int DEFAULT_BUFFER_SIZE = 0x8000;
+
+ public static final class BackendStatistics {
+
+ public final int activeConnections;
+ public final int passiveConnections;
+
+ public BackendStatistics(int activeConnections, int passiveConnections) {
+ this.activeConnections = activeConnections;
+ this.passiveConnections = passiveConnections;
+ }
+
+ @Override
+ public String toString() {
+ return activeConnections + "/" + totalConnections();
+ }
+
+ public int totalConnections() {
+ return activeConnections + passiveConnections;
+ }
+ }
+
+ private static final Logger log = Logger.getLogger(Backend.class.getName());
+
+ private final ListenerPool listeners;
+ private final InetSocketAddress address;
+ private final String host;
+ private final int port;
+ private final Map<Integer, FS4Channel> activeChannels = new HashMap<>();
+ private int channelId = 0;
+ private boolean shutdownInitiated = false;
+
+ /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */
+ private boolean areInSocketNotConnectableState = false;
+
+ private final LinkedList<FS4Channel> pingChannels = new LinkedList<>();
+ private final PacketListener packetListener;
+ private final ConnectionPool connectionPool;
+ private final PacketDumper packetDumper;
+ private final AtomicInteger connectionCount = new AtomicInteger(0);
+ private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() {
+ @Override
+ public ByteBuffer create() {
+ return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ }
+ });
+
+ /**
+ * For unit testing. do not use
+ */
+ protected Backend() {
+ listeners = null;
+ host = null;
+ port = 0;
+ packetListener = null;
+ packetDumper = null;
+ address = null;
+ connectionPool = new ConnectionPool();
+ }
+
+ public Backend(String host,
+ int port,
+ String serverDiscriminator,
+ ListenerPool listenerPool,
+ ConnectionPool connectionPool) {
+ String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump";
+ packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")),
+ fileNamePattern);
+ packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer());
+ this.listeners = listenerPool;
+ this.host = host;
+ this.port = port;
+ address = new InetSocketAddress(host, port);
+ this.connectionPool = connectionPool;
+ }
+
+ private void logWarning(String attemptDescription, Exception e) {
+ log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e));
+ }
+
+ private void logInfo(String attemptDescription, Exception e) {
+ log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e));
+ }
+
+ // ============================================================
+ // ==== connection pool stuff
+ // ============================================================
+
+ /**
+ * Fetch a connection from the connection pool. If the pool
+ * is empty we create a connection.
+ */
+ private FS4Connection getConnection() throws IOException {
+ FS4Connection connection = connectionPool.getConnection();
+ if (connection == null) {
+ // if pool was empty create one:
+ connection = createConnection();
+ }
+ return connection;
+ }
+
+ ConcurrentResourcePool<ByteBuffer> getBufferPool() {
+ return byteBufferResourcePool;
+ }
+
+ /**
+ * Return a connection to the connection pool. If the
+ * connection is not valid anymore we drop it, ie. do not
+ * put it into the pool.
+ */
+ public void returnConnection(FS4Connection connection) {
+ connectionPool.releaseConnection(connection);
+ }
+
+ /**
+ * Create a new connection to the target for this backend.
+ */
+ private FS4Connection createConnection() throws IOException {
+ SocketChannel socket = SocketChannel.open();
+ try {
+ connectSocket(socket);
+ } catch (Exception e) {
+ // was warning, see VESPA-1922
+ if ( ! areInSocketNotConnectableState) {
+ logInfo("connecting to", e);
+ }
+ areInSocketNotConnectableState = true;
+ socket.close();
+ return null;
+ }
+ areInSocketNotConnectableState = false;
+ int listenerId = connectionCount.getAndIncrement()%listeners.size();
+ Listener listener = listeners.get(listenerId);
+ FS4Connection connection = new FS4Connection(socket, listener, this, packetListener);
+ listener.registerConnection(connection);
+
+ log.fine("Created new connection to " + host + ":" + port);
+ connectionPool.createdConnection();
+ return connection;
+ }
+
+ private void connectSocket(SocketChannel socket) throws IOException {
+ socket.configureBlocking(false);
+
+ boolean connected = socket.connect(address);
+
+ // wait for connection
+ if (!connected) {
+ long timeBarrier = System.currentTimeMillis() + 20L;
+ while (true) {
+ try {
+ Thread.sleep(5L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e);
+ }
+ // don't care whether it's spurious wakeup
+ connected = socket.finishConnect();
+ if (connected || System.currentTimeMillis() > timeBarrier) {
+ break;
+ }
+ }
+ }
+
+ // did we get a connection?
+ if ( !connected) {
+ throw new IllegalArgumentException("Could not create connection to dispatcher on "
+ + address.getHostName() + ":" + address.getPort());
+ }
+ socket.socket().
+ setTcpNoDelay(true);
+ }
+
+
+ //============================================================
+ //==== channel management
+ //============================================================
+
+ /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */
+ public FS4Channel openChannel() {
+ int cachedChannelId;
+ synchronized (this) {
+ if (channelId >= ((1 << 31) - 2)) {
+ channelId = 0;
+ }
+ cachedChannelId = channelId;
+ channelId += 2;
+ }
+ Integer id = cachedChannelId;
+ FS4Channel chan = new FS4Channel(this, id);
+ synchronized (activeChannels) {
+ activeChannels.put(id, chan);
+ }
+ return chan;
+ }
+
+ public FS4Channel openPingChannel() {
+ FS4Channel chan = FS4Channel.createPingChannel(this);
+ synchronized (pingChannels) {
+ pingChannels.add(chan);
+ }
+ return chan;
+ }
+
+ /**
+ * Get the remote address for this Backend. This method
+ * has package access only, because it is really only of
+ * importance to FS4Channel for writing slightly more sensible
+ * log messages.
+ * @return Returns the address (host, port) for this Backend.
+ */
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ /**
+ * Get an active channel by id.
+ *
+ * @param id the (fs4) channel id
+ * @return returns the (fs4) channel associated with this id
+ * or <code>null</code> if the channel is not in the
+ * set of active channels.
+ */
+ public FS4Channel getChannel(Integer id) {
+ synchronized (activeChannels) {
+ return activeChannels.get(id);
+ }
+ }
+
+ /**
+ * Return the first channel in the queue waiting for pings or
+ * <code>null</code> if none.
+ */
+ public FS4Channel getPingChannel() {
+ synchronized (pingChannels) {
+ return (pingChannels.isEmpty()) ? null : pingChannels.getFirst();
+ }
+ }
+
+ /**
+ * Get an active channel by id. This is a wrapper for the method
+ * that takes the id as an Integer.
+ *
+ * @param id The (fs4) channel id
+ * @return Returns the (fs4) channel associated with this id
+ * or <code>null</code> if the channel is not in the
+ * set of active channels.
+ */
+ public FS4Channel getChannel(int id) {
+ return getChannel(Integer.valueOf(id));
+ }
+
+ /**
+ * Remove a channel. We do not want this method to be called
+ * directly by the client -- removal of channels should be done
+ * by calling the close() method of the channel.
+ *
+ * @param id The (fs4) channel id
+ * @return Removes and returns the (fs4) channel associated
+ * with this id or <code>null</code> if the channel is
+ * not in the set of active channels.
+ */
+ protected FS4Channel removeChannel(Integer id) {
+ synchronized (activeChannels) {
+ return activeChannels.remove(id);
+ }
+ }
+
+ /**
+ * Remove a ping channel. We do not want this method to be called
+ * directly by the client -- removal of channels should be done
+ * by calling the close() method of the channel.
+ *
+ * @return Removes and returns the (fs4) channel first in
+ * the queue of ping channels or <code>null</code>
+ * if there are no active ping channels.
+ */
+ protected FS4Channel removePingChannel() {
+ synchronized (pingChannels) {
+ if (pingChannels.isEmpty())
+ return null;
+ return pingChannels.removeFirst();
+ }
+ }
+ //============================================================
+ //==== packet sending and reception
+ //============================================================
+
+ protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException {
+ if (shutdownInitiated) {
+ log.fine("Tried to send packet after shutdown initiated. Ignored.");
+ return false;
+ }
+
+ FS4Connection connection = null;
+ try {
+ connection = getConnection();
+ if (connection == null) {
+ return false;
+ }
+ connection.sendPacket(packet, channelId);
+ }
+ finally {
+ if (connection != null) {
+ returnConnection(connection);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * When a connection receives a packet, it uses this method to
+ * dispatch the packet to the right FS4Channel. If the corresponding
+ * FS4Channel does not exist the packet is dropped and a message is
+ * logged saying so.
+ */
+ protected void receivePacket(BasicPacket packet) {
+ FS4Channel fs4;
+ if (packet.hasChannelId())
+ fs4 = getChannel(((Packet)packet).getChannel());
+ else
+ fs4 = getPingChannel();
+
+ // channel does not exist
+ if (fs4 == null) {
+ return;
+ }
+ try {
+ fs4.addPacket(packet);
+ }
+ catch (InterruptedException e) {
+ log.info("Interrupted during packet adding. Packet = " + packet.toString());
+ Thread.currentThread().interrupt();
+ }
+ catch (InvalidChannelException e) {
+ log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString()
+ + " Backend probably sent data pertaining an old request,"
+ + " system may be overloaded.");
+ }
+ }
+
+ /**
+ * Attempt to establish a connection without sending messages and then
+ * return it to the pool. The assumption is that if the probing is
+ * successful, the connection will be used soon after. There should be
+ * minimal overhead since the connection is cached.
+ */
+ public boolean probeConnection() {
+ if (shutdownInitiated) {
+ return false;
+ }
+
+ FS4Connection connection = null;
+ try {
+ connection = getConnection();
+ } catch (IOException ignored) {
+ // connection is null
+ } finally {
+ if (connection != null) {
+ returnConnection(connection);
+ }
+ }
+
+ return connection != null;
+ }
+
+ /**
+ * This method should be used to ensure graceful shutdown of the backend.
+ */
+ public void shutdown() {
+ log.fine("shutting down");
+ if (shutdownInitiated) {
+ throw new IllegalStateException("Shutdown already in progress");
+ }
+ shutdownInitiated = true;
+ }
+
+ public void close() {
+ for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ logWarning("closing", e);
+ }
+ }
+ }
+
+ /**
+ * Connection factory used by the Listener class.
+ */
+ public Connection newConnection(SocketChannel channel, Listener listener) {
+ return new FS4Connection(channel, listener, this, packetListener);
+ }
+
+ public String toString () {
+ return("Backend/" + host + ":" + port);
+ }
+
+ public BackendStatistics getStatistics() {
+ synchronized (connectionPool) { //ensure consistent values
+ return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections());
+ }
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java
new file mode 100644
index 00000000000..e84adfbef2c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java
@@ -0,0 +1,90 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.fs4.mplex;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.yahoo.log.LogLevel;
+/**
+ * Pool of FS4 connections.
+ *
+ * @author Tony Vaagenes
+ */
+public class ConnectionPool {
+
+ private final static int CLEANINGPERIOD = 1000; // Execute every second
+ private final Queue<FS4Connection> connections = new ConcurrentLinkedQueue<>();
+ private final AtomicInteger activeConnections = new AtomicInteger(0);
+ private final AtomicInteger passiveConnections = new AtomicInteger(0);
+ private static final Logger log = Logger.getLogger(ConnectionPool.class.getName());
+
+ class PoolCleanerTask extends TimerTask {
+ private final ConnectionPool connectionPool;
+ public PoolCleanerTask(ConnectionPool connectionPool) {
+ this.connectionPool = connectionPool;
+ }
+
+ public void run() {
+ try {
+ connectionPool.dropInvalidConnections();
+ } catch (Exception e) {
+ log.log(LogLevel.WARNING,
+ "Caught exception in connection pool cleaner, ignoring.",
+ e);
+ }
+ }
+ }
+
+ public ConnectionPool() {
+ }
+
+ public ConnectionPool(Timer timer) {
+ timer.schedule(new PoolCleanerTask(this), CLEANINGPERIOD, CLEANINGPERIOD);
+ }
+
+ private void dropInvalidConnections() {
+ for (Iterator<FS4Connection> i = connections.iterator(); i.hasNext();) {
+ FS4Connection connection = i.next();
+ if (!connection.isValid()) {
+ i.remove();
+ }
+ }
+ }
+
+ private FS4Connection registerAsActiveIfNonZero(FS4Connection connection) {
+ activeConnections.incrementAndGet();
+ passiveConnections.decrementAndGet();
+ return connection;
+ }
+
+ public FS4Connection getConnection() {
+ return registerAsActiveIfNonZero(connections.poll());
+ }
+
+ void releaseConnection(FS4Connection connection) {
+ assert(connection != null);
+ activeConnections.decrementAndGet();
+ if (connection.isValid()) {
+ passiveConnections.incrementAndGet();
+ connections.add(connection);
+ }
+ }
+
+ void createdConnection() {
+ activeConnections.incrementAndGet();
+ }
+
+ int activeConnections() {
+ return activeConnections.get();
+ }
+
+ //unused connections in the pool
+ int passiveConnections() {
+ return passiveConnections.get();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
new file mode 100644
index 00000000000..adfc63d02f7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
@@ -0,0 +1,255 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.fs4.mplex;
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.Packet;
+import com.yahoo.search.Query;
+import com.yahoo.search.dispatch.ResponseMonitor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * This class is used to represent a "channel" in the FS4 protocol.
+ * A channel represents a session between a client and the fdispatch.
+ * Internally this class has a response queue used by the backend
+ * for queueing up FS4 packets that belong to this channel (or
+ * <em>session</em>, which might be a more appropriate name for it).
+ * Outbound packets are handed off to the FS4Connection.
+ *
+ * @author Bjorn Borud
+ */
+public class FS4Channel {
+
+ private static Logger log = Logger.getLogger(FS4Channel.class.getName());
+
+ private Integer channelId;
+ private Backend backend;
+ volatile private BlockingQueue<BasicPacket> responseQueue;
+ private Query query;
+ private boolean isPingChannel = false;
+ private ResponseMonitor<FS4Channel> monitor;
+
+ /** for unit testing. do not use */
+ protected FS4Channel () {
+ }
+
+ protected FS4Channel(Backend backend, Integer channelId) {
+ this.channelId = channelId;
+ this.backend = backend;
+ this.responseQueue = new LinkedBlockingQueue<>();
+ }
+
+ static public FS4Channel createPingChannel(Backend backend) {
+ FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0));
+ pingChannel.isPingChannel = true;
+ return pingChannel;
+ }
+
+ /** Set the query currently associated with this channel */
+ public void setQuery(Query query) {
+ this.query = query;
+ }
+
+ /** Get the query currently associated with this channel */
+ public Query getQuery() {
+ return query;
+ }
+
+ /** Returns the (fs4) channel id */
+ public Integer getChannelId () {
+ return channelId;
+ }
+
+ /**
+ * Closes the channel
+ */
+ public void close () {
+ BlockingQueue<BasicPacket> q = responseQueue;
+ responseQueue = null;
+ query = null;
+ if (isPingChannel) {
+ backend.removePingChannel();
+ } else {
+ backend.removeChannel(channelId);
+ }
+ if (q != null) {
+ q.clear();
+ }
+ }
+
+ /**
+ * Legacy interface.
+ */
+ public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
+ ensureValid();
+ return backend.sendPacket(packet, channelId);
+ }
+
+ /**
+ * Receives the given number of packets and returns them, OR
+ * <ul>
+ * <li>Returns a smaller number of packets if an error or eol packet is received
+ * <li>Throws a ChannelTimeoutException if timeout occurs before all packets
+ * are received. Packets received with the wrong channel id are ignored.
+ * </ul>
+ *
+ * @param timeout the number of ms to attempt to get packets before throwing an exception
+ * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error
+ */
+ public BasicPacket[] receivePackets(long timeout, int packetCount)
+ throws InvalidChannelException, ChannelTimeoutException {
+ ensureValid();
+
+ List<BasicPacket> packets = new ArrayList<>(12);
+ long startTime = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeout;
+
+ try {
+ while (timeLeft >= 0) {
+ BasicPacket p = nextPacket(timeLeft);
+ if (p == null) throw new ChannelTimeoutException("Timed out");
+
+ if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) {
+ log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId());
+ continue;
+ }
+
+ packets.add(p);
+ if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) {
+ BasicPacket[] packetArray = new BasicPacket[packets.size()];
+ packets.toArray(packetArray);
+ return packetArray;
+ }
+
+ // doing this last might save us one system call for the last
+ // packet.
+ timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime);
+ }
+ }
+ catch (InvalidChannelException e) {
+ // nop. if we get this we want to return the default
+ // zero length packet array indicating that we have no
+ // valid response
+ log.info("FS4Channel was invalid. timeLeft="
+ + timeLeft + ", timeout=" + timeout);
+ }
+ catch (InterruptedException e) {
+ log.info("FS4Channel was interrupted. timeLeft="
+ + timeLeft + ", timeout=" + timeout);
+ Thread.currentThread().interrupt();
+ }
+
+ // default return, we only hit this if we timed out and
+ // did not get the end of the packet stream
+ throw new ChannelTimeoutException();
+ }
+
+ private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) {
+ if (packetCount<0) return false;
+ return packets.size()>=packetCount;
+ }
+
+ /**
+ * Returns true if we will definitely receive more packets on this stream
+ *
+ * Shouldn't that be "_not_ receive more packets"?
+ */
+ private static boolean isLastPacket (BasicPacket packet) {
+ if (packet instanceof com.yahoo.fs4.ErrorPacket) return true;
+ if (packet instanceof com.yahoo.fs4.EolPacket) return true;
+ if (packet instanceof com.yahoo.fs4.PongPacket) return true;
+ return false;
+ }
+
+ /**
+ * Return the next available packet from the response queue. If there
+ * are no packets available we wait a maximum of <code>timeout</code>
+ * milliseconds before returning a <code>null</code>
+ *
+ * @param timeout Number of milliseconds to wait for a packet
+ * to become available.
+ *
+ * @return Returns the next available <code>BasicPacket</code> or
+ * <code>null</code> if we timed out.
+ */
+ public BasicPacket nextPacket(long timeout)
+ throws InterruptedException, InvalidChannelException
+ {
+ return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Add incoming packet to the response queue. This is to be used
+ * by the listener for placing incoming packets in the response
+ * queue.
+ *
+ * @param packet BasicPacket to be placed in the response queue.
+ *
+ */
+ protected void addPacket (BasicPacket packet)
+ throws InterruptedException, InvalidChannelException
+ {
+ ensureValidQ().put(packet);
+ notifyMonitor();
+ }
+
+ /**
+ * A valid FS4Channel is one that has not yet been closed.
+ *
+ * @return Returns <code>true</code> if the FS4Channel is valid.
+ */
+ public boolean isValid () {
+ return responseQueue != null;
+ }
+
+ /**
+ * This method is called whenever we want to perform an operation
+ * which assumes that the FS4Channel object is valid. An exception
+ * is thrown if the opposite turns out to be the case.
+ *
+ * @throws InvalidChannelException if the channel is no longer valid.
+ */
+ private void ensureValid () throws InvalidChannelException {
+ if (isValid()) {
+ return;
+ }
+ throw new InvalidChannelException("Channel is no longer valid");
+ }
+
+ /**
+ * This method is called whenever we want to perform an operation
+ * which assumes that the FS4Channel object is valid. An exception
+ * is thrown if the opposite turns out to be the case.
+ *
+ * @throws InvalidChannelException if the channel is no longer valid.
+ */
+ private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException {
+ BlockingQueue<BasicPacket> q = responseQueue;
+ if (q != null) {
+ return q;
+ }
+ throw new InvalidChannelException("Channel is no longer valid");
+ }
+
+ public String toString() {
+ return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]");
+ }
+
+ public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) {
+ this.monitor = monitor;
+ }
+
+ protected void notifyMonitor() {
+ if(monitor != null) {
+ monitor.responseAvailable(this);
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java
new file mode 100644
index 00000000000..7dcbefde9fa
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java
@@ -0,0 +1,371 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.fs4.mplex;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.BufferTooSmallException;
+import com.yahoo.fs4.PacketDecoder;
+import com.yahoo.fs4.PacketListener;
+import com.yahoo.io.Connection;
+import com.yahoo.io.Listener;
+import com.yahoo.log.LogLevel;
+
+/**
+ *
+ * This class is used to represent a connection to an fdispatch
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class FS4Connection implements Connection
+{
+ private static Logger log = Logger.getLogger(FS4Connection.class.getName());
+ private Backend backend;
+ private Listener listener;
+ private SocketChannel channel;
+
+ private boolean shouldWrite = false;
+
+ private static int idCounter = 1;
+ private int idNumber;
+
+ // outbound data
+ private ByteBuffer writeBuffer;
+ private LinkedList<ByteBuffer> writeBufferList = new LinkedList<>();
+
+ // inbound data
+ private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024);
+ private ByteBuffer readBuffer = fixedReadBuffer;
+
+ private volatile boolean valid = true;
+
+ private final PacketListener packetListener;
+
+
+ /**
+ * Create an FS4 Connection.
+ */
+ public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) {
+ this.backend = backend;
+ this.listener = listener;
+ this.channel = channel;
+ this.idNumber = idCounter++;
+ this.packetListener = packetListener;
+
+ log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress());
+ }
+
+
+ /**
+ * Packet sending interface.
+ */
+ public void sendPacket (BasicPacket packet, Integer channelId) throws IOException {
+ ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc());
+ ByteBuffer viewForPacketListener = buffer.slice();
+ synchronized (this) {
+ if (!(valid && channel.isOpen())) {
+ throw new IllegalStateException("Connection is not valid. " +
+ "Address = " + backend.getAddress() +
+ ", valid = " + valid +
+ ", isOpen = " + channel.isOpen());
+ }
+
+ if (writeBuffer == null) {
+ writeBuffer = buffer;
+ } else {
+ writeBufferList.addLast(buffer);
+ enableWrite();
+ }
+ write();
+ }
+
+ if (packetListener != null)
+ packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener);
+ }
+
+
+ /**
+ * The write event handler. This can be called both from the client
+ * thread and from the IO thread, so it needs to be synchronized. It
+ * assumes that IO is nonblocking, and will attempt to keep writing
+ * data until the system won't accept more data.
+ *
+ */
+ public synchronized void write () throws IOException {
+ if (! channel.isOpen()) {
+ throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress());
+ }
+
+ try {
+ int bytesWritten = 0;
+ boolean isFinished = false;
+ do {
+ // if writeBuffer is not set we need to fetch the next buffer
+ if (writeBuffer == null) {
+
+ // if the list is empty, signal the selector we do not need
+ // to do any writing for a while yet and bail
+ if (writeBufferList.isEmpty()) {
+ disableWrite();
+ isFinished = true;
+ break;
+ }
+ writeBuffer = writeBufferList.removeFirst();
+ }
+
+ // invariants: we have a writeBuffer
+ bytesWritten = channel.write(writeBuffer);
+
+ // buffer drained so we forget it and see what happens when we
+ // go around. if indeed we go around
+ if (!writeBuffer.hasRemaining()) {
+ writeBuffer.clear();
+ backend.getBufferPool().free(writeBuffer);
+ writeBuffer = null;
+ }
+ } while (bytesWritten > 0);
+ if (!isFinished) {
+ enableWrite();
+ }
+ } catch (IOException e) {
+ log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() +
+ ". Closing channel", e);
+ try {
+ close();
+ } catch (IOException ignored) {}
+
+ throw e;
+ }
+ }
+
+
+ private void disableWrite() {
+ if (shouldWrite) {
+ listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false);
+ shouldWrite = false;
+ }
+ }
+
+
+ private void enableWrite() {
+ if (!shouldWrite) {
+ listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true);
+ shouldWrite = true;
+ }
+ }
+
+
+
+ public void read () throws IOException {
+ if (! channel.isOpen()) {
+ throw new IOException("Channel not open in read(), address=" + backend.getAddress());
+ }
+
+ int bytesRead = 0;
+
+ do {
+ try {
+ if (readBuffer == fixedReadBuffer) {
+ bytesRead = channel.read(readBuffer);
+ } else {
+ fixedReadBuffer.clear();
+ if (readBuffer.remaining() < fixedReadBuffer.capacity()) {
+ fixedReadBuffer.limit(readBuffer.remaining());
+ }
+ bytesRead = channel.read(fixedReadBuffer);
+ fixedReadBuffer.flip();
+ readBuffer.put(fixedReadBuffer);
+ fixedReadBuffer.clear();
+ }
+ }
+ catch (IOException e) {
+ // this is the "normal" way that connection closes.
+ log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+
+ e.getClass().getName()+" / ", e);
+ bytesRead = -1;
+ }
+
+ // end of file
+ if (bytesRead == -1) {
+ log.log(LogLevel.DEBUG, "Dispatch closed connection"
+ + " (id="+idNumber+", address=" + backend.getAddress() + ")");
+ try {
+ close();
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e);
+ }
+ }
+
+ // no more read
+ if (bytesRead == 0) {
+ // buffer too small?
+ if (! readBuffer.hasRemaining()) {
+ log.fine("Buffer possibly too small, extending");
+ readBuffer.flip();
+ extendReadBuffer(readBuffer.capacity() * 2);
+ }
+ }
+
+ } while (bytesRead > 0);
+
+ readBuffer.flip();
+
+ // hand off packet extraction
+ extractPackets(readBuffer);
+ }
+
+ private void extractPackets(ByteBuffer readBuffer) {
+ for (;;) {
+ PacketDecoder.DecodedPacket packet = null;
+ try {
+ FS4Channel receiver = null;
+ int queryId = PacketDecoder.sniffChannel(readBuffer);
+ if (queryId == 0) {
+ if (PacketDecoder.isPongPacket(readBuffer))
+ receiver = backend.getPingChannel();
+ }
+ else {
+ receiver = backend.getChannel(Integer.valueOf(queryId));
+ }
+ packet = PacketDecoder.extractPacket(readBuffer);
+
+ if (packet != null)
+ packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes);
+ }
+ catch (BufferTooSmallException e) {
+ log.fine("Unable to decode, extending readBuffer");
+ extendReadBuffer(PacketDecoder.packetLength(readBuffer));
+ return;
+ }
+
+ // break out of loop if we did not get a packet out of the
+ // buffer so we can select and read some more
+ if (packet == null) {
+
+ // if the buffer has been cleared, we can do a reset
+ // of the readBuffer
+ if ((readBuffer.position() == 0)
+ && (readBuffer.limit() == readBuffer.capacity()))
+ {
+ resetReadBuffer();
+ }
+ break;
+ }
+
+ backend.receivePacket(packet.packet);
+ }
+ }
+
+ /**
+ * This is called when we close the connection to do any
+ * pending cleanup work. Closing a connection marks it as
+ * not valid.
+ */
+ public void close () throws IOException {
+ valid = false;
+ channel.close();
+ log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress());
+ }
+
+ /**
+ * Upon asynchronous connect completion this method is called by
+ * the Listener.
+ */
+ public void connect() throws IOException {
+ throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". "
+ + "asynchronous connect in NIO is flawed!");
+ }
+
+ /**
+ * Since we are performing an asynchronous connect we are initially
+ * only interested in the <code>OP_CONNECT</code> event.
+ */
+ public int selectOps () {
+ return SelectionKey.OP_READ;
+ }
+
+ /**
+ * Return the underlying SocketChannel used by this connection.
+ */
+ public SocketChannel socketChannel() {
+ return channel;
+ }
+
+
+ public String toString () {
+ return FS4Connection.class.getName() + "/" + channel;
+ }
+
+
+ //============================================================
+ //==== readbuffer management
+ //============================================================
+
+
+ /**
+ * Extend the readBuffer. Make a new buffer of the requested size
+ * copy the contents of the readBuffer into it and assign reference
+ * to readBuffer instance variable.
+ *
+ * <P>
+ * <b>The readBuffer needs to be in "readable" (flipped) state before
+ * this is called and it will be in the "writeable" state when it
+ * returns.</b>
+ */
+ private void extendReadBuffer (int size) {
+ // we specifically check this because packetLength() can return -1
+ // and someone might alter the code so that we do in fact get -1
+ // ...which never happens as the code is now
+ //
+ if (size == -1) {
+ throw new RuntimeException("Invalid buffer size requested: -1");
+ }
+
+ // if we get a size that is smaller than the current
+ // readBuffer capacity we just double it. not sure how wise this
+ // might be.
+ //
+ if (size < readBuffer.capacity()) {
+ size = readBuffer.capacity() * 2;
+ }
+
+ ByteBuffer tmp = ByteBuffer.allocate(size);
+ tmp.put(readBuffer);
+ log.fine("Extended readBuffer to " + size + " bytes"
+ + "from " + readBuffer.capacity() + " bytes");
+ readBuffer = tmp;
+ }
+
+ /**
+ * Clear the readBuffer, and if temporarily allocated bigger
+ * buffer is in use: ditch it and reset the reference to the
+ * fixed readBuffer.
+ */
+ private void resetReadBuffer () {
+ fixedReadBuffer.clear();
+ if (readBuffer == fixedReadBuffer) {
+ return;
+ }
+ log.fine("Resetting readbuffer");
+ readBuffer = fixedReadBuffer;
+ }
+
+ /**
+ * This method is used to determine whether the connection is still
+ * viable or not. All connections are initially valid, but they
+ * become invalid if we close the connection or something bad happens
+ * and the connection needs to be ditched.
+ */
+ public boolean isValid() {
+ return valid;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java
new file mode 100644
index 00000000000..6176d069645
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java
@@ -0,0 +1,15 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// -*- mode: java; folded-file: t; c-basic-offset: 4 -*-
+
+package com.yahoo.fs4.mplex;
+
+/**
+ * @author <a href="mailto:borud@yahoo-inc.com">Bj\u00f8rn Borud</a>
+ */
+@SuppressWarnings("serial")
+public class InvalidChannelException extends Exception
+{
+ public InvalidChannelException (String message) {
+ super(message);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java
new file mode 100644
index 00000000000..d76d270dcb7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java
@@ -0,0 +1,56 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.fs4.mplex;
+
+import com.yahoo.io.FatalErrorHandler;
+import com.yahoo.io.Listener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Pool of com.yahoo.io.Listener instances for shared use by Vespa backend
+ * searchers.
+ *
+ * @author baldersheim
+ * @since 5.3.0
+ */
+public final class ListenerPool {
+ private final static Logger logger = Logger.getLogger(ListenerPool.class.getName());
+ private final List<Listener> listeners;
+
+ public ListenerPool(String name, int numListeners) {
+ listeners = new ArrayList<>(numListeners);
+ FatalErrorHandler fatalErrorHandler = new FatalErrorHandler();
+ for (int i = 0; i < numListeners; i++) {
+ Listener listener = new Listener(name + "-" + i);
+ listener.setFatalErrorHandler(fatalErrorHandler);
+ listener.start();
+ listeners.add(listener);
+ }
+ }
+
+ public Listener get(int index) {
+ return listeners.get(index);
+ }
+
+ public int size() {
+ return listeners.size();
+ }
+
+ public void close() {
+ for (Listener listener : listeners) {
+ listener.interrupt();
+ }
+ try {
+ for (Listener listener : listeners) {
+ listener.join();
+ }
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, "Got interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}