aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java449
1 files changed, 0 insertions, 449 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
deleted file mode 100644
index 12f8e9e387d..00000000000
--- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
+++ /dev/null
@@ -1,449 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.fs4.mplex;
-
-import com.yahoo.fs4.BasicPacket;
-import com.yahoo.fs4.Packet;
-import com.yahoo.fs4.PacketDumper;
-import com.yahoo.fs4.PacketListener;
-import com.yahoo.fs4.PacketNotificationsBroadcaster;
-import com.yahoo.fs4.PacketQueryTracer;
-import com.yahoo.io.Connection;
-import com.yahoo.io.ConnectionFactory;
-import com.yahoo.io.Listener;
-import com.yahoo.vespa.defaults.Defaults;
-import com.yahoo.yolean.Exceptions;
-import com.yahoo.yolean.concurrent.ConcurrentResourcePool;
-import com.yahoo.yolean.concurrent.ResourceFactory;
-import com.yahoo.yolean.concurrent.ResourcePool;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * @author Bjorn Borud
- */
-public class Backend implements ConnectionFactory {
-
- private static int DEFAULT_BUFFER_SIZE = 0x8000;
-
- public static final class BackendStatistics {
-
- public final int activeConnections;
- public final int passiveConnections;
-
- public BackendStatistics(int activeConnections, int passiveConnections) {
- this.activeConnections = activeConnections;
- this.passiveConnections = passiveConnections;
- }
-
- @Override
- public String toString() {
- return activeConnections + "/" + totalConnections();
- }
-
- public int totalConnections() {
- return activeConnections + passiveConnections;
- }
- }
-
- private static final Logger log = Logger.getLogger(Backend.class.getName());
-
- private final ListenerPool listeners;
- private final InetSocketAddress address;
- private final String host;
- private final int port;
- private final Map<Integer, FS4Channel> activeChannels = new HashMap<>();
- private int channelId = 0;
- private boolean shutdownInitiated = false;
-
- /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */
- private boolean areInSocketNotConnectableState = false;
-
- private final LinkedList<FS4Channel> pingChannels = new LinkedList<>();
- private final PacketListener packetListener;
- private final ConnectionPool connectionPool;
- private final PacketDumper packetDumper;
- private final AtomicInteger connectionCount = new AtomicInteger(0);
- private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() {
- @Override
- public ByteBuffer create() {
- return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- }
- });
-
- /**
- * For unit testing. do not use
- */
- protected Backend() {
- listeners = null;
- host = null;
- port = 0;
- packetListener = null;
- packetDumper = null;
- address = null;
- connectionPool = new ConnectionPool();
- }
-
- public Backend(String host,
- int port,
- String serverDiscriminator,
- ListenerPool listenerPool,
- ConnectionPool connectionPool) {
- String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump";
- packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")),
- fileNamePattern);
- packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer());
- this.listeners = listenerPool;
- this.host = host;
- this.port = port;
- address = new InetSocketAddress(host, port);
- this.connectionPool = connectionPool;
- }
-
- private void logWarning(String attemptDescription, Exception e) {
- log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e));
- }
-
- private void logInfo(String attemptDescription, Exception e) {
- log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e));
- }
-
- // ============================================================
- // ==== connection pool stuff
- // ============================================================
-
- /**
- * Fetch a connection from the connection pool. If the pool
- * is empty we create a connection.
- */
- private FS4Connection getConnection() throws IOException {
- FS4Connection connection = connectionPool.getConnection();
- if (connection == null) {
- // if pool was empty create one:
- connection = createConnection();
- }
- return connection;
- }
-
- ConcurrentResourcePool<ByteBuffer> getBufferPool() {
- return byteBufferResourcePool;
- }
-
- /**
- * Return a connection to the connection pool. If the
- * connection is not valid anymore we drop it, ie. do not
- * put it into the pool.
- */
- public void returnConnection(FS4Connection connection) {
- connectionPool.releaseConnection(connection);
- }
-
- /**
- * Create a new connection to the target for this backend.
- */
- private FS4Connection createConnection() throws IOException {
- SocketChannel socket = SocketChannel.open();
- try {
- connectSocket(socket);
- } catch (Exception e) {
- // was warning, see VESPA-1922
- if ( ! areInSocketNotConnectableState) {
- logInfo("connecting to", e);
- }
- areInSocketNotConnectableState = true;
- socket.close();
- return null;
- }
- areInSocketNotConnectableState = false;
- int listenerId = connectionCount.getAndIncrement()%listeners.size();
- Listener listener = listeners.get(listenerId);
- FS4Connection connection = new FS4Connection(socket, listener, this, packetListener);
- listener.registerConnection(connection);
-
- log.fine("Created new connection to " + host + ":" + port);
- connectionPool.createdConnection();
- return connection;
- }
-
- private void connectSocket(SocketChannel socket) throws IOException {
- socket.configureBlocking(false);
-
- boolean connected = socket.connect(address);
-
- // wait for connection
- if (!connected) {
- long timeBarrier = System.currentTimeMillis() + 20L;
- while (true) {
- try {
- Thread.sleep(5L);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e);
- }
- // don't care whether it's spurious wakeup
- connected = socket.finishConnect();
- if (connected || System.currentTimeMillis() > timeBarrier) {
- break;
- }
- }
- }
-
- // did we get a connection?
- if ( !connected) {
- throw new IllegalArgumentException("Could not create connection to dispatcher on "
- + address.getHostName() + ":" + address.getPort());
- }
- socket.socket().
- setTcpNoDelay(true);
- }
-
-
- //============================================================
- //==== channel management
- //============================================================
-
- /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */
- public FS4Channel openChannel() {
- int cachedChannelId;
- synchronized (this) {
- if (channelId >= ((1 << 31) - 2)) {
- channelId = 0;
- }
- cachedChannelId = channelId;
- channelId += 2;
- }
- Integer id = cachedChannelId;
- FS4Channel chan = new FS4Channel(this, id);
- synchronized (activeChannels) {
- activeChannels.put(id, chan);
- }
- return chan;
- }
-
- public FS4Channel openPingChannel() {
- FS4Channel chan = FS4Channel.createPingChannel(this);
- synchronized (pingChannels) {
- pingChannels.add(chan);
- }
- return chan;
- }
-
- /**
- * Get the remote address for this Backend. This method
- * has package access only, because it is really only of
- * importance to FS4Channel for writing slightly more sensible
- * log messages.
- * @return Returns the address (host, port) for this Backend.
- */
- InetSocketAddress getAddress() {
- return address;
- }
-
- /**
- * Get an active channel by id.
- *
- * @param id the (fs4) channel id
- * @return returns the (fs4) channel associated with this id
- * or <code>null</code> if the channel is not in the
- * set of active channels.
- */
- public FS4Channel getChannel(Integer id) {
- synchronized (activeChannels) {
- return activeChannels.get(id);
- }
- }
-
- /**
- * Return the first channel in the queue waiting for pings or
- * <code>null</code> if none.
- */
- public FS4Channel getPingChannel() {
- synchronized (pingChannels) {
- return (pingChannels.isEmpty()) ? null : pingChannels.getFirst();
- }
- }
-
- /**
- * Get an active channel by id. This is a wrapper for the method
- * that takes the id as an Integer.
- *
- * @param id The (fs4) channel id
- * @return Returns the (fs4) channel associated with this id
- * or <code>null</code> if the channel is not in the
- * set of active channels.
- */
- public FS4Channel getChannel(int id) {
- return getChannel(Integer.valueOf(id));
- }
-
- /**
- * Remove a channel. We do not want this method to be called
- * directly by the client -- removal of channels should be done
- * by calling the close() method of the channel.
- *
- * @param id The (fs4) channel id
- * @return Removes and returns the (fs4) channel associated
- * with this id or <code>null</code> if the channel is
- * not in the set of active channels.
- */
- protected FS4Channel removeChannel(Integer id) {
- synchronized (activeChannels) {
- return activeChannels.remove(id);
- }
- }
-
- /**
- * Remove a ping channel. We do not want this method to be called
- * directly by the client -- removal of channels should be done
- * by calling the close() method of the channel.
- *
- * @return Removes and returns the (fs4) channel first in
- * the queue of ping channels or <code>null</code>
- * if there are no active ping channels.
- */
- protected FS4Channel removePingChannel() {
- synchronized (pingChannels) {
- if (pingChannels.isEmpty())
- return null;
- return pingChannels.removeFirst();
- }
- }
- //============================================================
- //==== packet sending and reception
- //============================================================
-
- protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException {
- if (shutdownInitiated) {
- log.fine("Tried to send packet after shutdown initiated. Ignored.");
- return false;
- }
-
- FS4Connection connection = null;
- try {
- connection = getConnection();
- if (connection == null) {
- return false;
- }
- connection.sendPacket(packet, channelId);
- }
- finally {
- if (connection != null) {
- returnConnection(connection);
- }
- }
-
- return true;
- }
-
- /**
- * When a connection receives a packet, it uses this method to
- * dispatch the packet to the right FS4Channel. If the corresponding
- * FS4Channel does not exist the packet is dropped and a message is
- * logged saying so.
- */
- protected void receivePacket(BasicPacket packet) {
- FS4Channel fs4;
- if (packet.hasChannelId())
- fs4 = getChannel(((Packet)packet).getChannel());
- else
- fs4 = getPingChannel();
-
- // channel does not exist
- if (fs4 == null) {
- return;
- }
- try {
- fs4.addPacket(packet);
- }
- catch (InterruptedException e) {
- log.info("Interrupted during packet adding. Packet = " + packet.toString());
- Thread.currentThread().interrupt();
- }
- catch (InvalidChannelException e) {
- log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString()
- + " Backend probably sent data pertaining an old request,"
- + " system may be overloaded.");
- }
- }
-
- /**
- * Attempt to establish a connection without sending messages and then
- * return it to the pool. The assumption is that if the probing is
- * successful, the connection will be used soon after. There should be
- * minimal overhead since the connection is cached.
- */
- public boolean probeConnection() {
- if (shutdownInitiated) {
- return false;
- }
-
- FS4Connection connection = null;
- try {
- connection = getConnection();
- } catch (IOException ignored) {
- // connection is null
- } finally {
- if (connection != null) {
- returnConnection(connection);
- }
- }
-
- return connection != null;
- }
-
- /**
- * This method should be used to ensure graceful shutdown of the backend.
- */
- public void shutdown() {
- log.fine("shutting down");
- if (shutdownInitiated) {
- throw new IllegalStateException("Shutdown already in progress");
- }
- shutdownInitiated = true;
- }
-
- public void close() {
- for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) {
- try {
- c.close();
- } catch (IOException e) {
- logWarning("closing", e);
- }
- }
- }
-
- /**
- * Connection factory used by the Listener class.
- */
- public Connection newConnection(SocketChannel channel, Listener listener) {
- return new FS4Connection(channel, listener, this, packetListener);
- }
-
- public String toString () {
- return("Backend/" + host + ":" + port);
- }
-
- public BackendStatistics getStatistics() {
- synchronized (connectionPool) { //ensure consistent values
- return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections());
- }
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
-}