diff options
author | Jon Bratseth <bratseth@oath.com> | 2018-07-05 23:04:45 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@oath.com> | 2018-07-05 23:04:45 +0200 |
commit | d0a999c508838c4bdb5ff1208a21c1830b349b2a (patch) | |
tree | bcb61d62e24a24d570b6fec1259fa517e47eec77 /container-search | |
parent | a4c151090af3664e588d253b2eb9ca9309f5eac9 (diff) |
Set distribution key whe dispatching to content node
Diffstat (limited to 'container-search')
14 files changed, 83 insertions, 47 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java index 075ec60c099..e5ab00fb139 100644 --- a/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java +++ b/container-search/src/main/java/com/yahoo/fs4/DocumentInfo.java @@ -28,9 +28,9 @@ public class DocumentInfo implements Cloneable { } public DocumentInfo(GlobalId globalId, int metric, int partId, int distributionKey) { - this.globalId=globalId; - this.metric=metric; - this.partId=partId; + this.globalId = globalId; + this.metric = metric; + this.partId = partId; this.distributionKey = distributionKey; } diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java index b18d1e88a4b..239101184ea 100644 --- a/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/QueryResultPacket.java @@ -125,7 +125,7 @@ public class QueryResultPacket extends Packet { soonActiveDocs = buffer.getLong(); degradedReason = buffer.getInt(); - decodeDocuments(buffer,documentCount); + decodeDocuments(buffer, documentCount); if (propsFeature) { int numMaps = buffer.getInt(); propsArray = new FS4Properties[numMaps]; diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java index 81192be035a..68a13c06e32 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -61,7 +61,7 @@ public class Backend implements ConnectionFactory { private final ConnectionPool connectionPool; private final PacketDumper packetDumper; private final AtomicInteger connectionCount = new AtomicInteger(0); - + private final Optional<Integer> distributionKey; /** * For unit testing. do not use @@ -74,9 +74,15 @@ public class Backend implements ConnectionFactory { packetDumper = null; address = null; connectionPool = new ConnectionPool(); + distributionKey = Optional.empty(); } - public Backend(String host, int port, String serverDiscriminator, ListenerPool listenerPool, ConnectionPool connectionPool) { + public Backend(String host, + int port, + String serverDiscriminator, + ListenerPool listenerPool, + ConnectionPool connectionPool, + Optional<Integer> distributionKey) { String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump"; packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")), fileNamePattern); @@ -86,6 +92,7 @@ public class Backend implements ConnectionFactory { this.port = port; address = new InetSocketAddress(host, port); this.connectionPool = connectionPool; + this.distributionKey = distributionKey; } private void logWarning(String attemptDescription, Exception e) { @@ -96,11 +103,13 @@ public class Backend implements ConnectionFactory { log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); } + /** Returns the distribution key of the content node this represents, or empty if it is a dispatch node */ + public Optional<Integer> distributionKey() { return distributionKey; } + // ============================================================ // ==== connection pool stuff // ============================================================ - /** * Fetch a connection from the connection pool. If the pool * is empty we create a connection. @@ -187,10 +196,7 @@ public class Backend implements ConnectionFactory { //==== channel management //============================================================ - /** - * Open a new channel to fdispatch. Analogous to the "Channel" - * concept as used in FS4. - */ + /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */ public FS4Channel openChannel () { int cachedChannelId; synchronized (this) { diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java index b0fce14e73f..237b0cdb8e2 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java @@ -1,10 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - package com.yahoo.fs4.mplex; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -16,23 +16,18 @@ import com.yahoo.fs4.ChannelTimeoutException; import com.yahoo.fs4.Packet; import com.yahoo.search.Query; - - /** - * * This class is used to represent a "channel" in the FS4 protocol. * A channel represents a session between a client and the fdispatch. * Internally this class has a response queue used by the backend * for queueing up FS4 packets that belong to this channel (or * <em>session</em>, which might be a more appropriate name for it). - * - * <P> * Outbound packets are handed off to the FS4Connection. * * @author Bjorn Borud */ -public class FS4Channel -{ +public class FS4Channel { + private static Logger log = Logger.getLogger(FS4Channel.class.getName()); private Integer channelId; @@ -67,13 +62,14 @@ public class FS4Channel return query; } - /** - * @return returns an Integer representing the (fs4) channel id - */ + /** Returns the (fs4) channel id */ public Integer getChannelId () { return channelId; } + /** Returns the distribution key of the content node this represents, or empty if it is a dispatch node */ + public Optional<Integer> distributionKey() { return backend == null ? Optional.empty() : backend.distributionKey(); } + /** * Closes the channel */ diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/CacheControl.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/CacheControl.java index b7e16b6c082..33e7ad4bccc 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/CacheControl.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/CacheControl.java @@ -8,6 +8,8 @@ import com.yahoo.fs4.QueryResultPacket; import com.yahoo.search.Query; import com.yahoo.processing.request.CompoundName; +import java.util.Optional; + /** * The cache control logic for FastSearcher @@ -84,7 +86,7 @@ public class CacheControl { } } - void cache(CacheKey key, Query query, DocsumPacketKey[] packetKeys, Packet[] packets) { + void cache(CacheKey key, Query query, DocsumPacketKey[] packetKeys, Packet[] packets, Optional<Integer> distributionKey) { if ( ! activeCache) return; if (query.getNoCache()) return; @@ -92,7 +94,7 @@ public class CacheControl { PacketWrapper wrapper = lookup(key, query); if (wrapper == null) { - wrapper = new PacketWrapper(key, packetKeys,packets); + wrapper = new PacketWrapper(key, packetKeys, packets, distributionKey); long now = System.currentTimeMillis(); synchronized (packetCache) { packetCache.put(key, wrapper, now); diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index 8d4e9418c1c..e933f4857b3 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -14,6 +14,7 @@ import com.yahoo.io.Connection; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -60,11 +61,14 @@ public class FS4ResourcePool extends AbstractComponent { } public Backend getBackend(String host, int port) { + return getBackend(host, port, Optional.empty()); + } + public Backend getBackend(String host, int port, Optional<Integer> distributionKey) { String key = host + ":" + port; synchronized (connectionPoolMap) { Backend pool = connectionPoolMap.get(key); if (pool == null) { - pool = new Backend(host, port, Server.get().getServerDiscriminator(), listeners, new ConnectionPool(timer)); + pool = new Backend(host, port, Server.get().getServerDiscriminator(), listeners, new ConnectionPool(timer), distributionKey); connectionPoolMap.put(key, pool); } return pool; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 30e2adab182..90410699748 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -229,8 +229,9 @@ public class FastSearcher extends VespaBackEndSearcher { // Dispatch directly to the single, local search node query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), - directDispatchRecipient.get().fs4port()); + return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), + directDispatchRecipient.get().fs4port(), + Optional.of(directDispatchRecipient.get().key())); } /** @@ -373,7 +374,6 @@ public class FastSearcher extends VespaBackEndSearcher { } private Result searchTwoPhase(FS4Channel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { - if (isLoggingFine()) getLogger().finest("sending query packet"); @@ -417,7 +417,8 @@ public class FastSearcher extends VespaBackEndSearcher { addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result, false); - addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey); + addUnfilledHits(result, resultPacket.getDocuments(), false, + queryPacket.getQueryPacketData(), cacheKey, channel.distributionKey()); Packet[] packets; PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); @@ -432,7 +433,7 @@ public class FastSearcher extends VespaBackEndSearcher { } else { packets = new Packet[1]; packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets); + cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, channel.distributionKey()); } } return result; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/PacketWrapper.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/PacketWrapper.java index 2b1b7dd24cc..8e22bae430a 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/PacketWrapper.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/PacketWrapper.java @@ -7,6 +7,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.logging.Logger; import com.yahoo.fs4.BasicPacket; @@ -31,10 +32,13 @@ public class PacketWrapper implements Cloneable { private static Logger log = Logger.getLogger(PacketWrapper.class.getName()); - final int keySize; + private final int keySize; // associated result packets, sorted in regard to offset private ArrayList<BasicPacket> resultPackets = new ArrayList<>(3); // length = "some small number" - LinkedHashMap<DocsumPacketKey, BasicPacket> packets; + + private LinkedHashMap<DocsumPacketKey, BasicPacket> packets; + + private final Optional<Integer> distributionKey; private static class ResultPacketComparator<T extends BasicPacket> implements Comparator<T> { @Override @@ -47,11 +51,12 @@ public class PacketWrapper implements Cloneable { private static ResultPacketComparator<BasicPacket> resultPacketComparator = new ResultPacketComparator<>(); - public PacketWrapper(CacheKey key, DocsumPacketKey[] packetKeys, BasicPacket[] bpackets) { + public PacketWrapper(CacheKey key, DocsumPacketKey[] packetKeys, BasicPacket[] bpackets, Optional<Integer> distributionKey) { // Should not support key == null this.keySize = key.byteSize(); resultPackets.add(bpackets[0]); this.packets = new LinkedHashMap<>(); + this.distributionKey = distributionKey; Packet[] ppackets = new Packet[packetKeys.length]; for (int i = 0; i < packetKeys.length; i++) { @@ -72,6 +77,7 @@ public class PacketWrapper implements Cloneable { } resultPackets.add(packets[0]); this.packets = new LinkedHashMap<>(); + this.distributionKey = Optional.empty(); for (int i = 0; i < packets.length - 1; i++) { this.packets.put(new DocsumPacketKey(new GlobalId(new DocumentId("doc:test:" + i).getGlobalId()), i, null), packets[i + 1]); } @@ -87,7 +93,13 @@ public class PacketWrapper implements Cloneable { } /** - * @return list of documents, null if not all are available + * Returns the distribution key of the content node producing these hits, + * or empty if the hits were returned through dispatch + */ + public Optional<Integer> distributionKey() { return distributionKey; } + + /** + * Returns the list of documents, null if not all are available */ public List<DocumentInfo> getDocuments(int offset, int hits) { // speculatively allocate list for the hits diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 34e8390892d..3e9a92ea0f7 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -137,7 +137,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { addMetaInfo(query, queryPacketData, resultPacket, result, true); if (packetWrapper.getNumPackets() == 0) - addUnfilledHits(result, documents, true, queryPacketData, key); + addUnfilledHits(result, documents, true, queryPacketData, key, packetWrapper.distributionKey()); else addCachedHits(result, packetWrapper, summaryClass, documents); return result; @@ -616,8 +616,16 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * * @param queryPacketData binary data from first phase of search, or null * @param cacheKey the key this hit should match in the packet cache, or null + * @param channelDistributionKey distribution key of the node producing these hits. + * Only set if produced directly by a search node, not dispatch + * (in which case it is not set in the received packets.) */ - boolean addUnfilledHits(Result result, List<DocumentInfo> documents, boolean fromCache, QueryPacketData queryPacketData, CacheKey cacheKey) { + boolean addUnfilledHits(Result result, + List<DocumentInfo> documents, + boolean fromCache, + QueryPacketData queryPacketData, + CacheKey cacheKey, + Optional<Integer> channelDistributionKey) { boolean allHitsOK = true; Query myQuery = result.getQuery(); @@ -634,6 +642,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { hit.setCached(fromCache); extractDocumentInfo(hit, document); + channelDistributionKey.ifPresent(hit::setDistributionKey); result.hits().add(hit); } catch (ConfigurationException e) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 654fd6cc68f..3eb010da555 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -62,8 +62,7 @@ public class Dispatcher extends AbstractComponent { this.client = new RpcClient(); this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); - // Create node rpc connections, indexed by the legacy "partid", which allows us to bridge - // between fs4 calls (for search) and rpc calls (for summary fetch) + // Create node rpc connections, indexed by the node distribution key ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>(); for (DispatchConfig.Node node : dispatchConfig.node()) { nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index 9b2a24cd01f..efce2fdac9c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -137,7 +137,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>(); for (DispatchConfig.Node node : dispatchConfig.node()) - nodesBuilder.add(new Node(node.host(), node.fs4port(), node.group())); + nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group())); return nodesBuilder.build(); } @@ -360,6 +360,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { /** A node in a search cluster. This class is multithread safe. */ public static class Node { + private final int key; private final String hostname; private final int fs4port; private final int group; @@ -367,12 +368,16 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private final AtomicBoolean working = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); - public Node(String hostname, int fs4port, int group) { + public Node(int key, String hostname, int fs4port, int group) { + this.key = key; this.hostname = hostname; this.fs4port = fs4port; this.group = group; } + /** Returns the unique and stable distribution key of this node */ + public int key() { return key; } + public String hostname() { return hostname; } public int fs4port() { return fs4port; } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index 2a91319a905..ec586e67763 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -115,8 +115,8 @@ public class FastSearcherTestCase { .name("simpler").hasRankFeatures(false).hasSummaryFeatures(false)))); List<SearchCluster.Node> nodes = new ArrayList<>(); - nodes.add(new SearchCluster.Node("host1", 5000, 0)); - nodes.add(new SearchCluster.Node("host2", 5000, 0)); + nodes.add(new SearchCluster.Node(0, "host1", 5000, 0)); + nodes.add(new SearchCluster.Node(2, "host2", 5000, 0)); MockFS4ResourcePool mockFs4ResourcePool = new MockFS4ResourcePool(); FastSearcher fastSearcher = new FastSearcher(new MockBackend(), @@ -442,7 +442,7 @@ public class FastSearcherTestCase { public void testSinglePassGroupingIsForcedWithSingleNodeGroups() { FastSearcher fastSearcher = new FastSearcher(new MockBackend(), new FS4ResourcePool(1), - new MockDispatcher(new SearchCluster.Node("host0", 123, 0)), + new MockDispatcher(new SearchCluster.Node(0, "host0", 123, 0)), new SummaryParameters(null), new ClusterParams("testhittype"), new CacheParams(100, 1e64), @@ -465,8 +465,8 @@ public class FastSearcherTestCase { @Test public void testSinglePassGroupingIsNotForcedWithSingleNodeGroups() { MockDispatcher dispatcher = - new MockDispatcher(ImmutableList.of(new SearchCluster.Node("host0", 123, 0), - new SearchCluster.Node("host1", 123, 0))); + new MockDispatcher(ImmutableList.of(new SearchCluster.Node(0, "host0", 123, 0), + new SearchCluster.Node(2, "host1", 123, 0))); FastSearcher fastSearcher = new FastSearcher(new MockBackend(), new FS4ResourcePool(1), diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java index 7c1f05e3e60..035710c612c 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java @@ -58,9 +58,10 @@ class FastSearcherTester { private static List<SearchCluster.Node> toNodes(String... hostAndPortAndGroupStrings) { List<SearchCluster.Node> nodes = new ArrayList<>(); + int key = 0; for (String s : hostAndPortAndGroupStrings) { String[] parts = s.split(":"); - nodes.add(new SearchCluster.Node(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); + nodes.add(new SearchCluster.Node(key++, parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); } return nodes; } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java index 997fbb198ea..704dcd31c0d 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/fs4mock/MockFS4ResourcePool.java @@ -7,6 +7,7 @@ import com.yahoo.prelude.fastsearch.FS4ResourcePool; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -25,7 +26,7 @@ public class MockFS4ResourcePool extends FS4ResourcePool { } @Override - public Backend getBackend(String hostname, int port) { + public Backend getBackend(String hostname, int port, Optional<Integer> distributionKey) { countRequest(hostname + ":" + port); if (nonRespondingBackends.contains(hostname)) return new MockBackend(hostname, NonWorkingMockFSChannel::new); |