diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java | 38 |
1 files changed, 34 insertions, 4 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index ed9eb72d7dd..f85a4019b78 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -5,6 +5,14 @@ import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.QrConfig; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.ConnectionPool; +import com.yahoo.fs4.mplex.ListenerPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -14,7 +22,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * All users will get the same pool instance. + * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance. * * @author baldersheim */ @@ -24,18 +32,22 @@ public class FS4ResourcePool extends AbstractComponent { private static final AtomicInteger instanceCounter = new AtomicInteger(0); private final String serverId; private final int instanceId; + private final ListenerPool listeners; + private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections + private final Map<String, Backend> connectionPoolMap = new HashMap<>(); private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; @Inject - public FS4ResourcePool(QrConfig config) { - this(config.discriminator()); + public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) { + this(config.discriminator(), fs4Config.numlistenerthreads()); } - public FS4ResourcePool(String serverId) { + public FS4ResourcePool(String serverId, int listenerThreads) { this.serverId = serverId; instanceId = instanceCounter.getAndIncrement(); String name = "FS4-" + instanceId; + listeners = new ListenerPool(name, listenerThreads); executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name)); scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled")); } @@ -45,10 +57,28 @@ public class FS4ResourcePool extends AbstractComponent { public ExecutorService getExecutor() { return executor; } public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } + public Backend getBackend(String host, int port) { + String key = host + ":" + port; + synchronized (connectionPoolMap) { + Backend pool = connectionPoolMap.get(key); + if (pool == null) { + pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer)); + connectionPoolMap.put(key, pool); + } + return pool; + } + } + @Override public void deconstruct() { logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'."); super.deconstruct(); + listeners.close(); + timer.cancel(); + for (Backend backend : connectionPoolMap.values()) { + backend.shutdown(); + backend.close(); + } executor.shutdown(); scheduledExecutor.shutdown(); try { |