aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
blob: bb0bbf4b529ed571b7d7d2f3b83fdb60807f707a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;

import com.yahoo.compress.CompressionType;
import com.yahoo.jrt.DataValue;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Int8Value;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.RequestWaiter;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.Values;
import com.yahoo.prelude.fastsearch.FastHit;

import java.util.List;

/**
 * A client which uses rpc request to search nodes to implement the Client API.
 *
 * @author bratseth
 */
class RpcClient implements Client {

    private final Supervisor supervisor;

    public RpcClient(String name, int transportThreads) {
        supervisor = new Supervisor(new Transport(name, transportThreads));
    }

    @Override
    public void close() {
        supervisor.transport().shutdown().join();
    }

    @Override
    public NodeConnection createConnection(String hostname, int port) {
        return new RpcNodeConnection(hostname, port, supervisor);
    }

    private static class RpcNodeConnection implements NodeConnection {

        // Information about the connected node
        private final Supervisor supervisor;
        private final String hostname;
        private final int port;
        private final String description;

        // The current shared connection. This will be recycled when it becomes invalid.
        // All access to this must be synchronized
        private Target target;

        public RpcNodeConnection(String hostname, int port, Supervisor supervisor) {
            this.supervisor = supervisor;
            this.hostname = hostname;
            this.port = port;
            description = "rpc node connection to " + hostname + ":" + port;
            target = supervisor.connect(new Spec(hostname, port));
        }

        @Override
        public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength,
                               byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
            Request request = new Request("proton.getDocsums");
            request.parameters().add(new Int8Value(compression.getCode()));
            request.parameters().add(new Int32Value(uncompressedLength));
            request.parameters().add(new DataValue(compressedSlime));

            request.setContext(hits);
            invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver));
        }

        @Override
        public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
                            ResponseReceiver responseReceiver, double timeoutSeconds) {
            Request request = new Request(rpcMethod);
            request.parameters().add(new Int8Value(compression.getCode()));
            request.parameters().add(new Int32Value(uncompressedLength));
            request.parameters().add(new DataValue(compressedPayload));

            invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver));
        }

        private void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
            // TODO: Consider replacing this by a watcher on the target
            synchronized(this) { // ensure we have exactly 1 valid connection across threads
                if (! target.isValid()) {
                    target = supervisor.connect(new Spec(hostname, port));
                }
            }
            target.invokeAsync(req, timeout, waiter);
        }

        @Override
        public void close() {
            target.close();
        }

        @Override
        public String toString() {
            return description;
        }

    }

    private static class RpcDocsumResponseWaiter implements RequestWaiter {

        /** The node to which we made the request we are waiting for - for error messages only */
        private final RpcNodeConnection node;

        /** The handler to which the response is forwarded */
        private final RpcFillInvoker.GetDocsumsResponseReceiver handler;

        public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) {
            this.node = node;
            this.handler = handler;
        }

        @Override
        public void handleRequestDone(Request requestWithResponse) {
            if (requestWithResponse.isError()) {
                handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
                return;
            }

            Values returnValues = requestWithResponse.returnValues();
            if (returnValues.size() < 3) {
                handler.receive(ResponseOrError.fromError(
                        "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size()));
                return;
            }

            byte compression = returnValues.get(0).asInt8();
            int uncompressedSize = returnValues.get(1).asInt32();
            byte[] compressedSlimeBytes = returnValues.get(2).asData();
            @SuppressWarnings("unchecked") // TODO: Non-protobuf rpc docsums to be removed soon
            List<FastHit> hits = (List<FastHit>) requestWithResponse.getContext();
            handler.receive(
                    ResponseOrError.fromResponse(new GetDocsumsResponse(compression, uncompressedSize, compressedSlimeBytes, hits)));
        }

    }

    private static class RpcProtobufResponseWaiter implements RequestWaiter {

        /** The node to which we made the request we are waiting for - for error messages only */
        private final RpcNodeConnection node;

        /** The handler to which the response is forwarded */
        private final ResponseReceiver handler;

        public RpcProtobufResponseWaiter(RpcNodeConnection node, ResponseReceiver handler) {
            this.node = node;
            this.handler = handler;
        }

        @Override
        public void handleRequestDone(Request requestWithResponse) {
            if (requestWithResponse.isError()) {
                handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
                return;
            }

            Values returnValues = requestWithResponse.returnValues();
            if (returnValues.size() < 3) {
                handler.receive(ResponseOrError.fromError(
                        "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size()));
                return;
            }

            byte compression = returnValues.get(0).asInt8();
            int uncompressedSize = returnValues.get(1).asInt32();
            byte[] compressedPayload = returnValues.get(2).asData();
            handler.receive(ResponseOrError.fromResponse(new ProtobufResponse(compression, uncompressedSize, compressedPayload)));
        }

    }

}