diff options
Diffstat (limited to 'container-search')
14 files changed, 209 insertions, 120 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/Ping.java b/container-search/src/main/java/com/yahoo/prelude/Ping.java index ce8f1cba399..ae4f1e9e99e 100644 --- a/container-search/src/main/java/com/yahoo/prelude/Ping.java +++ b/container-search/src/main/java/com/yahoo/prelude/Ping.java @@ -7,6 +7,7 @@ package com.yahoo.prelude; * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> */ public class Ping { + /** How long to wait for a pong */ private long timeout; @@ -26,4 +27,5 @@ public class Ping { public String toString() { return "Ping(timeout = " + timeout + ")"; } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java index d183a47a66f..3365f5e896b 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java @@ -31,11 +31,11 @@ public class ClusterMonitor implements Runnable, Freezable { /** A map from Node to corresponding MonitoredNode */ private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>(); - ScheduledFuture<?> future; + private ScheduledFuture<?> future; private boolean isFrozen = false; - ClusterMonitor(final ClusterSearcher manager, final QrMonitorConfig monitorConfig, VipStatus vipStatus) { + ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, VipStatus vipStatus) { configuration = new MonitorConfiguration(monitorConfig); nodeManager = manager; this.vipStatus = vipStatus; @@ -59,7 +59,7 @@ public class ClusterMonitor implements Runnable, Freezable { /** * Adds a new node for monitoring. */ - void add(final VespaBackEndSearcher node) { + void add(VespaBackEndSearcher node) { if (isFrozen()) { throw new IllegalStateException( "Can not add new nodes after ClusterMonitor has been frozen."); @@ -69,9 +69,9 @@ public class ClusterMonitor implements Runnable, Freezable { } /** Called from ClusterSearcher/NodeManager when a node failed */ - void failed(final VespaBackEndSearcher node, final ErrorMessage error) { - final NodeMonitor monitor = nodeMonitors.get(node); - final boolean wasWorking = monitor.isWorking(); + void failed(VespaBackEndSearcher node, ErrorMessage error) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasWorking = monitor.isWorking(); monitor.failed(error); if (wasWorking && !monitor.isWorking()) { // was warning, see VESPA-1922 @@ -82,9 +82,9 @@ public class ClusterMonitor implements Runnable, Freezable { } /** Called when a node responded */ - void responded(final VespaBackEndSearcher node, boolean hasDocumentsOnline) { - final NodeMonitor monitor = nodeMonitors.get(node); - final boolean wasFailing = !monitor.isWorking(); + void responded(VespaBackEndSearcher node, boolean hasDocumentsOnline) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasFailing = !monitor.isWorking(); monitor.responded(hasDocumentsOnline); if (wasFailing && monitor.isWorking()) { log.info("Failed node '" + node + "' started working again."); @@ -122,7 +122,7 @@ public class ClusterMonitor implements Runnable, Freezable { log.finest("Activating ping"); try { ping(); - } catch (final Exception e) { + } catch (Exception e) { log.log(Level.WARNING, "Error in monitor thread", e); } } @@ -143,4 +143,5 @@ public class ClusterMonitor implements Runnable, Freezable { public boolean isFrozen() { return isFrozen; } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 8b224dd6d51..36710f86e5b 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -114,7 +114,7 @@ public class ClusterSearcher extends Searcher { QrSearchersConfig.Searchcluster searchClusterConfig = getSearchClusterConfigFromClusterName(qrsConfig, clusterModelName); documentTypes = new LinkedHashSet<>(); failoverToRemote = clusterConfig.failoverToRemote(); - Dispatcher dispatcher = new Dispatcher(dispatchConfig); + Dispatcher dispatcher = new Dispatcher(dispatchConfig, fs4ResourcePool); String eventName = clusterModelName + ".cache_hit_ratio"; cacheHitRatio = new Value(eventName, manager, new Value.Parameters() diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java index b6fe4b69052..038d3c3b4fd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java @@ -60,10 +60,9 @@ public class NodeMonitor { /** * Called when this node fails. * - * @param error - * A container which should contain a short description + * @param error a container which should contain a short description */ - public void failed(final ErrorMessage error) { + public void failed(ErrorMessage error) { long respondedAt = System.currentTimeMillis(); if (error.getCode() == BACKEND_COMMUNICATION_ERROR.code diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 61835492e52..425c99d0a83 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -66,9 +66,6 @@ public class FastSearcher extends VespaBackEndSearcher { /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; - /** Time (in ms) at which the index of this searcher was last modified */ - private volatile long editionTimeStamp = 0; - /** Edition of the index */ private int docstamp; @@ -108,21 +105,6 @@ public class FastSearcher extends VespaBackEndSearcher { this.containerClusterSize = containerClusterSize; } - /** Clears the packet cache if the received timestamp is older than our timestamp */ - private void checkTimestamp(QueryResultPacket resultPacket) { - checkTimestamp(resultPacket.getDocstamp()); - } - - /** Clears the packet cache if the received timestamp is older than our timestamp */ - private void checkTimestamp(int newDocstamp) { - if (docstamp < newDocstamp) { - long currentTimeMillis = System.currentTimeMillis(); - - docstamp = newDocstamp; - setEditionTimeStamp(currentTimeMillis); - } - } - private static SimpleDateFormat isoDateFormat; static { @@ -148,10 +130,15 @@ public class FastSearcher extends VespaBackEndSearcher { */ @Override public Pong ping(Ping ping, Execution execution) { + return ping(ping, dispatchBackend, getName()); + } + + public static Pong ping(Ping ping, Backend backend, String name) { + FS4Channel channel = backend.openPingChannel(); + // If you want to change this code, you need to understand // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage) - FS4Channel channel = dispatchBackend.openPingChannel(); try { PingPacket pingPacket = new PingPacket(); @@ -161,11 +148,11 @@ public class FastSearcher extends VespaBackEndSearcher { try { boolean couldSend = channel.sendPacket(pingPacket); if (!couldSend) { - pong.addError(ErrorMessage.createBackendCommunicationError("Could not ping in " + getName())); + pong.addError(ErrorMessage.createBackendCommunicationError("Could not ping in " + name)); return pong; } } catch (InvalidChannelException e) { - pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName())); + pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + name)); return pong; } catch (IllegalStateException e) { pong.addError( @@ -181,25 +168,21 @@ public class FastSearcher extends VespaBackEndSearcher { try { packets = channel.receivePackets(ping.getTimeout(), 1); } catch (ChannelTimeoutException e) { - pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + getName())); + pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name)); return pong; } catch (InvalidChannelException e) { - pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); + pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name)); return pong; } if (packets.length == 0) { - pong.addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); + pong.addError(ErrorMessage.createBackendCommunicationError(name + " got no packets back")); return pong; } - if (isLoggingFine()) { - getLogger().finest("got packets " + packets.length + " packets"); - } - try { - ensureInstanceOf(PongPacket.class, packets[0]); + ensureInstanceOf(PongPacket.class, packets[0], name); } catch (TimeoutException e) { pong.addError(ErrorMessage.createTimeout(e.getMessage())); return pong; @@ -208,7 +191,6 @@ public class FastSearcher extends VespaBackEndSearcher { return pong; } pong.addPongPacket((PongPacket) packets[0]); - checkTimestamp(((PongPacket) packets[0]).getDocstamp()); return pong; } finally { if (channel != null) { @@ -283,8 +265,9 @@ public class FastSearcher extends VespaBackEndSearcher { // Such configurations produce skewed load in any case. if (containerClusterSize < dispatcher.searchCluster().size()) return dispatchBackend; - // TODO: Only use direct dispatch if the local search node is up - + // Only use direct dispatch if the local search node is up + if ( ! localSearchNode.isWorking()) return dispatchBackend; + query.trace(false, 2, "Dispatching directly to ", localSearchNode); return fs4ResourcePool.getBackend(localSearchNode.hostname(), localSearchNode.port()); } @@ -458,11 +441,9 @@ public class FastSearcher extends VespaBackEndSearcher { if (isLoggingFine()) getLogger().finest("got packets " + basicPackets.length + " packets"); - ensureInstanceOf(QueryResultPacket.class, basicPackets[0]); + ensureInstanceOf(QueryResultPacket.class, basicPackets[0], getName()); QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - checkTimestamp(resultPacket); - if (isLoggingFine()) getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); @@ -561,27 +542,6 @@ public class FastSearcher extends VespaBackEndSearcher { return false; } - /** - * Whether to mask out the row id from the index uri. - * Masking out the row number is useful when it is necessary to deduplicate - * across rows. That is necessary with searchers which issues several queries - * to produce one result in the first phase, as the grouping searcher - when - * some of those searchers go to different rows, a mechanism is needed to detect - * duplicates returned from different rows before the summary is requested. - * Producing an index id which is the same across rows and using that as the - * hit uri provides this. Note that this only works if the document ids are the - * same for all the nodes (rows) in a column. This is usually the case for - * batch and incremental indexing, but not for realtime. - */ - - public long getEditionTimeStamp() { - return editionTimeStamp; - } - - public void setEditionTimeStamp(long editionTime) { - this.editionTimeStamp = editionTime; - } - public String toString() { return "fast searcher (" + getName() + ") " + dispatchBackend; } @@ -619,4 +579,5 @@ public class FastSearcher extends VespaBackEndSearcher { protected boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 419e0cf41e5..9dac736dee2 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -446,7 +446,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (hit instanceof FastHit && !hit.isFilled(summaryClass)) { FastHit fastHit = (FastHit) hit; - ensureInstanceOf(DocsumPacket.class, packets[packetIndex]); + ensureInstanceOf(DocsumPacket.class, packets[packetIndex], getName()); DocsumPacket docsum = (DocsumPacket) packets[packetIndex]; packetIndex++; @@ -461,15 +461,15 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { /** * Throws an IOException if the packet is not of the expected type */ - protected final void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet) throws IOException { + protected static void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet, String name) throws IOException { if ((type.isAssignableFrom(packet.getClass()))) return; if (packet instanceof ErrorPacket) { ErrorPacket errorPacket=(ErrorPacket)packet; if (errorPacket.getErrorCode() == 8) - throw new TimeoutException("Query timed out in " + getName()); + throw new TimeoutException("Query timed out in " + name); else - throw new IOException("Received error from backend in " + getName() + ": " + packet); + throw new IOException("Received error from backend in " + name + ": " + packet); } else { throw new IOException("Received " + packet + " when expecting " + type); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java index de67369a231..9aafdb8b236 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -21,7 +21,7 @@ import com.yahoo.search.result.ErrorMessage; */ public abstract class BaseNodeMonitor<T> { - protected static Logger log=Logger.getLogger(BaseNodeMonitor.class.getName()); + protected static Logger log = Logger.getLogger(BaseNodeMonitor.class.getName()); /** The object representing the monitored node */ protected T node; @@ -90,4 +90,5 @@ public abstract class BaseNodeMonitor<T> { /** Returns whether or not this is monitoring an internal node. Default is false. */ public boolean isInternal() { return internal; } + } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 619beb0a8d1..2ccb57c6e83 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -24,7 +24,7 @@ import java.util.logging.Logger; */ public class ClusterMonitor<T> { - private MonitorConfiguration configuration=new MonitorConfiguration(); + private MonitorConfiguration configuration = new MonitorConfiguration(); private static Logger log=Logger.getLogger(ClusterMonitor.class.getName()); @@ -35,14 +35,18 @@ public class ClusterMonitor<T> { private volatile boolean shutdown = false; /** A map from Node to corresponding MonitoredNode */ - private Map<T,BaseNodeMonitor<T>> nodeMonitors= - Collections.synchronizedMap(new java.util.LinkedHashMap<T, BaseNodeMonitor<T>>()); + private Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); - public ClusterMonitor(NodeManager<T> manager, String monitorConfigID) { - nodeManager=manager; - monitorThread=new MonitorThread("search.clustermonitor"); + /** @deprecated use the constructor with just the first argument instead */ + @Deprecated + public ClusterMonitor(NodeManager<T> manager, String ignored) { + this(manager); + } + + public ClusterMonitor(NodeManager<T> manager) { + nodeManager = manager; + monitorThread = new MonitorThread("search.clustermonitor"); monitorThread.start(); - log.fine("checkInterval is " + configuration.getCheckInterval()+" ms"); } /** Returns the configuration of this cluster monitor */ @@ -59,10 +63,9 @@ public class ClusterMonitor<T> { * @param node the object representing the node * @param internal whether or not this node is internal to this cluster */ - public void add(T node,boolean internal) { - BaseNodeMonitor<T> monitor=new TrafficNodeMonitor<>(node,configuration,internal); - // BaseNodeMonitor monitor=new NodeMonitor(node,configuration); - nodeMonitors.put(node,monitor); + public void add(T node, boolean internal) { + BaseNodeMonitor<T> monitor = new TrafficNodeMonitor<>(node, configuration, internal); + nodeMonitors.put(node, monitor); } /** @@ -74,8 +77,8 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { - BaseNodeMonitor<T> monitor=nodeMonitors.get(node); - boolean wasWorking=monitor.isWorking(); + BaseNodeMonitor<T> monitor = nodeMonitors.get(node); + boolean wasWorking = monitor.isWorking(); monitor.failed(error); if (wasWorking && !monitor.isWorking()) { nodeManager.failed(node); @@ -85,7 +88,7 @@ public class ClusterMonitor<T> { /** Called when a node responded */ public synchronized void responded(T node) { BaseNodeMonitor<T> monitor = nodeMonitors.get(node); - boolean wasFailing=!monitor.isWorking(); + boolean wasFailing =! monitor.isWorking(); monitor.responded(); if (wasFailing && monitor.isWorking()) { nodeManager.working(monitor.getNode()); @@ -96,11 +99,9 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i=nodeMonitorIterator(); i.hasNext(); ) { + for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) { BaseNodeMonitor<T> monitor= i.next(); - // always ping - // if (monitor.isIdle()) - nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded + nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded } } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index 893d6193bef..d5ab94e7ec1 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -3,9 +3,11 @@ package com.yahoo.search.cluster; import com.yahoo.component.ComponentId; import com.yahoo.container.protect.Error; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.log.LogLevel; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FastSearcher; import com.yahoo.yolean.Exceptions; import com.yahoo.search.Query; import com.yahoo.search.Result; @@ -31,12 +33,12 @@ import java.util.concurrent.*; * The connection objects should implement a good toString to ease diagnostics. * * @author bratseth - * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + * @author Arne Bergene Fossaa */ public abstract class ClusterSearcher<T> extends PingableSearcher implements NodeManager<T> { private final Hasher<T> hasher; - private final ClusterMonitor<T> monitor = new ClusterMonitor<>(this, "dummy"); + private final ClusterMonitor<T> monitor = new ClusterMonitor<>(this); /** * Creates a new cluster searcher @@ -59,7 +61,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod } /** - * Pinging a node by sending a query NodeManager method, called from ClusterMonitor + * Pinging a node, called from ClusterMonitor */ @Override public final void ping(T p, Executor executor) { @@ -344,4 +346,5 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod } } + } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java index ce221fa1479..25582e43f5e 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java @@ -12,12 +12,15 @@ import java.util.concurrent.Executor; public interface NodeManager<T> { /** Called when a failed node is working (ready for production) again */ - public void working(T node); + void working(T node); /** Called when a working node fails */ - public void failed(T node); + void failed(T node); - /** Called when a node should be pinged */ - public void ping(T node, Executor executor); + /** + * Called when a node should be pinged. + * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded + */ + void ping(T node, Executor executor); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index 6464f0101be..7681ae89f18 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -3,7 +3,6 @@ package com.yahoo.search.cluster; import com.yahoo.search.result.ErrorMessage; - /** * This node monitor is responsible for maintaining the state of a monitored node. * It has the following properties: @@ -12,16 +11,15 @@ import com.yahoo.search.result.ErrorMessage; * <li>A node is put back in operation when it responds correctly again * </ul> * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author Steinar Knutsen */ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { - /** - * Creates a new node monitor for a node - */ - public TrafficNodeMonitor(T node,MonitorConfiguration configuration,boolean internal) { + + /** Creates a new node monitor for a node */ + public TrafficNodeMonitor(T node, MonitorConfiguration configuration, boolean internal) { super(internal); - this.node=node; - this.configuration=configuration; + this.node = node; + this.configuration = configuration; } /** Whether or not this has ever responded successfully */ @@ -32,11 +30,11 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { /** * Called when this node fails. * - * @param error A container which should contain a short description + * @param error a container which should contain a short description */ @Override public void failed(ErrorMessage error) { - respondedAt=now(); + respondedAt = now(); switch (error.getCode()) { // TODO: Remove hard coded error messages. @@ -63,9 +61,8 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { succeededAt=respondedAt; atStartUp = false; - if (!isWorking) { + if (!isWorking) setWorking(true,"Responds correctly"); - } } /** Thread-safely changes the state of this node if required */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index e1b9f717d61..ca6445cff44 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.yahoo.collections.ListMap; @@ -8,6 +9,7 @@ import com.yahoo.component.AbstractComponent; import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.search.Query; @@ -35,6 +37,7 @@ import java.util.logging.Logger; * * @author bratseth */ +@Beta public class Dispatcher extends AbstractComponent { private final static Logger log = Logger.getLogger(Dispatcher.class.getName()); @@ -49,9 +52,9 @@ public class Dispatcher extends AbstractComponent { private final Compressor compressor = new Compressor(); @Inject - public Dispatcher(DispatchConfig dispatchConfig) { + public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { this.client = new RpcClient(); - this.searchCluster = new SearchCluster(dispatchConfig); + this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool); // Create node rpc connections, indexed by the legacy "partid", which allows us to bridge // between fs4 calls (for search) and rpc calls (for summary fetch) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index 4f1b1ebfec0..6eb67b245f2 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -1,12 +1,33 @@ package com.yahoo.search.dispatch; +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.cluster.NodeManager; +import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; +// Only needed until query requests are moved to rpc +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.yolean.Exceptions; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; + import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; /** @@ -14,18 +35,26 @@ import java.util.stream.Collectors; * * @author bratseth */ -public class SearchCluster { +@Beta +public class SearchCluster implements NodeManager<SearchCluster.Node> { + private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); + private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; + private final ClusterMonitor<Node> clusterMonitor; - public SearchCluster(DispatchConfig dispatchConfig) { - this(toNodes(dispatchConfig)); + // Only needed until query requests are moved to rpc + private final FS4ResourcePool fs4ResourcePool; + + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { + this(toNodes(dispatchConfig), fs4ResourcePool); } - public SearchCluster(List<Node> nodes) { + public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) { size = nodes.size(); + this.fs4ResourcePool = fs4ResourcePool; // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); @@ -38,6 +67,12 @@ public class SearchCluster { for (Node node : nodes) nodesByHostBuilder.put(node.hostname(), node); nodesByHost = nodesByHostBuilder.build(); + + // Set up monitoring of the fs4 interface of the nodes + // We can switch to monitoring the rpc interface instead when we move the query phase to rpc + clusterMonitor = new ClusterMonitor<>(this); + for (Node node : nodes) + clusterMonitor.add(node, true); } private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { @@ -58,7 +93,68 @@ public class SearchCluster { * One host may contain multiple nodes (on different ports), so this is a multi-map. */ public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; } - + + /** Used by the cluster monitor to manage node status */ + @Override + public void working(Node node) { node.setWorking(true); } + + /** Used by the cluster monitor to manage node status */ + @Override + public void failed(Node node) { node.setWorking(false); } + + /** Used by the cluster monitor to manage node status */ + @Override + public void ping(Node node, Executor executor) { + Pinger pinger = new Pinger(node); + FutureTask<Pong> future = new FutureTask<>(pinger); + + executor.execute(future); + Pong pong; + try { + pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); + log.log(Level.WARNING, "Exception pinging " + node, e); + } catch (ExecutionException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); + log.log(Level.WARNING, "Exception pinging " + node, e); + } catch (TimeoutException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); + } + future.cancel(true); + + if (pong.badResponse()) + clusterMonitor.failed(node, pong.getError(0)); + else + clusterMonitor.responded(node); + } + + private class Pinger implements Callable<Pong> { + + private final Node node; + + public Pinger(Node node) { + this.node = node; + } + + public Pong call() { + Pong pong; + try { + pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString()); + } catch (RuntimeException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); + } + return pong; + } + + } + public static class Group { private final int id; @@ -86,6 +182,8 @@ public class SearchCluster { private final int port; private final int group; + private final AtomicBoolean working = new AtomicBoolean(); + public Node(String hostname, int port, int group) { this.hostname = hostname; this.port = port; @@ -98,6 +196,26 @@ public class SearchCluster { /** Returns the id of this group this node belongs to */ public int group() { return group; } + private void setWorking(boolean working) { + this.working.lazySet(working); + } + + /** Returns whether this node is currently responding to requests */ + public boolean isWorking() { return working.get(); } + + @Override + public int hashCode() { return Objects.hash(hostname, port); } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Node)) return false; + Node other = (Node)o; + if ( ! Objects.equals(this.hostname, other.hostname)) return false; + if ( ! Objects.equals(this.port, other.port)) return false; + return true; + } + @Override public String toString() { return "search node " + hostname; } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index e4cbff8def9..abaf03d7ffd 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -648,7 +648,7 @@ public class FastSearcherTestCase { private static class MockDispatcher extends Dispatcher { public MockDispatcher() { - super(new DispatchConfig(new DispatchConfig.Builder())); + super(new DispatchConfig(new DispatchConfig.Builder()), new FS4ResourcePool(1)); } public void fill(Result result, String summaryClass) { |