summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java52
1 files changed, 34 insertions, 18 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 53b46ee1344..1b775d2c46f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -9,7 +9,6 @@ import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.container.protect.Error;
-import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.slime.ArrayTraverser;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
@@ -23,9 +22,12 @@ import com.yahoo.search.result.Hit;
import com.yahoo.data.access.Inspector;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.Cursor;
+import com.yahoo.slime.JsonFormat;
import com.yahoo.slime.Slime;
import com.yahoo.vespa.config.search.DispatchConfig;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -36,11 +38,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * A dispatcher communicates with search nodes to perform queries and fill hits.
- *
- * This is currently not functionally complete: Queries can only be dispatched to a single node,
- * and summaries can only be requested when they do not need the query.
- *
+ * A dispatcher communicates with search nodes to (in the future) perform queries and (now) fill hits.
* This class is multithread safe.
*
* @author bratseth
@@ -84,7 +82,7 @@ public class Dispatcher extends AbstractComponent {
public SearchCluster searchCluster() { return searchCluster; }
/** Fills the given summary class by sending RPC requests to the right search nodes */
- public void fill(Result result, String summaryClass, DocumentDatabase documentDb, CompressionType compression) {
+ public void fill(Result result, String summaryClass, CompressionType compression) {
try {
ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
@@ -92,7 +90,7 @@ public class Dispatcher extends AbstractComponent {
for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver);
}
- responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb);
+ responseReceiver.processResponses(result.getQuery(), summaryClass);
}
catch (TimeoutException e) {
result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
@@ -194,7 +192,7 @@ public class Dispatcher extends AbstractComponent {
* Call this from the dispatcher thread to initiate and complete processing of responses.
* This will block until all responses are available and processed, or to timeout.
*/
- public void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException {
+ public void processResponses(Query query, String summaryClass) throws TimeoutException {
try {
int skippedHits = 0;
while (outstandingResponses > 0) {
@@ -205,12 +203,11 @@ public class Dispatcher extends AbstractComponent {
Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
if (response == null)
throwTimeout();
- skippedHits += processResponse(response, summaryClass, documentDb);
+ skippedHits += processResponse(response);
outstandingResponses--;
}
if (skippedHits != 0) {
- result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " +
- summaryClass + " for " + skippedHits + " hits"));
+ result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + skippedHits + " hits"));
}
}
catch (InterruptedException e) {
@@ -218,9 +215,7 @@ public class Dispatcher extends AbstractComponent {
}
}
- private int processResponse(Client.GetDocsumsResponseOrError responseOrError,
- String summaryClass,
- DocumentDatabase documentDb) {
+ private int processResponse(Client.GetDocsumsResponseOrError responseOrError) {
if (responseOrError.error().isPresent()) {
if (hasReportedError) return 0;
String error = responseOrError.error().get();
@@ -231,7 +226,7 @@ public class Dispatcher extends AbstractComponent {
Client.GetDocsumsResponse response = responseOrError.response().get();
CompressionType compression = CompressionType.valueOf(response.compression());
byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
- return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes);
+ return fill(response.hitsContext(), slimeBytes);
}
return 0;
}
@@ -246,7 +241,7 @@ public class Dispatcher extends AbstractComponent {
});
}
- private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) {
+ private int fill(List<FastHit> hits, byte[] slimeBytes) {
com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get();
com.yahoo.slime.Inspector errors = root.field("errors");
boolean hasErrors = errors.valid() && (errors.entries() > 0);
@@ -261,7 +256,7 @@ public class Dispatcher extends AbstractComponent {
for (int i = 0; i < hits.size(); i++) {
Inspector summary = summaries.entry(i).field("docsum");
if (summary.fieldCount() != 0) {
- hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsumDefinition(summaryClass), summary);
+ fill(hits.get(i), summary);
} else {
skippedHits++;
}
@@ -269,6 +264,27 @@ public class Dispatcher extends AbstractComponent {
return skippedHits;
}
+ private void fill(FastHit hit, Inspector summary) {
+ hit.reserve(summary.fieldCount());
+ summary.traverse((String name, Inspector value) -> {
+ hit.setField(name, nativeTypeOf(value));
+ });
+ }
+
+ private Object nativeTypeOf(Inspector inspector) {
+ switch (inspector.type()) {
+ case ARRAY: return inspector;
+ case OBJECT: return inspector;
+ case BOOL: return inspector.asBool();
+ case DATA: return inspector.asData();
+ case DOUBLE: return inspector.asDouble();
+ case LONG: return inspector.asLong();
+ case STRING: return inspector.asString(); // TODO: Keep as utf8
+ case EMPTY : return null;
+ default: throw new IllegalArgumentException("Unexpected Slime type " + inspector.type());
+ }
+ }
+
}
}