diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-28 16:14:48 +0100 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-28 16:14:48 +0100 |
commit | 748ad31c704fbd53ec45b659002a72564dbe2c04 (patch) | |
tree | c2e82755e2060af95f873d8e0224315522471755 /container-search/src/main | |
parent | 5cb137c5209918bef89cf9fe14628c1078ac78f1 (diff) |
Feature flag to enable protobuf in search protocol as default; protobuf ping
Diffstat (limited to 'container-search/src/main')
11 files changed, 159 insertions, 39 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java index 13fb7d84408..37aaf7067a9 100644 --- a/container-search/src/main/java/com/yahoo/fs4/PongPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/PongPacket.java @@ -28,7 +28,7 @@ public class PongPacket extends BasicPacket { /** For testing */ public PongPacket(long activeDocuments) { - this.activeDocuments = Optional.of(activeDocuments); + this.activeDocuments = Optional.of(activeDocuments); } private int code; diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java index cef64c293af..a6bc3e7975d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/Pong.java +++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java @@ -1,15 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude; +import com.yahoo.fs4.PongPacket; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.statistics.ElapsedTime; + import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; -import com.yahoo.fs4.PongPacket; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.statistics.ElapsedTime; - /** * An answer from Ping. * @@ -21,20 +21,29 @@ public class Pong { private final List<ErrorMessage> errors = new ArrayList<>(1); private final Optional<PongPacket> pongPacket; private ElapsedTime elapsed = new ElapsedTime(); + private final Optional<Long> activeDocuments; public Pong() { this.pongPacket = Optional.empty(); + this.activeDocuments = Optional.empty(); } - + public Pong(ErrorMessage error) { errors.add(error); this.pongPacket = Optional.empty(); + this.activeDocuments = Optional.empty(); } - + public Pong(PongPacket pongPacket) { this.pongPacket = Optional.of(pongPacket); + this.activeDocuments = Optional.empty(); } - + + public Pong(long activeDocuments) { + this.pongPacket = Optional.empty(); + this.activeDocuments = Optional.of(activeDocuments); + } + public void addError(ErrorMessage error) { errors.add(error); } @@ -49,6 +58,7 @@ public class Pong { /** Returns the number of active documents in the backend responding in this Pong, if available */ public Optional<Long> activeDocuments() { + if (activeDocuments.isPresent()) return activeDocuments; if ( ! pongPacket.isPresent()) return Optional.empty(); return pongPacket.get().getActiveDocuments(); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index ac99675c9c5..b9af60089f8 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -3,13 +3,16 @@ package com.yahoo.prelude.fastsearch; import com.google.common.collect.ImmutableMap; import com.yahoo.fs4.mplex.Backend; +import com.yahoo.prelude.Pong; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.Pinger; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Hit; @@ -20,6 +23,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; /** * FS4InvokerFactory constructs {@link FillInvoker} and {@link SearchInvoker} objects that communicate with @@ -97,4 +101,8 @@ public class FS4InvokerFactory extends InvokerFactory { return requiredNodes; } + @Override + public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) { + return new Pinger(node, monitor, fs4ResourcePool); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 74d9c38b273..f2dbb1e8557 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -56,6 +56,7 @@ public class Dispatcher extends AbstractComponent { private final LoadBalancer loadBalancer; private final boolean multilevelDispatch; private final boolean internalDispatchByDefault; + private final boolean dispatchWithProtobuf; private final FS4InvokerFactory fs4InvokerFactory; private final RpcInvokerFactory rpcInvokerFactory; @@ -65,14 +66,12 @@ public class Dispatcher extends AbstractComponent { public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus, Metric metric) { - this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig, - fs4ResourcePool, new RpcResourcePool(dispatchConfig), metric); + this(new SearchCluster(clusterId, dispatchConfig, containerClusterSize, vipStatus), dispatchConfig, fs4ResourcePool, metric); } - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, - RpcResourcePool rpcResourcePool, Metric metric) { + public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, Metric metric) { this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster), - new RpcInvokerFactory(rpcResourcePool, searchCluster), metric); + new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster, dispatchConfig.dispatchWithProtobuf()), metric); } public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory, @@ -82,12 +81,15 @@ public class Dispatcher extends AbstractComponent { dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); + this.dispatchWithProtobuf = dispatchConfig.dispatchWithProtobuf(); this.fs4InvokerFactory = fs4InvokerFactory; this.rpcInvokerFactory = rpcInvokerFactory; this.metric = metric; this.metricContext = metric.createContext(null); + + searchCluster.startClusterMonitoring(dispatchWithProtobuf ? rpcInvokerFactory : fs4InvokerFactory); } /** Returns the search cluster this dispatches to */ @@ -120,7 +122,8 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } - InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory; + InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, dispatchWithProtobuf) + ? rpcInvokerFactory : fs4InvokerFactory; Optional<SearchInvoker> invoker = getSearchPathInvoker(query, factory, searcher); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 815a2a257ea..8617c74ec41 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -1,9 +1,11 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; @@ -15,6 +17,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.concurrent.Callable; /** * @author ollivir @@ -30,6 +33,8 @@ public abstract class InvokerFactory { public abstract Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result); + public abstract Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor); + /** * Create a {@link SearchInvoker} for a list of content nodes. * diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 4422538cff6..cc37df04a62 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -2,6 +2,8 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.FastHit; import java.util.List; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index c1b164aaeaa..b0a418241f8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -1,11 +1,13 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InvokerFactory; @@ -14,6 +16,7 @@ import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.Optional; +import java.util.concurrent.Callable; /** * @author ollivir @@ -23,10 +26,12 @@ public class RpcInvokerFactory extends InvokerFactory { private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); private final RpcResourcePool rpcResourcePool; + private final boolean dispatchWithProtobuf; - public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { + public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster, boolean dispatchWithProtobuf) { super(searchCluster); this.rpcResourcePool = rpcResourcePool; + this.dispatchWithProtobuf = dispatchWithProtobuf; } @Override @@ -40,7 +45,7 @@ public class RpcInvokerFactory extends InvokerFactory { boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); - if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, false)) { + if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, dispatchWithProtobuf)) { return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery)); } @@ -62,4 +67,9 @@ public class RpcInvokerFactory extends InvokerFactory { public void release() { rpcResourcePool.release(); } + + @Override + public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) { + return new RpcPing(node, monitor, rpcResourcePool); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java new file mode 100644 index 00000000000..f3479e2e4a9 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -0,0 +1,77 @@ +package com.yahoo.search.dispatch.rpc; + +import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; +import com.google.protobuf.InvalidProtocolBufferException; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.prelude.Pong; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse; +import com.yahoo.search.dispatch.rpc.Client.ResponseOrError; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.Exceptions; + +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class RpcPing implements Callable<Pong> { + private static final String RPC_METHOD = "vespa.searchprotocol.ping"; + private static final CompressionType PING_COMPRESSION = CompressionType.NONE; + + private final Node node; + private final RpcResourcePool resourcePool; + private final ClusterMonitor<Node> clusterMonitor; + + public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) { + this.node = node; + this.resourcePool = rpcResourcePool; + this.clusterMonitor = clusterMonitor; + } + + @Override + public Pong call() throws Exception { + try { + var queue = new LinkedBlockingQueue<ResponseOrError<ProtobufResponse>>(1); + + sendPing(queue); + + var responseOrError = queue.poll(clusterMonitor.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS); + if (responseOrError == null) { + return new Pong(ErrorMessage.createTimeout("Timed out waiting for pong from " + node)); + } else if (responseOrError.error().isPresent()) { + return new Pong(ErrorMessage.createBackendCommunicationError(responseOrError.error().get())); + } + + return decodeReply(responseOrError.response().get()); + } catch (RuntimeException e) { + return new Pong( + ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e))); + } + } + + private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) { + var connection = resourcePool.nodeConnections().get(node.key()); + var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); + double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; + Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); + resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(), + rsp -> queue.add(rsp), timeoutSeconds); + } + + private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { + CompressionType compression = CompressionType.valueOf(response.compression()); + byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize()); + var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes); + + if (reply.getDistributionKey() != node.key()) { + return new Pong(ErrorMessage.createBackendCommunicationError( + "Expected pong from node id " + node.key() + ", response is from id " + reply.getDistributionKey())); + } else if (!reply.getOnline()) { + return new Pong(ErrorMessage.createBackendCommunicationError("Node id " + node.key() + " reports being offline")); + } else { + return new Pong(reply.getActiveDocs()); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 7e0e3117628..a71ce0354f9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -53,7 +53,7 @@ public class Node { public boolean isWorking() { return working.get(); } /** Updates the active documents on this node */ - void setActiveDocuments(long activeDocuments) { + public void setActiveDocuments(long activeDocuments) { this.activeDocuments.set(activeDocuments); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java index 7c7a9cb1d1c..dea6f741bb0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -15,7 +15,7 @@ import java.util.concurrent.Callable; * @author bratseth * @author ollivir */ -class Pinger implements Callable<Pong> { +public class Pinger implements Callable<Pong> { private final Node node; private final ClusterMonitor<Node> clusterMonitor; private final FS4ResourcePool fs4ResourcePool; @@ -30,8 +30,6 @@ class Pinger implements Callable<Pong> { try { Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); - if (pong.activeDocuments().isPresent()) - node.setActiveDocuments(pong.activeDocuments().get()); return pong; } catch (RuntimeException e) { return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 5c3ef98c523..6c28352f27b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -7,12 +7,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; +import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; +import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; -import com.yahoo.prelude.Pong; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; import java.util.LinkedHashMap; import java.util.List; @@ -46,6 +46,7 @@ public class SearchCluster implements NodeManager<Node> { private final ImmutableList<Group> orderedGroups; private final ClusterMonitor<Node> clusterMonitor; private final VipStatus vipStatus; + private InvokerFactory pingFactory; /** * A search node on this local machine having the entire corpus, which we therefore @@ -57,13 +58,9 @@ public class SearchCluster implements NodeManager<Node> { */ private final Optional<Node> directDispatchTarget; - // Only needed until query requests are moved to rpc - private final FS4ResourcePool fs4ResourcePool; - - public SearchCluster(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { + public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) { this.clusterId = clusterId; this.dispatchConfig = dispatchConfig; - this.fs4ResourcePool = fs4ResourcePool; this.vipStatus = vipStatus; List<Node> nodes = toNodes(dispatchConfig); @@ -89,15 +86,20 @@ public class SearchCluster implements NodeManager<Node> { this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, nodesByHost, groups); - // Set up monitoring of the fs4 interface of the nodes - // We can switch to monitoring the rpc interface instead when we move the query phase to rpc this.clusterMonitor = new ClusterMonitor<>(this); - for (Node node : nodes) { - // cluster monitor will only call working() when the - // node transitions from down to up, so we need to - // register the initial (working) state here: - working(node); - clusterMonitor.add(node, true); + } + + public void startClusterMonitoring(InvokerFactory pingFactory) { + this.pingFactory = pingFactory; + + for (var group : orderedGroups) { + for (var node : group.nodes()) { + // cluster monitor will only call working() when the + // node transitions from down to up, so we need to + // register the initial (working) state here: + working(node); + clusterMonitor.add(node, true); + } } } @@ -251,16 +253,21 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { - Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool); - FutureTask<Pong> futurePong = new FutureTask<>(pinger); + if (pingFactory == null) // not initialized yet + return; + FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); executor.execute(futurePong); Pong pong = getPong(futurePong, node); futurePong.cancel(true); - if (pong.badResponse()) + if (pong.badResponse()) { clusterMonitor.failed(node, pong.getError(0)); - else + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + } clusterMonitor.responded(node); + } } /** |