summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java56
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java41
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/Client.java)56
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java73
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java223
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java)55
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java)3
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java98
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java)26
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java118
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java7
13 files changed, 722 insertions, 51 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 146b132be22..0aee51e1e32 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -3,7 +3,6 @@ package com.yahoo.search.dispatch;
import com.yahoo.component.AbstractComponent;
import com.yahoo.container.handler.VipStatus;
-import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FS4InvokerFactory;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
@@ -11,6 +10,8 @@ import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
+import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
@@ -42,25 +43,38 @@ public class Dispatcher extends AbstractComponent {
/** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */
private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
+ /** If enabled, search queries will use protobuf rpc */
+ private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf");
+
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
private final LoadBalancer loadBalancer;
- private final RpcResourcePool rpcResourcePool;
private final boolean multilevelDispatch;
private final boolean internalDispatchByDefault;
+ private final FS4InvokerFactory fs4InvokerFactory;
+ private final RpcInvokerFactory rpcInvokerFactory;
+
public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
- this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig);
+ this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig,
+ fs4ResourcePool, new RpcResourcePool(dispatchConfig));
+ }
+
+ public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool) {
+ this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster),
+ new RpcInvokerFactory(rpcResourcePool, searchCluster));
}
- public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig) {
+ public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory, RpcInvokerFactory rpcInvokerFactory) {
this.searchCluster = searchCluster;
this.loadBalancer = new LoadBalancer(searchCluster,
dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
- this.rpcResourcePool = new RpcResourcePool(dispatchConfig);
this.multilevelDispatch = dispatchConfig.useMultilevelDispatch();
this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault();
+
+ this.fs4InvokerFactory = fs4InvokerFactory;
+ this.rpcInvokerFactory = rpcInvokerFactory;
}
/** Returns the search cluster this dispatches to */
@@ -70,17 +84,16 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
- rpcResourcePool.release();
+ rpcInvokerFactory.release();
}
- public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb,
- FS4InvokerFactory fs4InvokerFactory) {
- Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb);
+ public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher) {
+ Optional<FillInvoker> rpcInvoker = rpcInvokerFactory.createFillInvoker(searcher, result);
if (rpcInvoker.isPresent()) {
return rpcInvoker;
}
if (result.getQuery().properties().getBoolean(dispatchInternal, internalDispatchByDefault)) {
- Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.getFillInvoker(result);
+ Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.createFillInvoker(searcher, result);
if (fs4Invoker.isPresent()) {
return fs4Invoker;
}
@@ -88,15 +101,17 @@ public class Dispatcher extends AbstractComponent {
return Optional.empty();
}
- public Optional<SearchInvoker> getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) {
+ public Optional<SearchInvoker> getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) {
return Optional.empty();
}
- Optional<SearchInvoker> invoker = getSearchPathInvoker(query, fs4InvokerFactory::getSearchInvoker);
+ InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory;
+
+ Optional<SearchInvoker> invoker = getSearchPathInvoker(query, factory, searcher);
if (!invoker.isPresent()) {
- invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker);
+ invoker = getInternalInvoker(query, factory, searcher);
}
if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
query.setHits(0);
@@ -105,13 +120,12 @@ public class Dispatcher extends AbstractComponent {
return invoker;
}
- @FunctionalInterface
- private interface SearchInvokerSupplier {
- Optional<SearchInvoker> supply(Query query, OptionalInt groupId, List<Node> nodes, boolean acceptIncompleteCoverage);
+ public FS4InvokerFactory getFS4InvokerFactory() {
+ return fs4InvokerFactory;
}
// build invoker based on searchpath
- private Optional<SearchInvoker> getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) {
+ private Optional<SearchInvoker> getSearchPathInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) {
String searchPath = query.getModel().getSearchPath();
if(searchPath == null) {
return Optional.empty();
@@ -122,19 +136,19 @@ public class Dispatcher extends AbstractComponent {
return Optional.empty();
} else {
query.trace(false, 2, "Dispatching internally with search path ", searchPath);
- return invokerFactory.supply(query, OptionalInt.empty(), nodes, true);
+ return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true);
}
} catch (InvalidSearchPathException e) {
return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
}
}
- private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) {
+ private Optional<SearchInvoker> getInternalInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) {
Optional<Node> directNode = searchCluster.directDispatchTarget();
if (directNode.isPresent()) {
Node node = directNode.get();
query.trace(false, 2, "Dispatching directly to ", node);
- return invokerFactory.supply(query, OptionalInt.empty(), Arrays.asList(node), true);
+ return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true);
}
int covered = searchCluster.groupsWithSufficientCoverage();
@@ -149,7 +163,7 @@ public class Dispatcher extends AbstractComponent {
}
Group group = groupInCluster.get();
boolean acceptIncompleteCoverage = (i == max - 1);
- Optional<SearchInvoker> invoker = invokerFactory.supply(query, OptionalInt.of(group.id()), group.nodes(),
+ Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher, query, OptionalInt.of(group.id()), group.nodes(),
acceptIncompleteCoverage);
if (invoker.isPresent()) {
query.trace(false, 2, "Dispatching internally to search group ", group.id());
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
new file mode 100644
index 00000000000..ca471fb2baa
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -0,0 +1,41 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.result.Coverage;
+import com.yahoo.search.result.ErrorMessage;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+
+/**
+ * @author ollivir
+ */
+public abstract class InvokerFactory {
+ public abstract Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId,
+ List<Node> nodes, boolean acceptIncompleteCoverage);
+
+ public abstract Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result);
+
+ protected static SearchInvoker createCoverageErrorInvoker(List<Node> nodes, Set<Integer> failed) {
+ StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: ");
+ int count = 0;
+ for (Node node : nodes) {
+ if (failed.contains(node.key())) {
+ if (count > 0) {
+ down.append(", ");
+ }
+ count++;
+ down.append(node.key());
+ }
+ }
+ Coverage coverage = new Coverage(0, 0, 0);
+ coverage.setNodesTried(count);
+ return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
index 0d7ef53bb50..1650494db3a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
@@ -5,6 +5,8 @@ import com.yahoo.fs4.QueryPacket;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.result.Coverage;
+import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
import java.io.IOException;
@@ -53,4 +55,13 @@ public abstract class SearchInvoker extends CloseableInvoker {
protected Optional<Integer> distributionKey() {
return node.map(Node::key);
}
+
+ protected Result errorResult(Query query, ErrorMessage errorMessage) {
+ Result error = new Result(query, errorMessage);
+ Coverage errorCoverage = new Coverage(0, 0, 0);
+ errorCoverage.setNodesTried(1);
+ error.setCoverage(errorCoverage);
+ return error;
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 431b36c2623..019e07221a6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -1,5 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
+package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
import com.yahoo.prelude.fastsearch.FastHit;
@@ -18,6 +18,10 @@ interface Client {
int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
double timeoutSeconds);
+ void search(NodeConnection node, CompressionType compression,
+ int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver,
+ double timeoutSeconds);
+
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
@@ -87,4 +91,54 @@ interface Client {
}
+ class SearchResponseOrError {
+ // One of these will be non empty and the other not
+ private Optional<SearchResponse> response;
+ private Optional<String> error;
+
+ public static SearchResponseOrError fromResponse(SearchResponse response) {
+ return new SearchResponseOrError(Optional.of(response), Optional.empty());
+ }
+
+ public static SearchResponseOrError fromError(String error) {
+ return new SearchResponseOrError(Optional.empty(), Optional.of(error));
+ }
+
+ private SearchResponseOrError(Optional<SearchResponse> response, Optional<String> error) {
+ this.response = response;
+ this.error = error;
+ }
+
+ /** Returns the response, or empty if there is an error */
+ public Optional<SearchResponse> response() { return response; }
+
+ /** Returns the error or empty if there is a response */
+ public Optional<String> error() { return error; }
+
+ }
+
+ class SearchResponse {
+ private final byte compression;
+ private final int uncompressedSize;
+ private final byte[] compressedPayload;
+
+ public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) {
+ this.compression = compression;
+ this.uncompressedSize = uncompressedSize;
+ this.compressedPayload = compressedPayload;
+ }
+
+ public byte compression() {
+ return compression;
+ }
+
+ public int uncompressedSize() {
+ return uncompressedSize;
+ }
+
+ public byte[] compressedPayload() {
+ return compressedPayload;
+ }
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java
new file mode 100644
index 00000000000..817ecfe0091
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java
@@ -0,0 +1,73 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.StringProperty;
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.TensorProperty;
+import com.google.protobuf.ByteString;
+import com.yahoo.tensor.Tensor;
+import com.yahoo.tensor.serialization.TypedBinaryFormat;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author ollivir
+ */
+public class MapConverter {
+ @FunctionalInterface
+ public interface PropertyInserter<T> {
+ void add(T prop);
+ }
+
+ public static void convertMapTensors(Map<String, Object> map, PropertyInserter<TensorProperty.Builder> inserter) {
+ for (var entry : map.entrySet()) {
+ var value = entry.getValue();
+ if (value instanceof Tensor) {
+ byte[] tensor = TypedBinaryFormat.encode((Tensor) value);
+ inserter.add(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor)));
+ }
+ }
+ }
+
+ public static void convertMapStrings(Map<String, Object> map, PropertyInserter<StringProperty.Builder> inserter) {
+ for (var entry : map.entrySet()) {
+ var value = entry.getValue();
+ if (!(value instanceof Tensor)) {
+ inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString()));
+ }
+ }
+ }
+
+ public static void convertStringMultiMap(Map<String, List<String>> map, PropertyInserter<StringProperty.Builder> inserter) {
+ for (var entry : map.entrySet()) {
+ var values = entry.getValue();
+ if (values != null) {
+ inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values));
+ }
+ }
+ }
+
+ public static void convertMultiMap(Map<String, List<Object>> map, PropertyInserter<StringProperty.Builder> stringInserter,
+ PropertyInserter<TensorProperty.Builder> tensorInserter) {
+ for (var entry : map.entrySet()) {
+ if (entry.getValue() != null) {
+ var key = entry.getKey();
+ var stringValues = new LinkedList<String>();
+ for (var value : entry.getValue()) {
+ if (value != null) {
+ if (value instanceof Tensor) {
+ byte[] tensor = TypedBinaryFormat.encode((Tensor) value);
+ tensorInserter.add(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor)));
+ } else {
+ stringValues.add(value.toString());
+ }
+ }
+ }
+ if (!stringValues.isEmpty()) {
+ stringInserter.add(StringProperty.newBuilder().setName(key).addAllValues(stringValues));
+ }
+ }
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
new file mode 100644
index 00000000000..0a00162143e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
@@ -0,0 +1,223 @@
+package com.yahoo.search.dispatch.rpc;
+
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.SearchRequest.Builder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yahoo.document.GlobalId;
+import com.yahoo.fs4.GetDocSumsPacket;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.prelude.fastsearch.GroupingListHit;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.grouping.vespa.GroupingExecutor;
+import com.yahoo.search.query.Model;
+import com.yahoo.search.query.Presentation;
+import com.yahoo.search.query.Ranking;
+import com.yahoo.search.query.Sorting;
+import com.yahoo.search.query.Sorting.Order;
+import com.yahoo.search.query.ranking.RankFeatures;
+import com.yahoo.search.query.ranking.RankProperties;
+import com.yahoo.search.result.Coverage;
+import com.yahoo.search.result.Relevance;
+import com.yahoo.searchlib.aggregation.Grouping;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProtobufSerialization {
+ private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024;
+
+ public static byte[] serializeQuery(Query query, String serverId, boolean includeQueryData) {
+ return convertFromQuery(query, serverId, includeQueryData).toByteArray();
+ }
+
+ public static byte[] serializeResult(Result searchResult) {
+ return convertFromResult(searchResult).toByteArray();
+ }
+
+ public static Result deserializeToResult(byte[] payload, Query query, VespaBackEndSearcher searcher)
+ throws InvalidProtocolBufferException {
+ var protobuf = SearchProtocol.SearchReply.parseFrom(payload);
+ var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query));
+ return result;
+ }
+
+ private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId, boolean includeQueryData) {
+ var builder = SearchProtocol.SearchRequest.newBuilder().setHits(query.getHits()).setOffset(query.getOffset())
+ .setTimeout((int) query.getTimeLeft());
+
+ mergeToRequestFromRanking(query.getRanking(), builder, includeQueryData);
+ mergeToRequestFromModel(query.getModel(), builder);
+
+ if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) {
+ // TODO verify that the session key is included whenever rank properties would have been
+ builder.setSessionKey(query.getSessionId(serverId).toString());
+ }
+ if (query.properties().getBoolean(Model.ESTIMATE)) {
+ builder.setHits(0);
+ }
+ if (GroupingExecutor.hasGroupingList(query)) {
+ List<Grouping> groupingList = GroupingExecutor.getGroupingList(query);
+ BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer());
+ gbuf.putInt(null, groupingList.size());
+ for (Grouping g : groupingList) {
+ g.serialize(gbuf);
+ }
+ gbuf.getBuf().flip();
+ builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer()));
+ }
+
+ mergeToRequestFromPresentation(query.getPresentation(), builder, includeQueryData);
+ if (query.getGroupingSessionCache()) {
+ builder.setCacheGrouping(true);
+ }
+
+ return builder.build();
+ }
+
+ private static void mergeToRequestFromModel(Model model, SearchProtocol.SearchRequest.Builder builder) {
+ if (model.getDocumentDb() != null) {
+ builder.setDocumentType(model.getDocumentDb());
+ }
+ int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE;
+ boolean success = false;
+ while (!success) {
+ try {
+ ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize);
+ model.getQueryTree().encode(treeBuffer);
+ treeBuffer.flip();
+ builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer));
+ success = true;
+ } catch (java.nio.BufferOverflowException e) {
+ bufferSize *= 2;
+ }
+ }
+ }
+
+ private static void mergeToRequestFromPresentation(Presentation presentation, SearchProtocol.SearchRequest.Builder builder,
+ boolean includeQueryData) {
+ if (includeQueryData && presentation.getHighlight() != null) {
+ MapConverter.convertStringMultiMap(presentation.getHighlight().getHighlightTerms(), builder::addHighlightTerms);
+ }
+ }
+
+ private static void mergeToRequestFromSorting(Sorting sorting, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
+ for (var field : sorting.fieldOrders()) {
+ var sortField = SearchProtocol.SortField.newBuilder().setField(field.getSorter().getName())
+ .setAscending(field.getSortOrder() == Order.ASCENDING).build();
+ builder.addSorting(sortField);
+ }
+ }
+
+ private static void mergeToRequestFromRanking(Ranking ranking, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
+ builder.setRankProfile(ranking.getProfile());
+ if (ranking.getQueryCache()) {
+ builder.setCacheQuery(true);
+ }
+ if (ranking.getSorting() != null) {
+ mergeToRequestFromSorting(ranking.getSorting(), builder, includeQueryData);
+ }
+ if (ranking.getLocation() != null) {
+ builder.setGeoLocation(ranking.getLocation().toString());
+ }
+ mergeToRequestFromRankFeatures(ranking.getFeatures(), builder, includeQueryData);
+ mergeToRequestFromRankProperties(ranking.getProperties(), builder, includeQueryData);
+ }
+
+ private static void mergeToRequestFromRankFeatures(RankFeatures features, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
+ if (includeQueryData) {
+ MapConverter.convertMapStrings(features.asMap(), builder::addFeatureOverrides);
+ MapConverter.convertMapTensors(features.asMap(), builder::addTensorFeatureOverrides);
+ }
+ }
+
+ private static void mergeToRequestFromRankProperties(RankProperties properties, Builder builder, boolean includeQueryData) {
+ if (includeQueryData) {
+ MapConverter.convertMultiMap(properties.asMap(), propB -> {
+ if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) {
+ builder.addRankProperties(propB);
+ }
+ }, builder::addTensorRankProperties);
+ }
+ }
+
+ private static Result convertToResult(Query query, SearchProtocol.SearchReply protobuf, DocumentDatabase documentDatabase) {
+ var result = new Result(query);
+
+ result.setTotalHitCount(protobuf.getTotalHitCount());
+ result.setCoverage(convertToCoverage(protobuf));
+
+ if (protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty()) {
+ ArrayList<Grouping> list = new ArrayList<>();
+ BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(protobuf.getGroupingBlob().asReadOnlyByteBuffer()));
+ int cnt = buf.getInt(null);
+ for (int i = 0; i < cnt; i++) {
+ Grouping g = new Grouping();
+ g.deserialize(buf);
+ list.add(g);
+ }
+ GroupingListHit hit = new GroupingListHit(list, documentDatabase.getDocsumDefinitionSet());
+ hit.setQuery(query);
+ result.hits().add(hit);
+ }
+
+ for (var replyHit : protobuf.getHitsList()) {
+ FastHit hit = new FastHit();
+ hit.setQuery(query);
+
+ hit.setRelevance(new Relevance(replyHit.getRelevance()));
+ hit.setGlobalId(new GlobalId(replyHit.getGlobalId().toByteArray()));
+
+ hit.setFillable();
+ hit.setCached(false);
+
+ result.hits().add(hit);
+ }
+
+ return result;
+ }
+
+ private static Coverage convertToCoverage(SearchProtocol.SearchReply protobuf) {
+ var coverage = new Coverage(protobuf.getCoverageDocs(), protobuf.getActiveDocs(), 1);
+ coverage.setNodesTried(1).setSoonActive(protobuf.getSoonActiveDocs());
+
+ int degradedReason = 0;
+ if (protobuf.getDegradedByMatchPhase())
+ degradedReason |= Coverage.DEGRADED_BY_MATCH_PHASE;
+ if (protobuf.getDegradedBySoftTimeout())
+ degradedReason |= Coverage.DEGRADED_BY_TIMEOUT;
+ coverage.setDegradedReason(degradedReason);
+
+ return coverage;
+ }
+
+ private static SearchProtocol.SearchReply convertFromResult(Result result) {
+ var builder = SearchProtocol.SearchReply.newBuilder();
+
+ var coverage = result.getCoverage(false);
+ if (coverage != null) {
+ builder.setCoverageDocs(coverage.getDocs()).setActiveDocs(coverage.getActive()).setSoonActiveDocs(coverage.getSoonActive())
+ .setDegradedBySoftTimeout(coverage.isDegradedByTimeout()).setDegradedByMatchPhase(coverage.isDegradedByMatchPhase());
+ }
+
+ result.hits().iterator().forEachRemaining(hit -> {
+ var hitBuilder = SearchProtocol.Hit.newBuilder();
+ if (hit.getRelevance() != null) {
+ hitBuilder.setRelevance(hit.getRelevance().getScore());
+ }
+ if (hit instanceof FastHit) {
+ FastHit fhit = (FastHit) hit;
+ hitBuilder.setGlobalId(ByteString.copyFrom(fhit.getGlobalId().getRawId()));
+ }
+ builder.addHits(hitBuilder);
+ });
+ return builder.build();
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 2a4767bc389..32a7917d43c 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -1,5 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
+package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
import com.yahoo.jrt.DataValue;
@@ -40,7 +40,19 @@ class RpcClient implements Client {
request.setContext(hits);
RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcResponseWaiter(rpcNode, responseReceiver));
+ rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver));
+ }
+
+ @Override
+ public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ RpcSearchInvoker responseReceiver, double timeoutSeconds) {
+ Request request = new Request("vespa.searchprotocol.search");
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedPayload));
+
+ RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
+ rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver));
}
private static class RpcNodeConnection implements NodeConnection {
@@ -83,7 +95,7 @@ class RpcClient implements Client {
}
- private static class RpcResponseWaiter implements RequestWaiter {
+ private static class RpcDocsumResponseWaiter implements RequestWaiter {
/** The node to which we made the request we are waiting for - for error messages only */
private final RpcNodeConnection node;
@@ -91,7 +103,7 @@ class RpcClient implements Client {
/** The handler to which the response is forwarded */
private final RpcFillInvoker.GetDocsumsResponseReceiver handler;
- public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) {
+ public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) {
this.node = node;
this.handler = handler;
}
@@ -124,4 +136,39 @@ class RpcClient implements Client {
}
+ private static class RpcSearchResponseWaiter implements RequestWaiter {
+
+ /** The node to which we made the request we are waiting for - for error messages only */
+ private final RpcNodeConnection node;
+
+ /** The handler to which the response is forwarded */
+ private final RpcSearchInvoker handler;
+
+ public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) {
+ this.node = node;
+ this.handler = handler;
+ }
+
+ @Override
+ public void handleRequestDone(Request requestWithResponse) {
+ if (requestWithResponse.isError()) {
+ handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
+ return;
+ }
+
+ Values returnValues = requestWithResponse.returnValues();
+ if (returnValues.size() < 3) {
+ handler.receive(SearchResponseOrError.fromError(
+ "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size()));
+ return;
+ }
+
+ byte compression = returnValues.get(0).asInt8();
+ int uncompressedSize = returnValues.get(1).asInt32();
+ byte[] compressedPayload = returnValues.get(2).asData();
+ handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload)));
+ }
+
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
index 578c447dfbe..b7286997514 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
@@ -1,5 +1,5 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
+package com.yahoo.search.dispatch.rpc;
import com.yahoo.collections.ListMap;
import com.yahoo.compress.CompressionType;
@@ -12,6 +12,7 @@ import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.query.SessionId;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
new file mode 100644
index 00000000000..c8019278710
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -0,0 +1,98 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.InterleavedSearchInvoker;
+import com.yahoo.search.dispatch.InvokerFactory;
+import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+
+/**
+ * @author ollivir
+ */
+public class RpcInvokerFactory extends InvokerFactory {
+ /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */
+ private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
+
+ private final RpcResourcePool rpcResourcePool;
+ private final SearchCluster searchCluster;
+
+ public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) {
+ this.rpcResourcePool = rpcResourcePool;
+ this.searchCluster = searchCluster;
+ }
+
+ @Override
+ public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List<Node> nodes,
+ boolean acceptIncompleteCoverage) {
+ List<SearchInvoker> invokers = new ArrayList<>(nodes.size());
+ Set<Integer> failed = null;
+ for (Node node : nodes) {
+ if (node.isWorking()) {
+ invokers.add(new RpcSearchInvoker(searcher, node, rpcResourcePool));
+ } else {
+ if (failed == null) {
+ failed = new HashSet<>();
+ }
+ failed.add(node.key());
+ }
+ }
+
+ if (failed != null) {
+ List<Node> success = new ArrayList<>(nodes.size() - failed.size());
+ for (Node node : nodes) {
+ if (!failed.contains(node.key())) {
+ success.add(node);
+ }
+ }
+ if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) {
+ if (acceptIncompleteCoverage) {
+ invokers.add(createCoverageErrorInvoker(nodes, failed));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+
+ if (invokers.size() == 1) {
+ return Optional.of(invokers.get(0));
+ } else {
+ return Optional.of(new InterleavedSearchInvoker(invokers, searcher, searchCluster));
+ }
+ }
+
+ @Override
+ public Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result) {
+ Query query = result.getQuery();
+ if (query.properties().getBoolean(dispatchSummaries, true)
+ && ! searcher.summaryNeedsQuery(query)
+ && query.getRanking().getLocation() == null)
+ {
+ return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query)));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ // for testing
+ public FillInvoker createFillInvoker(DocumentDatabase documentDb) {
+ return new RpcFillInvoker(rpcResourcePool, documentDb);
+ }
+
+ public void release() {
+ rpcResourcePool.release();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 29641080ba6..830ba45ef0f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -1,16 +1,13 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
+package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
import com.yahoo.compress.Compressor;
-import com.yahoo.prelude.fastsearch.DocumentDatabase;
-import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
-import com.yahoo.search.Query;
+import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Map;
-import java.util.Optional;
/**
* RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains
@@ -22,9 +19,6 @@ public class RpcResourcePool {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
- /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */
- private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
-
private final Compressor compressor = new Compressor();
private final Client client;
@@ -47,22 +41,6 @@ public class RpcResourcePool {
this.nodeConnections = nodeConnectionsBuilder.build();
}
- public Optional<FillInvoker> getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) {
- if (query.properties().getBoolean(dispatchSummaries, true)
- && ! searcher.summaryNeedsQuery(query)
- && query.getRanking().getLocation() == null)
- {
- return Optional.of(new RpcFillInvoker(this, documentDb));
- } else {
- return Optional.empty();
- }
- }
-
- // for testing
- public FillInvoker getFillInvoker(DocumentDatabase documentDb) {
- return new RpcFillInvoker(this, documentDb);
- }
-
public Compressor compressor() {
return compressor;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
new file mode 100644
index 00000000000..88d77c760e3
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -0,0 +1,118 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.dispatch.rpc.Client.SearchResponse;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.searchchain.Execution;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SearchInvoker} implementation using RPC
+ *
+ * @author ollivir
+ */
+public class RpcSearchInvoker extends SearchInvoker {
+ private final VespaBackEndSearcher searcher;
+ private final Node node;
+ private final RpcResourcePool resourcePool;
+ private final BlockingQueue<Client.SearchResponseOrError> responses;
+
+ private Query query;
+
+ RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) {
+ super(Optional.of(node));
+ this.searcher = searcher;
+ this.node = node;
+ this.resourcePool = resourcePool;
+ this.responses = new LinkedBlockingQueue<>(1);
+ }
+
+ @Override
+ protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
+ this.query = query;
+
+ CompressionType compression = CompressionType
+ .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
+
+ Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key());
+ if (nodeConnection == null) {
+ responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key()));
+ responseAvailable();
+ return;
+ }
+
+ var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true);
+ double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
+ Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
+ resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
+ timeoutSeconds);
+ }
+
+ @Override
+ protected Result getSearchResult(Execution execution) throws IOException {
+ long timeLeftMs = query.getTimeLeft();
+ if (timeLeftMs <= 0) {
+ return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ }
+ Client.SearchResponseOrError response = null;
+ try {
+ response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // handled as timeout
+ }
+ if (response == null) {
+ return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ }
+ if (response.error().isPresent()) {
+ return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get()));
+ }
+ if (response.response().isEmpty()) {
+ return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available"));
+ }
+
+ SearchResponse searchResponse = response.response().get();
+ CompressionType compression = CompressionType.valueOf(searchResponse.compression());
+ byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression,
+ searchResponse.uncompressedSize());
+ var result = ProtobufSerialization.deserializeToResult(payload, query, searcher);
+ result.hits().unorderedIterator().forEachRemaining(hit -> {
+ if(hit instanceof FastHit) {
+ FastHit fhit = (FastHit) hit;
+ fhit.setPartId(node.pathIndex());
+ fhit.setDistributionKey(node.key());
+ }
+ hit.setSource(getName());
+ });
+
+ return result;
+ }
+
+ @Override
+ protected void release() {
+ // nothing to release
+ }
+
+ public void receive(Client.SearchResponseOrError response) {
+ responses.add(response);
+ responseAvailable();
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index cb86f19e761..bafc72b9b43 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -25,6 +25,12 @@ public class Group {
public Group(int id, List<Node> nodes) {
this.id = id;
this.nodes = ImmutableList.copyOf(nodes);
+
+ int idx = 0;
+ for(var node: nodes) {
+ node.setPathIndex(idx);
+ idx++;
+ }
}
/** Returns the unique identity of this group */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index 98deb9e3199..7e0e3117628 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class Node {
private final int key;
+ private int pathIndex;
private final String hostname;
private final int fs4port;
final int group;
@@ -31,6 +32,12 @@ public class Node {
/** Returns the unique and stable distribution key of this node */
public int key() { return key; }
+ public int pathIndex() { return pathIndex; }
+
+ void setPathIndex(int index) {
+ pathIndex = index;
+ }
+
public String hostname() { return hostname; }
public int fs4port() { return fs4port; }