1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.InvokerResult;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* {@link SearchInvoker} implementation using RPC
*
* @author ollivir
*/
public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver {
private static final String RPC_METHOD = "vespa.searchprotocol.search";
private final VespaBackEndSearcher searcher;
private final Node node;
private final RpcResourcePool resourcePool;
private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private Query query;
RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) {
super(Optional.of(node));
this.searcher = searcher;
this.node = node;
this.resourcePool = resourcePool;
this.responses = new LinkedBlockingQueue<>(1);
}
@Override
protected void sendSearchRequest(Query query) throws IOException {
this.query = query;
Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key());
if (nodeConnection == null) {
responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key()));
responseAvailable();
return;
}
query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compress(query, payload);
nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds);
}
@Override
protected InvokerResult getSearchResult(Execution execution) throws IOException {
long timeLeftMs = query.getTimeLeft();
if (timeLeftMs <= 0) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
Client.ResponseOrError<ProtobufResponse> response = null;
try {
response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// handled as timeout
}
if (response == null) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
if (response.error().isPresent()) {
return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get()));
}
if (response.response().isEmpty()) {
return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available"));
}
ProtobufResponse protobufResponse = response.response().get();
CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key());
return result;
}
@Override
protected void release() {
// nothing to release
}
public void receive(Client.ResponseOrError<ProtobufResponse> response) {
responses.add(response);
responseAvailable();
}
private String getName() {
return searcher.getName();
}
}
|