aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-08-16 14:31:20 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-08-16 14:31:20 +0200
commit9dab52cdfc1d679629046899c85a6b5139ab5945 (patch)
tree90e850697f00f5f902ff1a2b5c11d4492c531ceb /container-search/src/main/java/com/yahoo
parente32d551e91700add8758cf57d9b91f7624c2bd3a (diff)
Only use direct dispatch when the local node is responding
Diffstat (limited to 'container-search/src/main/java/com/yahoo')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/Ping.java2
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java21
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java5
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java73
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java8
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java3
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java37
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java21
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java128
13 files changed, 208 insertions, 119 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/Ping.java b/container-search/src/main/java/com/yahoo/prelude/Ping.java
index ce8f1cba399..ae4f1e9e99e 100644
--- a/container-search/src/main/java/com/yahoo/prelude/Ping.java
+++ b/container-search/src/main/java/com/yahoo/prelude/Ping.java
@@ -7,6 +7,7 @@ package com.yahoo.prelude;
* @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
*/
public class Ping {
+
/** How long to wait for a pong */
private long timeout;
@@ -26,4 +27,5 @@ public class Ping {
public String toString() {
return "Ping(timeout = " + timeout + ")";
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
index d183a47a66f..3365f5e896b 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
@@ -31,11 +31,11 @@ public class ClusterMonitor implements Runnable, Freezable {
/** A map from Node to corresponding MonitoredNode */
private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>();
- ScheduledFuture<?> future;
+ private ScheduledFuture<?> future;
private boolean isFrozen = false;
- ClusterMonitor(final ClusterSearcher manager, final QrMonitorConfig monitorConfig, VipStatus vipStatus) {
+ ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, VipStatus vipStatus) {
configuration = new MonitorConfiguration(monitorConfig);
nodeManager = manager;
this.vipStatus = vipStatus;
@@ -59,7 +59,7 @@ public class ClusterMonitor implements Runnable, Freezable {
/**
* Adds a new node for monitoring.
*/
- void add(final VespaBackEndSearcher node) {
+ void add(VespaBackEndSearcher node) {
if (isFrozen()) {
throw new IllegalStateException(
"Can not add new nodes after ClusterMonitor has been frozen.");
@@ -69,9 +69,9 @@ public class ClusterMonitor implements Runnable, Freezable {
}
/** Called from ClusterSearcher/NodeManager when a node failed */
- void failed(final VespaBackEndSearcher node, final ErrorMessage error) {
- final NodeMonitor monitor = nodeMonitors.get(node);
- final boolean wasWorking = monitor.isWorking();
+ void failed(VespaBackEndSearcher node, ErrorMessage error) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasWorking = monitor.isWorking();
monitor.failed(error);
if (wasWorking && !monitor.isWorking()) {
// was warning, see VESPA-1922
@@ -82,9 +82,9 @@ public class ClusterMonitor implements Runnable, Freezable {
}
/** Called when a node responded */
- void responded(final VespaBackEndSearcher node, boolean hasDocumentsOnline) {
- final NodeMonitor monitor = nodeMonitors.get(node);
- final boolean wasFailing = !monitor.isWorking();
+ void responded(VespaBackEndSearcher node, boolean hasDocumentsOnline) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasFailing = !monitor.isWorking();
monitor.responded(hasDocumentsOnline);
if (wasFailing && monitor.isWorking()) {
log.info("Failed node '" + node + "' started working again.");
@@ -122,7 +122,7 @@ public class ClusterMonitor implements Runnable, Freezable {
log.finest("Activating ping");
try {
ping();
- } catch (final Exception e) {
+ } catch (Exception e) {
log.log(Level.WARNING, "Error in monitor thread", e);
}
}
@@ -143,4 +143,5 @@ public class ClusterMonitor implements Runnable, Freezable {
public boolean isFrozen() {
return isFrozen;
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
index 8b224dd6d51..36710f86e5b 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
@@ -114,7 +114,7 @@ public class ClusterSearcher extends Searcher {
QrSearchersConfig.Searchcluster searchClusterConfig = getSearchClusterConfigFromClusterName(qrsConfig, clusterModelName);
documentTypes = new LinkedHashSet<>();
failoverToRemote = clusterConfig.failoverToRemote();
- Dispatcher dispatcher = new Dispatcher(dispatchConfig);
+ Dispatcher dispatcher = new Dispatcher(dispatchConfig, fs4ResourcePool);
String eventName = clusterModelName + ".cache_hit_ratio";
cacheHitRatio = new Value(eventName, manager, new Value.Parameters()
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java
index b6fe4b69052..038d3c3b4fd 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/NodeMonitor.java
@@ -60,10 +60,9 @@ public class NodeMonitor {
/**
* Called when this node fails.
*
- * @param error
- * A container which should contain a short description
+ * @param error a container which should contain a short description
*/
- public void failed(final ErrorMessage error) {
+ public void failed(ErrorMessage error) {
long respondedAt = System.currentTimeMillis();
if (error.getCode() == BACKEND_COMMUNICATION_ERROR.code
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 61835492e52..425c99d0a83 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -66,9 +66,6 @@ public class FastSearcher extends VespaBackEndSearcher {
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
- /** Time (in ms) at which the index of this searcher was last modified */
- private volatile long editionTimeStamp = 0;
-
/** Edition of the index */
private int docstamp;
@@ -108,21 +105,6 @@ public class FastSearcher extends VespaBackEndSearcher {
this.containerClusterSize = containerClusterSize;
}
- /** Clears the packet cache if the received timestamp is older than our timestamp */
- private void checkTimestamp(QueryResultPacket resultPacket) {
- checkTimestamp(resultPacket.getDocstamp());
- }
-
- /** Clears the packet cache if the received timestamp is older than our timestamp */
- private void checkTimestamp(int newDocstamp) {
- if (docstamp < newDocstamp) {
- long currentTimeMillis = System.currentTimeMillis();
-
- docstamp = newDocstamp;
- setEditionTimeStamp(currentTimeMillis);
- }
- }
-
private static SimpleDateFormat isoDateFormat;
static {
@@ -148,10 +130,15 @@ public class FastSearcher extends VespaBackEndSearcher {
*/
@Override
public Pong ping(Ping ping, Execution execution) {
+ return ping(ping, dispatchBackend, getName());
+ }
+
+ public static Pong ping(Ping ping, Backend backend, String name) {
+ FS4Channel channel = backend.openPingChannel();
+
// If you want to change this code, you need to understand
// com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and
// com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage)
- FS4Channel channel = dispatchBackend.openPingChannel();
try {
PingPacket pingPacket = new PingPacket();
@@ -161,11 +148,11 @@ public class FastSearcher extends VespaBackEndSearcher {
try {
boolean couldSend = channel.sendPacket(pingPacket);
if (!couldSend) {
- pong.addError(ErrorMessage.createBackendCommunicationError("Could not ping in " + getName()));
+ pong.addError(ErrorMessage.createBackendCommunicationError("Could not ping in " + name));
return pong;
}
} catch (InvalidChannelException e) {
- pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()));
+ pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + name));
return pong;
} catch (IllegalStateException e) {
pong.addError(
@@ -181,25 +168,21 @@ public class FastSearcher extends VespaBackEndSearcher {
try {
packets = channel.receivePackets(ping.getTimeout(), 1);
} catch (ChannelTimeoutException e) {
- pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + getName()));
+ pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name));
return pong;
} catch (InvalidChannelException e) {
- pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
+ pong.addError(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name));
return pong;
}
if (packets.length == 0) {
- pong.addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
+ pong.addError(ErrorMessage.createBackendCommunicationError(name + " got no packets back"));
return pong;
}
- if (isLoggingFine()) {
- getLogger().finest("got packets " + packets.length + " packets");
- }
-
try {
- ensureInstanceOf(PongPacket.class, packets[0]);
+ ensureInstanceOf(PongPacket.class, packets[0], name);
} catch (TimeoutException e) {
pong.addError(ErrorMessage.createTimeout(e.getMessage()));
return pong;
@@ -208,7 +191,6 @@ public class FastSearcher extends VespaBackEndSearcher {
return pong;
}
pong.addPongPacket((PongPacket) packets[0]);
- checkTimestamp(((PongPacket) packets[0]).getDocstamp());
return pong;
} finally {
if (channel != null) {
@@ -283,8 +265,9 @@ public class FastSearcher extends VespaBackEndSearcher {
// Such configurations produce skewed load in any case.
if (containerClusterSize < dispatcher.searchCluster().size()) return dispatchBackend;
- // TODO: Only use direct dispatch if the local search node is up
-
+ // Only use direct dispatch if the local search node is up
+ if ( ! localSearchNode.isWorking()) return dispatchBackend;
+
query.trace(false, 2, "Dispatching directly to ", localSearchNode);
return fs4ResourcePool.getBackend(localSearchNode.hostname(), localSearchNode.port());
}
@@ -458,11 +441,9 @@ public class FastSearcher extends VespaBackEndSearcher {
if (isLoggingFine())
getLogger().finest("got packets " + basicPackets.length + " packets");
- ensureInstanceOf(QueryResultPacket.class, basicPackets[0]);
+ ensureInstanceOf(QueryResultPacket.class, basicPackets[0], getName());
QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
- checkTimestamp(resultPacket);
-
if (isLoggingFine())
getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
@@ -561,27 +542,6 @@ public class FastSearcher extends VespaBackEndSearcher {
return false;
}
- /**
- * Whether to mask out the row id from the index uri.
- * Masking out the row number is useful when it is necessary to deduplicate
- * across rows. That is necessary with searchers which issues several queries
- * to produce one result in the first phase, as the grouping searcher - when
- * some of those searchers go to different rows, a mechanism is needed to detect
- * duplicates returned from different rows before the summary is requested.
- * Producing an index id which is the same across rows and using that as the
- * hit uri provides this. Note that this only works if the document ids are the
- * same for all the nodes (rows) in a column. This is usually the case for
- * batch and incremental indexing, but not for realtime.
- */
-
- public long getEditionTimeStamp() {
- return editionTimeStamp;
- }
-
- public void setEditionTimeStamp(long editionTime) {
- this.editionTimeStamp = editionTime;
- }
-
public String toString() {
return "fast searcher (" + getName() + ") " + dispatchBackend;
}
@@ -619,4 +579,5 @@ public class FastSearcher extends VespaBackEndSearcher {
protected boolean isLoggingFine() {
return getLogger().isLoggable(Level.FINE);
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index 419e0cf41e5..9dac736dee2 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -446,7 +446,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
if (hit instanceof FastHit && !hit.isFilled(summaryClass)) {
FastHit fastHit = (FastHit) hit;
- ensureInstanceOf(DocsumPacket.class, packets[packetIndex]);
+ ensureInstanceOf(DocsumPacket.class, packets[packetIndex], getName());
DocsumPacket docsum = (DocsumPacket) packets[packetIndex];
packetIndex++;
@@ -461,15 +461,15 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
/**
* Throws an IOException if the packet is not of the expected type
*/
- protected final void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet) throws IOException {
+ protected static void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet, String name) throws IOException {
if ((type.isAssignableFrom(packet.getClass()))) return;
if (packet instanceof ErrorPacket) {
ErrorPacket errorPacket=(ErrorPacket)packet;
if (errorPacket.getErrorCode() == 8)
- throw new TimeoutException("Query timed out in " + getName());
+ throw new TimeoutException("Query timed out in " + name);
else
- throw new IOException("Received error from backend in " + getName() + ": " + packet);
+ throw new IOException("Received error from backend in " + name + ": " + packet);
} else {
throw new IOException("Received " + packet + " when expecting " + type);
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
index de67369a231..9aafdb8b236 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
@@ -21,7 +21,7 @@ import com.yahoo.search.result.ErrorMessage;
*/
public abstract class BaseNodeMonitor<T> {
- protected static Logger log=Logger.getLogger(BaseNodeMonitor.class.getName());
+ protected static Logger log = Logger.getLogger(BaseNodeMonitor.class.getName());
/** The object representing the monitored node */
protected T node;
@@ -90,4 +90,5 @@ public abstract class BaseNodeMonitor<T> {
/** Returns whether or not this is monitoring an internal node. Default is false. */
public boolean isInternal() { return internal; }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 619beb0a8d1..2ccb57c6e83 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -24,7 +24,7 @@ import java.util.logging.Logger;
*/
public class ClusterMonitor<T> {
- private MonitorConfiguration configuration=new MonitorConfiguration();
+ private MonitorConfiguration configuration = new MonitorConfiguration();
private static Logger log=Logger.getLogger(ClusterMonitor.class.getName());
@@ -35,14 +35,18 @@ public class ClusterMonitor<T> {
private volatile boolean shutdown = false;
/** A map from Node to corresponding MonitoredNode */
- private Map<T,BaseNodeMonitor<T>> nodeMonitors=
- Collections.synchronizedMap(new java.util.LinkedHashMap<T, BaseNodeMonitor<T>>());
+ private Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
- public ClusterMonitor(NodeManager<T> manager, String monitorConfigID) {
- nodeManager=manager;
- monitorThread=new MonitorThread("search.clustermonitor");
+ /** @deprecated use the constructor with just the first argument instead */
+ @Deprecated
+ public ClusterMonitor(NodeManager<T> manager, String ignored) {
+ this(manager);
+ }
+
+ public ClusterMonitor(NodeManager<T> manager) {
+ nodeManager = manager;
+ monitorThread = new MonitorThread("search.clustermonitor");
monitorThread.start();
- log.fine("checkInterval is " + configuration.getCheckInterval()+" ms");
}
/** Returns the configuration of this cluster monitor */
@@ -59,10 +63,9 @@ public class ClusterMonitor<T> {
* @param node the object representing the node
* @param internal whether or not this node is internal to this cluster
*/
- public void add(T node,boolean internal) {
- BaseNodeMonitor<T> monitor=new TrafficNodeMonitor<>(node,configuration,internal);
- // BaseNodeMonitor monitor=new NodeMonitor(node,configuration);
- nodeMonitors.put(node,monitor);
+ public void add(T node, boolean internal) {
+ BaseNodeMonitor<T> monitor = new TrafficNodeMonitor<>(node, configuration, internal);
+ nodeMonitors.put(node, monitor);
}
/**
@@ -74,8 +77,8 @@ public class ClusterMonitor<T> {
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
- BaseNodeMonitor<T> monitor=nodeMonitors.get(node);
- boolean wasWorking=monitor.isWorking();
+ BaseNodeMonitor<T> monitor = nodeMonitors.get(node);
+ boolean wasWorking = monitor.isWorking();
monitor.failed(error);
if (wasWorking && !monitor.isWorking()) {
nodeManager.failed(node);
@@ -85,7 +88,7 @@ public class ClusterMonitor<T> {
/** Called when a node responded */
public synchronized void responded(T node) {
BaseNodeMonitor<T> monitor = nodeMonitors.get(node);
- boolean wasFailing=!monitor.isWorking();
+ boolean wasFailing =! monitor.isWorking();
monitor.responded();
if (wasFailing && monitor.isWorking()) {
nodeManager.working(monitor.getNode());
@@ -96,11 +99,9 @@ public class ClusterMonitor<T> {
* Ping all nodes which needs pinging to discover state changes
*/
public void ping(Executor executor) {
- for (Iterator<BaseNodeMonitor<T>> i=nodeMonitorIterator(); i.hasNext(); ) {
+ for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) {
BaseNodeMonitor<T> monitor= i.next();
- // always ping
- // if (monitor.isIdle())
- nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded
+ nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
index 893d6193bef..d5ab94e7ec1 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -3,9 +3,11 @@ package com.yahoo.search.cluster;
import com.yahoo.component.ComponentId;
import com.yahoo.container.protect.Error;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.log.LogLevel;
import com.yahoo.prelude.Ping;
import com.yahoo.prelude.Pong;
+import com.yahoo.prelude.fastsearch.FastSearcher;
import com.yahoo.yolean.Exceptions;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
@@ -31,12 +33,12 @@ import java.util.concurrent.*;
* The connection objects should implement a good toString to ease diagnostics.
*
* @author bratseth
- * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author Arne Bergene Fossaa
*/
public abstract class ClusterSearcher<T> extends PingableSearcher implements NodeManager<T> {
private final Hasher<T> hasher;
- private final ClusterMonitor<T> monitor = new ClusterMonitor<>(this, "dummy");
+ private final ClusterMonitor<T> monitor = new ClusterMonitor<>(this);
/**
* Creates a new cluster searcher
@@ -59,7 +61,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
}
/**
- * Pinging a node by sending a query NodeManager method, called from ClusterMonitor
+ * Pinging a node, called from ClusterMonitor
*/
@Override
public final void ping(T p, Executor executor) {
@@ -344,4 +346,5 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
}
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
index ce221fa1479..25582e43f5e 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
@@ -12,12 +12,15 @@ import java.util.concurrent.Executor;
public interface NodeManager<T> {
/** Called when a failed node is working (ready for production) again */
- public void working(T node);
+ void working(T node);
/** Called when a working node fails */
- public void failed(T node);
+ void failed(T node);
- /** Called when a node should be pinged */
- public void ping(T node, Executor executor);
+ /**
+ * Called when a node should be pinged.
+ * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
+ */
+ void ping(T node, Executor executor);
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
index 6464f0101be..7681ae89f18 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
@@ -3,7 +3,6 @@ package com.yahoo.search.cluster;
import com.yahoo.search.result.ErrorMessage;
-
/**
* This node monitor is responsible for maintaining the state of a monitored node.
* It has the following properties:
@@ -12,16 +11,15 @@ import com.yahoo.search.result.ErrorMessage;
* <li>A node is put back in operation when it responds correctly again
* </ul>
*
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author Steinar Knutsen
*/
public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
- /**
- * Creates a new node monitor for a node
- */
- public TrafficNodeMonitor(T node,MonitorConfiguration configuration,boolean internal) {
+
+ /** Creates a new node monitor for a node */
+ public TrafficNodeMonitor(T node, MonitorConfiguration configuration, boolean internal) {
super(internal);
- this.node=node;
- this.configuration=configuration;
+ this.node = node;
+ this.configuration = configuration;
}
/** Whether or not this has ever responded successfully */
@@ -32,11 +30,11 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
/**
* Called when this node fails.
*
- * @param error A container which should contain a short description
+ * @param error a container which should contain a short description
*/
@Override
public void failed(ErrorMessage error) {
- respondedAt=now();
+ respondedAt = now();
switch (error.getCode()) {
// TODO: Remove hard coded error messages.
@@ -63,9 +61,8 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
succeededAt=respondedAt;
atStartUp = false;
- if (!isWorking) {
+ if (!isWorking)
setWorking(true,"Responds correctly");
- }
}
/** Thread-safely changes the state of this node if required */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index e1b9f717d61..ca6445cff44 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,6 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.yahoo.collections.ListMap;
@@ -8,6 +9,7 @@ import com.yahoo.component.AbstractComponent;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
@@ -35,6 +37,7 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
+@Beta
public class Dispatcher extends AbstractComponent {
private final static Logger log = Logger.getLogger(Dispatcher.class.getName());
@@ -49,9 +52,9 @@ public class Dispatcher extends AbstractComponent {
private final Compressor compressor = new Compressor();
@Inject
- public Dispatcher(DispatchConfig dispatchConfig) {
+ public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
this.client = new RpcClient();
- this.searchCluster = new SearchCluster(dispatchConfig);
+ this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool);
// Create node rpc connections, indexed by the legacy "partid", which allows us to bridge
// between fs4 calls (for search) and rpc calls (for summary fetch)
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
index 4f1b1ebfec0..6eb67b245f2 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
@@ -1,12 +1,33 @@
package com.yahoo.search.dispatch;
+import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
+// Only needed until query requests are moved to rpc
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.fastsearch.FastSearcher;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.prelude.Pong;
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
+
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
@@ -14,18 +35,26 @@ import java.util.stream.Collectors;
*
* @author bratseth
*/
-public class SearchCluster {
+@Beta
+public class SearchCluster implements NodeManager<SearchCluster.Node> {
+ private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
+
private final int size;
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
+ private final ClusterMonitor<Node> clusterMonitor;
- public SearchCluster(DispatchConfig dispatchConfig) {
- this(toNodes(dispatchConfig));
+ // Only needed until query requests are moved to rpc
+ private final FS4ResourcePool fs4ResourcePool;
+
+ public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
+ this(toNodes(dispatchConfig), fs4ResourcePool);
}
- public SearchCluster(List<Node> nodes) {
+ public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) {
size = nodes.size();
+ this.fs4ResourcePool = fs4ResourcePool;
// Create groups
ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
@@ -38,6 +67,12 @@ public class SearchCluster {
for (Node node : nodes)
nodesByHostBuilder.put(node.hostname(), node);
nodesByHost = nodesByHostBuilder.build();
+
+ // Set up monitoring of the fs4 interface of the nodes
+ // We can switch to monitoring the rpc interface instead when we move the query phase to rpc
+ clusterMonitor = new ClusterMonitor<>(this);
+ for (Node node : nodes)
+ clusterMonitor.add(node, true);
}
private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
@@ -58,7 +93,68 @@ public class SearchCluster {
* One host may contain multiple nodes (on different ports), so this is a multi-map.
*/
public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; }
-
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void working(Node node) { node.setWorking(true); }
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void failed(Node node) { node.setWorking(false); }
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void ping(Node node, Executor executor) {
+ Pinger pinger = new Pinger(node);
+ FutureTask<Pong> future = new FutureTask<>(pinger);
+
+ executor.execute(future);
+ Pong pong;
+ try {
+ pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
+ log.log(Level.WARNING, "Exception pinging " + node, e);
+ } catch (ExecutionException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
+ log.log(Level.WARNING, "Exception pinging " + node, e);
+ } catch (TimeoutException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
+ }
+ future.cancel(true);
+
+ if (pong.badResponse())
+ clusterMonitor.failed(node, pong.getError(0));
+ else
+ clusterMonitor.responded(node);
+ }
+
+ private class Pinger implements Callable<Pong> {
+
+ private final Node node;
+
+ public Pinger(Node node) {
+ this.node = node;
+ }
+
+ public Pong call() {
+ Pong pong;
+ try {
+ pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
+ fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString());
+ } catch (RuntimeException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
+ + Exceptions.toMessageString(e)));
+ }
+ return pong;
+ }
+
+ }
+
public static class Group {
private final int id;
@@ -86,6 +182,8 @@ public class SearchCluster {
private final int port;
private final int group;
+ private final AtomicBoolean working = new AtomicBoolean();
+
public Node(String hostname, int port, int group) {
this.hostname = hostname;
this.port = port;
@@ -98,6 +196,26 @@ public class SearchCluster {
/** Returns the id of this group this node belongs to */
public int group() { return group; }
+ private void setWorking(boolean working) {
+ this.working.lazySet(working);
+ }
+
+ /** Returns whether this node is currently responding to requests */
+ public boolean isWorking() { return working.get(); }
+
+ @Override
+ public int hashCode() { return Objects.hash(hostname, port); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if ( ! (o instanceof Node)) return false;
+ Node other = (Node)o;
+ if ( ! Objects.equals(this.hostname, other.hostname)) return false;
+ if ( ! Objects.equals(this.port, other.port)) return false;
+ return true;
+ }
+
@Override
public String toString() { return "search node " + hostname; }