summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java38
1 files changed, 34 insertions, 4 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
index ed9eb72d7dd..f85a4019b78 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
@@ -5,6 +5,14 @@ import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.QrConfig;
+import com.yahoo.container.search.Fs4Config;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.ConnectionPool;
+import com.yahoo.fs4.mplex.ListenerPool;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -14,7 +22,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * All users will get the same pool instance.
+ * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance.
*
* @author baldersheim
*/
@@ -24,18 +32,22 @@ public class FS4ResourcePool extends AbstractComponent {
private static final AtomicInteger instanceCounter = new AtomicInteger(0);
private final String serverId;
private final int instanceId;
+ private final ListenerPool listeners;
+ private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections
+ private final Map<String, Backend> connectionPoolMap = new HashMap<>();
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
@Inject
- public FS4ResourcePool(QrConfig config) {
- this(config.discriminator());
+ public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) {
+ this(config.discriminator(), fs4Config.numlistenerthreads());
}
- public FS4ResourcePool(String serverId) {
+ public FS4ResourcePool(String serverId, int listenerThreads) {
this.serverId = serverId;
instanceId = instanceCounter.getAndIncrement();
String name = "FS4-" + instanceId;
+ listeners = new ListenerPool(name, listenerThreads);
executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name));
scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled"));
}
@@ -45,10 +57,28 @@ public class FS4ResourcePool extends AbstractComponent {
public ExecutorService getExecutor() { return executor; }
public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; }
+ public Backend getBackend(String host, int port) {
+ String key = host + ":" + port;
+ synchronized (connectionPoolMap) {
+ Backend pool = connectionPoolMap.get(key);
+ if (pool == null) {
+ pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer));
+ connectionPoolMap.put(key, pool);
+ }
+ return pool;
+ }
+ }
+
@Override
public void deconstruct() {
logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'.");
super.deconstruct();
+ listeners.close();
+ timer.cancel();
+ for (Backend backend : connectionPoolMap.values()) {
+ backend.shutdown();
+ backend.close();
+ }
executor.shutdown();
scheduledExecutor.shutdown();
try {