diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-08-30 10:09:42 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-08-30 10:09:42 +0200 |
commit | 6e56ca5b6f2064094933859f530df849f2e28716 (patch) | |
tree | 2bd367366f23800f7f88436f3c45b319e3cba84d /container-search/src/main/java/com/yahoo/search | |
parent | 1f554f8bb745cb3b5ea60e9dd2aafd10ceb1f22e (diff) |
Create code path for extending java side dispatcher
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 23 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java | 27 |
2 files changed, 50 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 5ef81403f26..473f3932680 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,6 +11,7 @@ import com.yahoo.container.protect.Error; import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.slime.ArrayTraverser; import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; @@ -28,6 +29,7 @@ import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -57,10 +59,15 @@ public class Dispatcher extends AbstractComponent { private final Compressor compressor = new Compressor(); + private final LoadBalancer loadBalancer; + private final FS4ResourcePool fs4ResourcePool; + public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { this.client = new RpcClient(); this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); + this.fs4ResourcePool = fs4ResourcePool; + this.loadBalancer = new LoadBalancer(searchCluster); // Create node rpc connections, indexed by the node distribution key ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>(); @@ -75,6 +82,8 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = null; this.nodeConnections = ImmutableMap.copyOf(nodeConnections); this.client = client; + this.fs4ResourcePool = null; + this.loadBalancer = new LoadBalancer(searchCluster); } /** Returns the search cluster this dispatches to */ @@ -275,4 +284,18 @@ public class Dispatcher extends AbstractComponent { } + public Optional<Backend> getDispatchBackend(Query query) { + Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query); + + return groupInCluster.flatMap(group -> { + if(group.nodes().size() == 1) { + return Optional.of(group.nodes().get(0)); + } else { + return Optional.empty(); + } + }).map(node -> { + query.trace(false, 2, "Dispatching directly (anywhere) to ", node); + return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + }); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java new file mode 100644 index 00000000000..8e90eae0eb3 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -0,0 +1,27 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.SearchCluster.Group; + +import java.util.Optional; + +public class LoadBalancer { + + private final SearchCluster searchCluster; + + public LoadBalancer(SearchCluster searchCluster) { + this.searchCluster = searchCluster; + } + + public Optional<Group> getGroupForQuery(Query query) { + if (searchCluster.groups().size() == 1) { + for(Group group: searchCluster.groups().values()) { + // since the number of groups is 1, this will run only once + if(group.nodes().size() == 1) { + return Optional.of(group); + } + } + } + return Optional.empty(); + } +} |