summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-03-28 16:14:48 +0100
committerOlli Virtanen <olli.virtanen@oath.com>2019-03-28 16:14:48 +0100
commit748ad31c704fbd53ec45b659002a72564dbe2c04 (patch)
treec2e82755e2060af95f873d8e0224315522471755
parent5cb137c5209918bef89cf9fe14628c1078ac78f1 (diff)
Feature flag to enable protobuf in search protocol as default; protobuf ping
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java1
-rw-r--r--config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java2
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java3
-rw-r--r--configdefinitions/src/vespa/dispatch.def3
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java6
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/PongPacket.java2
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/Pong.java24
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java8
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java77
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java45
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java2
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java8
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java2
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java8
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java6
21 files changed, 190 insertions, 49 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 47160bdd4a2..959e08d18fa 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -54,6 +54,7 @@ public interface ModelContext {
boolean isFirstTimeDeployment();
boolean useDedicatedNodeForLogserver();
boolean useFdispatchByDefault();
+ boolean dispatchWithProtobuf();
boolean useAdaptiveDispatch();
boolean useSeparateServiceTypeForLogserverContainer();
boolean enableMetricsProxyContainer();
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index 89433cdf7c0..c5837ae73f3 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -35,6 +35,7 @@ public class TestProperties implements ModelContext.Properties {
private boolean isFirstTimeDeployment = false;
private boolean useDedicatedNodeForLogserver = false;
private boolean useFdispatchByDefault = true;
+ private boolean dispatchWithProtobuf = true;
private boolean useAdaptiveDispatch = false;
private boolean useSeparateServiceTypeForLogserverContainer = false;
private boolean enableMetricsProxyContainer = false;
@@ -54,6 +55,7 @@ public class TestProperties implements ModelContext.Properties {
@Override public boolean useAdaptiveDispatch() { return useAdaptiveDispatch; }
@Override public boolean useDedicatedNodeForLogserver() { return useDedicatedNodeForLogserver; }
@Override public boolean useFdispatchByDefault() { return useFdispatchByDefault; }
+ @Override public boolean dispatchWithProtobuf() { return dispatchWithProtobuf; }
@Override public boolean useSeparateServiceTypeForLogserverContainer() { return useSeparateServiceTypeForLogserverContainer; }
@Override public boolean enableMetricsProxyContainer() { return enableMetricsProxyContainer; }
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java
index 9caf7fbdc9e..ee82d0cd719 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java
@@ -107,6 +107,7 @@ public class IndexedSearchCluster extends SearchCluster
private final DispatchGroup rootDispatch;
private DispatchSpec dispatchSpec;
private final boolean useFdispatchByDefault;
+ private final boolean dispatchWithProtobuf;
private final boolean useAdaptiveDispatch;
private List<SearchNode> searchNodes = new ArrayList<>();
@@ -126,6 +127,7 @@ public class IndexedSearchCluster extends SearchCluster
dispatchParent = new SimpleConfigProducer(this, "dispatchers");
rootDispatch = new DispatchGroup(this);
useFdispatchByDefault = deployState.getProperties().useFdispatchByDefault();
+ dispatchWithProtobuf = deployState.getProperties().dispatchWithProtobuf();
useAdaptiveDispatch = deployState.getProperties().useAdaptiveDispatch();
}
@@ -437,6 +439,7 @@ public class IndexedSearchCluster extends SearchCluster
builder.maxNodesDownPerGroup(rootDispatch.getMaxNodesDownPerFixedRow());
builder.useMultilevelDispatch(useMultilevelDispatchSetup());
builder.useFdispatchByDefault(useFdispatchByDefault);
+ builder.dispatchWithProtobuf(dispatchWithProtobuf);
builder.useLocalNode(tuning.dispatch.useLocalNode);
builder.searchableCopies(rootDispatch.getSearchableCopies());
if (searchCoverage != null) {
diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def
index f0a4b0d8419..7d5979bcdf1 100644
--- a/configdefinitions/src/vespa/dispatch.def
+++ b/configdefinitions/src/vespa/dispatch.def
@@ -19,6 +19,9 @@ distributionPolicy enum { ROUNDROBIN, ADAPTIVE } default=ROUNDROBIN
# Should fdispatch be used as the default dispatcher
useFdispatchByDefault bool default=true
+# Should protobuf/jrt be preferred over fs4
+dispatchWithProtobuf bool default=false
+
# Is multi-level dispatch configured for this cluster
useMultilevelDispatch bool default=false
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 780135c893e..38886b4f656 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -131,6 +131,7 @@ public class ModelContextImpl implements ModelContext {
private final boolean useDedicatedNodeForLogserver;
private final boolean useFdispatchByDefault;
private final boolean useAdaptiveDispatch;
+ private final boolean dispatchWithProtobuf;
private final boolean useSeparateServiceTypeForLogserverContainer;
private final boolean enableMetricsProxyContainer;
@@ -161,6 +162,8 @@ public class ModelContextImpl implements ModelContext {
.with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value();
this.useFdispatchByDefault = Flags.USE_FDISPATCH_BY_DEFAULT.bindTo(flagSource)
.with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value();
+ this.dispatchWithProtobuf = Flags.DISPATCH_WITH_PROTOBUF.bindTo(flagSource)
+ .with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value();
this.useAdaptiveDispatch = Flags.USE_ADAPTIVE_DISPATCH.bindTo(flagSource)
.with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value();
this.useSeparateServiceTypeForLogserverContainer = Flags.USE_SEPARATE_SERVICE_TYPE_FOR_LOGSERVER_CONTAINER.bindTo(flagSource)
@@ -213,6 +216,9 @@ public class ModelContextImpl implements ModelContext {
public boolean useFdispatchByDefault() { return useFdispatchByDefault; }
@Override
+ public boolean dispatchWithProtobuf() { return dispatchWithProtobuf; }
+
+ @Override
public boolean useAdaptiveDispatch() { return useAdaptiveDispatch; }
@Override
diff --git a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java
index 13fb7d84408..37aaf7067a9 100644
--- a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java
+++ b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java
@@ -28,7 +28,7 @@ public class PongPacket extends BasicPacket {
/** For testing */
public PongPacket(long activeDocuments) {
- this.activeDocuments = Optional.of(activeDocuments);
+ this.activeDocuments = Optional.of(activeDocuments);
}
private int code;
diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java
index cef64c293af..a6bc3e7975d 100644
--- a/container-search/src/main/java/com/yahoo/prelude/Pong.java
+++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java
@@ -1,15 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude;
+import com.yahoo.fs4.PongPacket;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.statistics.ElapsedTime;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import com.yahoo.fs4.PongPacket;
-import com.yahoo.search.result.ErrorMessage;
-import com.yahoo.search.statistics.ElapsedTime;
-
/**
* An answer from Ping.
*
@@ -21,20 +21,29 @@ public class Pong {
private final List<ErrorMessage> errors = new ArrayList<>(1);
private final Optional<PongPacket> pongPacket;
private ElapsedTime elapsed = new ElapsedTime();
+ private final Optional<Long> activeDocuments;
public Pong() {
this.pongPacket = Optional.empty();
+ this.activeDocuments = Optional.empty();
}
-
+
public Pong(ErrorMessage error) {
errors.add(error);
this.pongPacket = Optional.empty();
+ this.activeDocuments = Optional.empty();
}
-
+
public Pong(PongPacket pongPacket) {
this.pongPacket = Optional.of(pongPacket);
+ this.activeDocuments = Optional.empty();
}
-
+
+ public Pong(long activeDocuments) {
+ this.pongPacket = Optional.empty();
+ this.activeDocuments = Optional.of(activeDocuments);
+ }
+
public void addError(ErrorMessage error) {
errors.add(error);
}
@@ -49,6 +58,7 @@ public class Pong {
/** Returns the number of active documents in the backend responding in this Pong, if available */
public Optional<Long> activeDocuments() {
+ if (activeDocuments.isPresent()) return activeDocuments;
if ( ! pongPacket.isPresent()) return Optional.empty();
return pongPacket.get().getActiveDocuments();
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
index ac99675c9c5..b9af60089f8 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
@@ -3,13 +3,16 @@ package com.yahoo.prelude.fastsearch;
import com.google.common.collect.ImmutableMap;
import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.prelude.Pong;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InterleavedFillInvoker;
import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.Pinger;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Hit;
@@ -20,6 +23,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* FS4InvokerFactory constructs {@link FillInvoker} and {@link SearchInvoker} objects that communicate with
@@ -97,4 +101,8 @@ public class FS4InvokerFactory extends InvokerFactory {
return requiredNodes;
}
+ @Override
+ public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) {
+ return new Pinger(node, monitor, fs4ResourcePool);
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 74d9c38b273..f2dbb1e8557 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -56,6 +56,7 @@ public class Dispatcher extends AbstractComponent {
private final LoadBalancer loadBalancer;
private final boolean multilevelDispatch;
private final boolean internalDispatchByDefault;
+ private final boolean dispatchWithProtobuf;
private final FS4InvokerFactory fs4InvokerFactory;
private final RpcInvokerFactory rpcInvokerFactory;
@@ -65,14 +66,12 @@ public class Dispatcher extends AbstractComponent {
public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize,
VipStatus vipStatus, Metric metric) {
- this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig,
- fs4ResourcePool, new RpcResourcePool(dispatchConfig), metric);
+ this(new SearchCluster(clusterId, dispatchConfig, containerClusterSize, vipStatus), dispatchConfig, fs4ResourcePool, metric);
}
- public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool,
- RpcResourcePool rpcResourcePool, Metric metric) {
+ public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, Metric metric) {
this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster),
- new RpcInvokerFactory(rpcResourcePool, searchCluster), metric);
+ new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster, dispatchConfig.dispatchWithProtobuf()), metric);
}
public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory,
@@ -82,12 +81,15 @@ public class Dispatcher extends AbstractComponent {
dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
this.multilevelDispatch = dispatchConfig.useMultilevelDispatch();
this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault();
+ this.dispatchWithProtobuf = dispatchConfig.dispatchWithProtobuf();
this.fs4InvokerFactory = fs4InvokerFactory;
this.rpcInvokerFactory = rpcInvokerFactory;
this.metric = metric;
this.metricContext = metric.createContext(null);
+
+ searchCluster.startClusterMonitoring(dispatchWithProtobuf ? rpcInvokerFactory : fs4InvokerFactory);
}
/** Returns the search cluster this dispatches to */
@@ -120,7 +122,8 @@ public class Dispatcher extends AbstractComponent {
return Optional.empty();
}
- InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory;
+ InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, dispatchWithProtobuf)
+ ? rpcInvokerFactory : fs4InvokerFactory;
Optional<SearchInvoker> invoker = getSearchPathInvoker(query, factory, searcher);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 815a2a257ea..8617c74ec41 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -1,9 +1,11 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
@@ -15,6 +17,7 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* @author ollivir
@@ -30,6 +33,8 @@ public abstract class InvokerFactory {
public abstract Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result);
+ public abstract Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor);
+
/**
* Create a {@link SearchInvoker} for a list of content nodes.
*
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 4422538cff6..cc37df04a62 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -2,6 +2,8 @@
package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FastHit;
import java.util.List;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index c1b164aaeaa..b0a418241f8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -1,11 +1,13 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InvokerFactory;
@@ -14,6 +16,7 @@ import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import java.util.Optional;
+import java.util.concurrent.Callable;
/**
* @author ollivir
@@ -23,10 +26,12 @@ public class RpcInvokerFactory extends InvokerFactory {
private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
private final RpcResourcePool rpcResourcePool;
+ private final boolean dispatchWithProtobuf;
- public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) {
+ public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster, boolean dispatchWithProtobuf) {
super(searchCluster);
this.rpcResourcePool = rpcResourcePool;
+ this.dispatchWithProtobuf = dispatchWithProtobuf;
}
@Override
@@ -40,7 +45,7 @@ public class RpcInvokerFactory extends InvokerFactory {
boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query);
- if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, false)) {
+ if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf)) {
return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(),
summaryNeedsQuery));
}
@@ -62,4 +67,9 @@ public class RpcInvokerFactory extends InvokerFactory {
public void release() {
rpcResourcePool.release();
}
+
+ @Override
+ public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) {
+ return new RpcPing(node, monitor, rpcResourcePool);
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
new file mode 100644
index 00000000000..f3479e2e4a9
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -0,0 +1,77 @@
+package com.yahoo.search.dispatch.rpc;
+
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.prelude.Pong;
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse;
+import com.yahoo.search.dispatch.rpc.Client.ResponseOrError;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.yolean.Exceptions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class RpcPing implements Callable<Pong> {
+ private static final String RPC_METHOD = "vespa.searchprotocol.ping";
+ private static final CompressionType PING_COMPRESSION = CompressionType.NONE;
+
+ private final Node node;
+ private final RpcResourcePool resourcePool;
+ private final ClusterMonitor<Node> clusterMonitor;
+
+ public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) {
+ this.node = node;
+ this.resourcePool = rpcResourcePool;
+ this.clusterMonitor = clusterMonitor;
+ }
+
+ @Override
+ public Pong call() throws Exception {
+ try {
+ var queue = new LinkedBlockingQueue<ResponseOrError<ProtobufResponse>>(1);
+
+ sendPing(queue);
+
+ var responseOrError = queue.poll(clusterMonitor.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
+ if (responseOrError == null) {
+ return new Pong(ErrorMessage.createTimeout("Timed out waiting for pong from " + node));
+ } else if (responseOrError.error().isPresent()) {
+ return new Pong(ErrorMessage.createBackendCommunicationError(responseOrError.error().get()));
+ }
+
+ return decodeReply(responseOrError.response().get());
+ } catch (RuntimeException e) {
+ return new Pong(
+ ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e)));
+ }
+ }
+
+ private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) {
+ var connection = resourcePool.nodeConnections().get(node.key());
+ var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
+ double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
+ Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
+ resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(),
+ rsp -> queue.add(rsp), timeoutSeconds);
+ }
+
+ private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes);
+
+ if (reply.getDistributionKey() != node.key()) {
+ return new Pong(ErrorMessage.createBackendCommunicationError(
+ "Expected pong from node id " + node.key() + ", response is from id " + reply.getDistributionKey()));
+ } else if (!reply.getOnline()) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Node id " + node.key() + " reports being offline"));
+ } else {
+ return new Pong(reply.getActiveDocs());
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index 7e0e3117628..a71ce0354f9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -53,7 +53,7 @@ public class Node {
public boolean isWorking() { return working.get(); }
/** Updates the active documents on this node */
- void setActiveDocuments(long activeDocuments) {
+ public void setActiveDocuments(long activeDocuments) {
this.activeDocuments.set(activeDocuments);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java
index 7c7a9cb1d1c..dea6f741bb0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java
@@ -15,7 +15,7 @@ import java.util.concurrent.Callable;
* @author bratseth
* @author ollivir
*/
-class Pinger implements Callable<Pong> {
+public class Pinger implements Callable<Pong> {
private final Node node;
private final ClusterMonitor<Node> clusterMonitor;
private final FS4ResourcePool fs4ResourcePool;
@@ -30,8 +30,6 @@ class Pinger implements Callable<Pong> {
try {
Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString());
- if (pong.activeDocuments().isPresent())
- node.setActiveDocuments(pong.activeDocuments().get());
return pong;
} catch (RuntimeException e) {
return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 5c3ef98c523..6c28352f27b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -7,12 +7,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
+import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
-import com.yahoo.prelude.Pong;
-import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import java.util.LinkedHashMap;
import java.util.List;
@@ -46,6 +46,7 @@ public class SearchCluster implements NodeManager<Node> {
private final ImmutableList<Group> orderedGroups;
private final ClusterMonitor<Node> clusterMonitor;
private final VipStatus vipStatus;
+ private InvokerFactory pingFactory;
/**
* A search node on this local machine having the entire corpus, which we therefore
@@ -57,13 +58,9 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Optional<Node> directDispatchTarget;
- // Only needed until query requests are moved to rpc
- private final FS4ResourcePool fs4ResourcePool;
-
- public SearchCluster(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
+ public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) {
this.clusterId = clusterId;
this.dispatchConfig = dispatchConfig;
- this.fs4ResourcePool = fs4ResourcePool;
this.vipStatus = vipStatus;
List<Node> nodes = toNodes(dispatchConfig);
@@ -89,15 +86,20 @@ public class SearchCluster implements NodeManager<Node> {
this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize,
nodesByHost, groups);
- // Set up monitoring of the fs4 interface of the nodes
- // We can switch to monitoring the rpc interface instead when we move the query phase to rpc
this.clusterMonitor = new ClusterMonitor<>(this);
- for (Node node : nodes) {
- // cluster monitor will only call working() when the
- // node transitions from down to up, so we need to
- // register the initial (working) state here:
- working(node);
- clusterMonitor.add(node, true);
+ }
+
+ public void startClusterMonitoring(InvokerFactory pingFactory) {
+ this.pingFactory = pingFactory;
+
+ for (var group : orderedGroups) {
+ for (var node : group.nodes()) {
+ // cluster monitor will only call working() when the
+ // node transitions from down to up, so we need to
+ // register the initial (working) state here:
+ working(node);
+ clusterMonitor.add(node, true);
+ }
}
}
@@ -251,16 +253,21 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
public void ping(Node node, Executor executor) {
- Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool);
- FutureTask<Pong> futurePong = new FutureTask<>(pinger);
+ if (pingFactory == null) // not initialized yet
+ return;
+ FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor));
executor.execute(futurePong);
Pong pong = getPong(futurePong, node);
futurePong.cancel(true);
- if (pong.badResponse())
+ if (pong.badResponse()) {
clusterMonitor.failed(node, pong.getError(0));
- else
+ } else {
+ if (pong.activeDocuments().isPresent()) {
+ node.setActiveDocuments(pong.activeDocuments().get());
+ }
clusterMonitor.responded(node);
+ }
}
/**
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index 859e10dbe2c..25aed879a48 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -137,7 +137,7 @@ public class DispatcherTest {
public class MockRpcInvokerFactory extends RpcInvokerFactory {
public MockRpcInvokerFactory() {
- super(null, null);
+ super(null, null, true);
}
@Override
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
index 141d87a41ab..1ebf7940f25 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -29,7 +29,7 @@ public class LoadBalancerTest {
@Test
public void requireThatLoadBalancerServesSingleNodeSetups() {
Node n1 = new Node(0, "test-node1", 0, 0);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), null, 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), 1, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -43,7 +43,7 @@ public class LoadBalancerTest {
public void requireThatLoadBalancerServesMultiGroupSetups() {
Node n1 = new Node(0, "test-node1", 0, 0);
Node n2 = new Node(1, "test-node2", 1, 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -59,7 +59,7 @@ public class LoadBalancerTest {
Node n2 = new Node(1, "test-node2", 1, 0);
Node n3 = new Node(0, "test-node3", 0, 1);
Node n4 = new Node(1, "test-node4", 1, 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), null, 2, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), 2, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -70,7 +70,7 @@ public class LoadBalancerTest {
public void requireThatLoadBalancerReturnsDifferentGroups() {
Node n1 = new Node(0, "test-node1", 0, 0);
Node n2 = new Node(1, "test-node2", 1, 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
// get first group
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
index 3919bc26bdc..42a22f6f86b 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
@@ -28,7 +28,7 @@ public class MockSearchCluster extends SearchCluster {
}
public MockSearchCluster(String clusterId, DispatchConfig dispatchConfig, int groups, int nodesPerGroup) {
- super(clusterId, dispatchConfig, null, 1, null);
+ super(clusterId, dispatchConfig, 1, null);
ImmutableList.Builder<Group> orderedGroupBuilder = ImmutableList.builder();
ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder();
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
index 2adbd12a2aa..e059008acac 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
@@ -39,7 +39,7 @@ public class FillTestCase {
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
- RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null);
+ RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
Result result = new Result(query);
@@ -76,7 +76,7 @@ public class FillTestCase {
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
- RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null);
+ RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
Result result = new Result(query);
@@ -116,7 +116,7 @@ public class FillTestCase {
Map<Integer, Client.NodeConnection> nodes = new HashMap<>();
nodes.put(0, client.createConnection("host0", 123));
RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
- RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null);
+ RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
Result result = new Result(query);
@@ -134,7 +134,7 @@ public class FillTestCase {
Map<Integer, Client.NodeConnection> nodes = new HashMap<>();
nodes.put(0, client.createConnection("host0", 123));
RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
- RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null);
+ RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
Result result = new Result(query);
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 9de9f8f2ba4..dc50d237ab7 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -95,6 +95,12 @@ public class Flags {
"Takes effect at redeployment",
APPLICATION_ID);
+ public static final UnboundBooleanFlag DISPATCH_WITH_PROTOBUF = defineFeatureFlag(
+ "dispatch-with-protobuf", false,
+ "Should the java dispatcher use protobuf/jrt as the default",
+ "Takes effect at redeployment",
+ APPLICATION_ID);
+
public static final UnboundBooleanFlag ENABLE_DYNAMIC_PROVISIONING = defineFeatureFlag(
"enable-dynamic-provisioning", false,
"Provision a new docker host when we otherwise can't allocate a docker node",