diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-11-28 22:51:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-28 22:51:10 +0100 |
commit | 6ebdeb2925f2cd050f2534cb1ca12631905a8e09 (patch) | |
tree | a9f91cb0c690c56fc1d7eb3f956edcc22ab81c16 /container-search/src/main | |
parent | 2dd15d21c181089027240feabe0984e994117402 (diff) | |
parent | 162af6427e8330b40cc3cbe3c953a6cd665130bf (diff) |
Merge pull request #7806 from vespa-engine/bratseth/avoid-server-discriminator
Bratseth/avoid server discriminator
Diffstat (limited to 'container-search/src/main')
12 files changed, 78 insertions, 43 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java index b8242ff5101..e3c10c1f8ce 100644 --- a/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/QueryPacket.java @@ -25,6 +25,7 @@ import java.util.List; */ public class QueryPacket extends Packet { + private final String serverId; private final Query query; private QueryPacketData queryPacketData; @@ -33,7 +34,8 @@ public class QueryPacket extends Packet { private int ignoreableOffset = 0; // Start of (hits/offset/timestamp) ignore section for cache key private int ignoreableSize = 0; // Length of (hits/offset/timestamp) ignore section for cache key - private QueryPacket(Query query) { + private QueryPacket(String serverId, Query query) { + this.serverId = serverId; this.query = query; } @@ -47,8 +49,8 @@ public class QueryPacket extends Packet { * * @param query the query to convert to a packet */ - public static QueryPacket create(Query query) { - return new QueryPacket(query); + public static QueryPacket create(String serverId, Query query) { + return new QueryPacket(serverId, query); } @@ -162,7 +164,7 @@ public class QueryPacket extends Packet { sessionOffset = buffer.position() - relativeZero; if (sendSessionKey) { - Utf8String key = query.getSessionId(true).asUtf8String(); + Utf8String key = query.getSessionId(serverId).asUtf8String(); sessionSize = key.getByteLength(); buffer.putInt(key.getByteLength()); buffer.put(key.getBytes()); diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 3cb95fd0f7f..04a7d9a5303 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -144,7 +144,8 @@ public class ClusterSearcher extends Searcher { } if (searchClusterConfig.indexingmode() == STREAMING) { - VdsStreamingSearcher searcher = vdsCluster(searchClusterIndex, + VdsStreamingSearcher searcher = vdsCluster(fs4ResourcePool.getServerId(), + searchClusterIndex, searchClusterConfig, cacheParams, emulationConfig, docSumParams, documentDbConfig); addBackendSearcher(searcher); @@ -210,7 +211,8 @@ public class ClusterSearcher extends Searcher { documentdbInfoConfig); } - private static VdsStreamingSearcher vdsCluster(int searchclusterIndex, + private static VdsStreamingSearcher vdsCluster(String serverId, + int searchclusterIndex, QrSearchersConfig.Searchcluster searchClusterConfig, CacheParams cacheParams, LegacyEmulationConfig emulConfig, @@ -226,7 +228,7 @@ public class ClusterSearcher extends Searcher { searcher.setSearchClusterConfigId(searchClusterConfig.rankprofiles().configid()); searcher.setDocumentType(searchClusterConfig.searchdef(0)); searcher.setStorageClusterRouteSpec(searchClusterConfig.storagecluster().routespec()); - searcher.init(docSumParams, clusterParams, cacheParams, documentdbInfoConfig); + searcher.init(serverId, docSumParams, clusterParams, cacheParams, documentdbInfoConfig); return searcher; } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java index 90eb0b611bf..7453af55ec0 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java @@ -29,6 +29,8 @@ import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; * @author ollivir */ public class FS4FillInvoker extends FillInvoker { + + private final String serverId; private final VespaBackEndSearcher searcher; private FS4Channel channel; @@ -36,17 +38,17 @@ public class FS4FillInvoker extends FillInvoker { private CacheKey summaryCacheKey = null; private DocsumPacketKey[] summaryPacketKeys = null; - public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, - int distributionKey) { + public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port) { + this.serverId = fs4ResourcePool.getServerId(); this.searcher = searcher; - Backend backend = fs4ResourcePool.getBackend(hostname, port); this.channel = backend.openChannel(); channel.setQuery(query); } // fdispatch code path - public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { + public FS4FillInvoker(String serverId, VespaBackEndSearcher searcher, Query query, Backend backend) { + this.serverId = serverId; this.searcher = searcher; this.channel = backend.openChannel(); channel.setQuery(query); @@ -58,7 +60,7 @@ public class FS4FillInvoker extends FillInvoker { if (searcher.getCacheControl().useCache(channel.getQuery())) { summaryCacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); if (summaryCacheKey == null) { - QueryPacket queryPacket = QueryPacket.create(channel.getQuery()); + QueryPacket queryPacket = QueryPacket.create(serverId, channel.getQuery()); summaryCacheKey = new CacheKey(queryPacket); } boolean cacheFound = cacheLookupTwoPhase(summaryCacheKey, result, summaryClass); diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index 5c2ac402dfa..fa5bd45bbbc 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -134,7 +134,7 @@ public class FS4InvokerFactory { } public FillInvoker getFillInvoker(Query query, Node node) { - return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); + return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port()); } /** diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index 51b3146a609..f85a4019b78 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -4,7 +4,7 @@ package com.yahoo.prelude.fastsearch; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.container.Server; +import com.yahoo.container.QrConfig; import com.yahoo.container.search.Fs4Config; import com.yahoo.fs4.mplex.Backend; import com.yahoo.fs4.mplex.ConnectionPool; @@ -30,6 +30,7 @@ public class FS4ResourcePool extends AbstractComponent { private static final Logger logger = Logger.getLogger(FS4ResourcePool.class.getName()); private static final AtomicInteger instanceCounter = new AtomicInteger(0); + private final String serverId; private final int instanceId; private final ListenerPool listeners; private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections @@ -38,11 +39,12 @@ public class FS4ResourcePool extends AbstractComponent { private final ScheduledExecutorService scheduledExecutor; @Inject - public FS4ResourcePool(Fs4Config fs4Config) { - this(fs4Config.numlistenerthreads()); + public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) { + this(config.discriminator(), fs4Config.numlistenerthreads()); } - public FS4ResourcePool(int listenerThreads) { + public FS4ResourcePool(String serverId, int listenerThreads) { + this.serverId = serverId; instanceId = instanceCounter.getAndIncrement(); String name = "FS4-" + instanceId; listeners = new ListenerPool(name, listenerThreads); @@ -50,19 +52,17 @@ public class FS4ResourcePool extends AbstractComponent { scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled")); } - public ExecutorService getExecutor() { - return executor; - } - public ScheduledExecutorService getScheduledExecutor() { - return scheduledExecutor; - } + /** Returns an unique identifier of the server this runs in */ + public String getServerId() { return serverId; } + public ExecutorService getExecutor() { return executor; } + public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } public Backend getBackend(String host, int port) { String key = host + ":" + port; synchronized (connectionPoolMap) { Backend pool = connectionPoolMap.get(key); if (pool == null) { - pool = new Backend(host, port, Server.get().getServerDiscriminator(), listeners, new ConnectionPool(timer)); + pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer)); connectionPoolMap.put(key, pool); } return pool; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index eb9ab75c0d0..1760b2bbb43 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -30,6 +30,7 @@ import static java.util.Arrays.asList; * @author ollivir */ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> { + private final VespaBackEndSearcher searcher; private FS4Channel channel; @@ -53,7 +54,7 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<F if(queryPacket == null) { // query changed for subchannel - queryPacket = searcher.createQueryPacket(query); + queryPacket = searcher.createQueryPacket(searcher.getServerId(), query); } this.query = query; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 209f6faefa0..1a01fb8b116 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -77,7 +77,7 @@ public class FastSearcher extends VespaBackEndSearcher { public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher, SummaryParameters docSumParams, ClusterParams clusterParams, CacheParams cacheParams, DocumentdbInfoConfig documentdbInfoConfig) { - init(docSumParams, clusterParams, cacheParams, documentdbInfoConfig); + init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, cacheParams, documentdbInfoConfig); this.dispatchBackend = dispatchBackend; this.dispatcher = dispatcher; this.fs4InvokerFactory = new FS4InvokerFactory(fs4ResourcePool, dispatcher.searchCluster(), this); @@ -241,7 +241,7 @@ public class FastSearcher extends VespaBackEndSearcher { if (direct.isPresent()) { return fs4InvokerFactory.getFillInvoker(query, direct.get()); } - return new FS4FillInvoker(this, query, dispatchBackend); + return new FS4FillInvoker(getServerId(), this, query, dispatchBackend); } /** diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 42903dcfa90..e8f5d7110f7 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -54,7 +54,9 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); - protected static final CompoundName TRACE_DISABLE = new CompoundName("trace.disable"); + private static final CompoundName TRACE_DISABLE = new CompoundName("trace.disable"); + + private String serverId; /** The set of all document databases available in the backend handled by this searcher */ private Map<String, DocumentDatabase> documentDbs = new LinkedHashMap<>(); @@ -72,7 +74,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { private String name; /** Cache wrapper */ - protected CacheControl cacheControl = null; + private CacheControl cacheControl = null; public final String getName() { return name; } protected final String getDefaultDocsumClass() { return defaultDocsumClass; } @@ -144,6 +146,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return result; } + public String getServerId() { return serverId; } protected DocumentDatabase getDocumentDatabase(Query query) { if (query.getModel().getRestrict().size() == 1) { @@ -163,8 +166,9 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { } } - public final void init(SummaryParameters docSumParams, ClusterParams clusterParams, CacheParams cacheParams, - DocumentdbInfoConfig documentdbInfoConfig) { + public final void init(String serverId, SummaryParameters docSumParams, ClusterParams clusterParams, + CacheParams cacheParams, DocumentdbInfoConfig documentdbInfoConfig) { + this.serverId = serverId; this.name = clusterParams.searcherName; Validator.ensureNotNull("Name of Vespa backend integration", getName()); @@ -213,7 +217,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (root == null || root instanceof NullItem) // root can become null after resolving and transformation? return new Result(query); - QueryPacket queryPacket = createQueryPacket(query); + QueryPacket queryPacket = createQueryPacket(serverId, query); if (isLoggingFine()) getLogger().fine("made QueryPacket: " + queryPacket); @@ -237,8 +241,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return result; } - protected QueryPacket createQueryPacket(Query query) { - QueryPacket queryPacket = QueryPacket.create(query); + protected QueryPacket createQueryPacket(String serverId, Query query) { + QueryPacket queryPacket = QueryPacket.create(serverId, query); int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); queryPacket.setCompressionLimit(compressionLimit); if (compressionLimit != 0) @@ -368,7 +372,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { s.append(" ranking.queryCache=true"); } if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) { - s.append(" sessionId=").append(query.getSessionId(true)); + s.append(" sessionId=").append(query.getSessionId(false)); } List<Grouping> grouping = GroupingExecutor.getGroupingList(query); diff --git a/container-search/src/main/java/com/yahoo/search/Query.java b/container-search/src/main/java/com/yahoo/search/Query.java index 1fd81e506bd..8e5e14a3aac 100644 --- a/container-search/src/main/java/com/yahoo/search/Query.java +++ b/container-search/src/main/java/com/yahoo/search/Query.java @@ -1015,14 +1015,27 @@ public class Query extends com.yahoo.processing.Request implements Cloneable { * * @param create if true this is created if not already set * @return the session id of this query, or null if not set and create is false + * @deprecated use getSessionId() or getSessionId(serverId) instead */ + @Deprecated public SessionId getSessionId(boolean create) { - if (requestId == null && ! create) return null; + if ( ! create) return getSessionId(); - if (requestId == null && create) { + if (requestId == null) requestId = UniqueRequestId.next(); - } + return new SessionId(requestId, getRanking().getProfile()); + } + + /** Returns the session id of this query, or null if none is assigned */ + public SessionId getSessionId() { + if (requestId == null) return null; + return new SessionId(requestId, getRanking().getProfile()); + } + /** Returns the session id of this query, and creates and assigns it if not already present */ + public SessionId getSessionId(String serverId) { + if (requestId == null) + requestId = UniqueRequestId.next(serverId); return new SessionId(requestId, getRanking().getProfile()); } diff --git a/container-search/src/main/java/com/yahoo/search/grouping/vespa/GroupingExecutor.java b/container-search/src/main/java/com/yahoo/search/grouping/vespa/GroupingExecutor.java index 694c0c4580d..b28e648be78 100644 --- a/container-search/src/main/java/com/yahoo/search/grouping/vespa/GroupingExecutor.java +++ b/container-search/src/main/java/com/yahoo/search/grouping/vespa/GroupingExecutor.java @@ -217,7 +217,7 @@ public class GroupingExecutor extends Searcher { baseRoot = origRoot.clone(); } if (query.isTraceable(3) && query.getGroupingSessionCache()) { - query.trace("Grouping in " + (lastPass + 1) + " passes. SessionId='" + query.getSessionId(true) + "'.", 3); + query.trace("Grouping in " + (lastPass + 1) + " passes. SessionId='" + query.getSessionId(false) + "'.", 3); } for (int pass = 0; pass <= lastPass; ++pass) { boolean firstPass = (pass == 0); diff --git a/container-search/src/main/java/com/yahoo/search/query/SessionId.java b/container-search/src/main/java/com/yahoo/search/query/SessionId.java index b065bd9a0a9..38b69b1a1f8 100644 --- a/container-search/src/main/java/com/yahoo/search/query/SessionId.java +++ b/container-search/src/main/java/com/yahoo/search/query/SessionId.java @@ -4,7 +4,7 @@ package com.yahoo.search.query; import com.yahoo.text.Utf8String; /** - * A id which is unique across this cluster + the extra differentiator. + * An id which is unique across the cluster of nodes * * @author baldersheim */ @@ -12,8 +12,8 @@ public class SessionId { private final Utf8String id; - public SessionId(UniqueRequestId requestId, String extraDifferentiator) { - this.id = new Utf8String(requestId.toString() + "." + extraDifferentiator); + public SessionId(UniqueRequestId requestId, String localSessionId) { + this.id = new Utf8String(requestId.toString() + "." + localSessionId); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/query/UniqueRequestId.java b/container-search/src/main/java/com/yahoo/search/query/UniqueRequestId.java index 8d11b0f4fb9..49529936901 100644 --- a/container-search/src/main/java/com/yahoo/search/query/UniqueRequestId.java +++ b/container-search/src/main/java/com/yahoo/search/query/UniqueRequestId.java @@ -13,7 +13,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class UniqueRequestId { - private static final String serverId = Server.get().getServerDiscriminator(); private static final AtomicLong sequenceCounter = new AtomicLong(); private final String id; @@ -28,8 +27,20 @@ public class UniqueRequestId { /** * Creates a session id which is unique across the cluster this runtime is a member of each time this is called. * Calling this causes synchronization. + * + * @deprecated use nextId(serverId) instead */ + @Deprecated public static UniqueRequestId next() { + return new UniqueRequestId(Server.get().getServerDiscriminator(), System.currentTimeMillis(), sequenceCounter.getAndIncrement()); + } + + /** + * Creates a session id which is unique across the cluster this runtime is a member of each time this is called. + * Calling this causes synchronization. + */ + public static UniqueRequestId next(String serverId) { return new UniqueRequestId(serverId, System.currentTimeMillis(), sequenceCounter.getAndIncrement()); } + } |