aboutsummaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
blob: 78a8cda7c34cba78c3454e69619ab96f1ce6ac77 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.rpc;

import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.FileReference;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.TenantName;
import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.DataValue;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Int64Value;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.security.tls.Capability;
import com.yahoo.vespa.config.ErrorCode;
import com.yahoo.vespa.config.JRTMethods;
import com.yahoo.vespa.config.protocol.ConfigResponse;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.vespa.config.server.ConfigActivationListener;
import com.yahoo.vespa.config.server.GetConfigContext;
import com.yahoo.vespa.config.server.RequestHandler;
import com.yahoo.vespa.config.server.SuperModelRequestHandler;
import com.yahoo.vespa.config.server.application.ApplicationVersions;
import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.HostRegistry;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory;
import com.yahoo.vespa.config.server.rpc.security.RpcAuthorizer;
import com.yahoo.vespa.config.server.tenant.Tenant;
import com.yahoo.vespa.config.server.tenant.TenantListener;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReceiver;
import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;

/**
 * An RPC server class that handles the config protocol RPC method "getConfigV3".
 * Mandatory hooks need to be implemented by subclasses.
 *
 * @author hmusum
 */
// TODO: Split business logic out of this
public class RpcServer implements Runnable, ConfigActivationListener, TenantListener {

    /** Trace level used for the regular trace messages written by this class */
    private static final int TRACELEVEL = 6;
    /** Trace level used for verbose debug trace messages */
    static final int TRACELEVEL_DEBUG = 9;
    private static final String THREADPOOL_NAME = "rpcserver worker pool";
    /** Number of seconds stop() waits for the worker pool to terminate */
    private static final long SHUTDOWN_TIMEOUT = 60;
    private static final int JRT_RPC_TRANSPORT_THREADS = threadsToUse();

    private final Supervisor supervisor = new Supervisor(new Transport("rpc", JRT_RPC_TRANSPORT_THREADS));
    private final Spec spec;
    private final boolean useRequestVersion;
    private final boolean hostedVespa;
    private final boolean canReturnEmptySentinelConfig;

    private static final Logger log = Logger.getLogger(RpcServer.class.getName());

    private final DelayedConfigResponses delayedConfigResponses;

    private final HostRegistry hostRegistry;
    /** Known tenants, maintained through the TenantListener callbacks */
    private final Map<TenantName, Tenant> tenants = new ConcurrentHashMap<>();
    /** Active generation per application, updated when config is activated */
    private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>();
    private final SuperModelRequestHandler superModelRequestHandler;
    private final MetricUpdater metrics;
    private final MetricUpdaterFactory metricUpdaterFactory;
    private final FileServer fileServer;
    private final RpcAuthorizer rpcAuthorizer;

    /** Worker pool executing get config requests */
    private final ThreadPoolExecutor executorService;
    private final FileDownloader downloader;

    // The flags below are written by one thread (the one executing run()/stop() or setting up handlers)
    // and polled by others, so they must be volatile for the writes to be visible to the readers.
    private volatile boolean allTenantsLoaded = false;
    private volatile boolean isRunning = false;
    volatile boolean isServingConfigRequests = false;

    /** Holds the active config generation of a single application. */
    static class ApplicationState {

        private final AtomicLong activeGeneration;

        ApplicationState(long generation) {
            this.activeGeneration = new AtomicLong(generation);
        }

        /** Returns the generation most recently set for this application */
        long getActiveGeneration() {
            return activeGeneration.get();
        }

        /** Replaces the active generation with the given one */
        void setActiveGeneration(long generation) {
            activeGeneration.set(generation);
        }

    }
    /**
     * Creates an RpcServer which will listen on the rpc port given in <code>config</code> once {@link #run()}
     * is invoked. Only the ping, statistics and file distribution RPC methods are registered here;
     * the method for getting config is registered by {@link #setUpGetConfigHandlers()}.
     *
     * @param config The config to use for setting up this server
     * @param superModelRequestHandler handler for requests with config id "*", also notified when
     *                                 applications are activated or removed
     * @param metrics factory for metric updaters, also used to create the metric updater of this server
     * @param hostRegistry used to look up which application a requesting host belongs to
     * @param fileServer serves file references and supplies the file downloader used by this server
     * @param rpcAuthorizer authorizes incoming config and file requests
     * @param handlerProvider is given a reference to this instance
     */
    @Inject
    public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler,
                     MetricUpdaterFactory metrics, HostRegistry hostRegistry,
                     FileServer fileServer, RpcAuthorizer rpcAuthorizer,
                     RpcRequestHandlerProvider handlerProvider) {
        this.superModelRequestHandler = superModelRequestHandler;
        metricUpdaterFactory = metrics;
        supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize());
        this.metrics = metrics.getOrCreateMetricUpdater(Collections.emptyMap());
        // Bounded work queue: since the pool below has a fixed size, submissions are rejected
        // when the queue is full (handled in addToRequestQueue)
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients());
        // numRpcThreads == 0 means that the number of worker threads is decided by threadsToUse()
        int rpcWorkerThreads = (config.numRpcThreads() == 0) ? threadsToUse() : config.numRpcThreads();
        executorService = new ThreadPoolExecutor(rpcWorkerThreads, rpcWorkerThreads,
                0, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getDaemonThreadFactory(THREADPOOL_NAME));
        // NOTE: 'this' is handed out here (and to handlerProvider below) before construction has completed
        delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads());
        // No host name given, only the port; presumably this listens on all interfaces - TODO confirm against Spec
        spec = new Spec(null, config.rpcport());
        this.hostRegistry = hostRegistry;
        this.useRequestVersion = config.useVespaVersionInRequest();
        this.hostedVespa = config.hostedVespa();
        this.canReturnEmptySentinelConfig = config.canReturnEmptySentinelConfig();
        this.fileServer = fileServer;
        this.rpcAuthorizer = rpcAuthorizer;
        downloader = fileServer.downloader();
        handlerProvider.setInstance(this);
        setUpFileDistributionHandlers();
    }

    /** Returns the default number of threads to use: half the available processors, but never fewer than 8. */
    private static int threadsToUse() {
        int halfOfProcessors = Runtime.getRuntime().availableProcessors() / 2;
        return (halfOfProcessors > 8) ? halfOfProcessors : 8;
    }

    /**
     * Handles RPC method "config.v3.getConfig" requests.
     * The request is detached and, once the authorizer has completed, wrapped as a V3 config request
     * and put on the request queue.
     */
    private void getConfigV3(Request req) {
        req.detach();
        Runnable enqueue = () -> {
            JRTServerConfigRequest configRequest = JRTServerConfigRequestV3.createFromRequest(req);
            addToRequestQueue(configRequest);
        };
        rpcAuthorizer.authorizeConfigRequest(req).thenRun(enqueue);
    }

    /**
     * Handles the "ping" RPC method: answers with return code 0, meaning that the server is alive.
     */
    private void ping(Request req) {
        Int32Value alive = new Int32Value(0);
        req.returnValues().add(alive);
    }

    /**
     * Handles the "printStatistics" RPC method: answers with a string describing the current size
     * of the delayed responses queue.
     *
     * @param req the request to add the statistics string to
     */
    private void printStatistics(Request req) {
        String statistics = "Delayed responses queue size: " + delayedConfigResponses.size();
        req.returnValues().add(new StringValue(statistics));
    }

    /**
     * Starts listening on the configured port and then blocks until the transport has terminated.
     *
     * @throws RuntimeException if listening on the port fails; the server is stopped before throwing
     */
    @Override
    public void run() {
        log.log(FINE, "Rpc server will listen on port " + spec.port());
        final Acceptor acceptor;
        try {
            acceptor = supervisor.listen(spec);
        } catch (ListenFailedException e) {
            stop();
            throw new RuntimeException("Could not listen at " + spec, e);
        }
        isRunning = true;
        supervisor.transport().join();
        acceptor.shutdown().join();
    }

    /**
     * Stops this server: shuts down the worker pool (waiting up to {@code SHUTDOWN_TIMEOUT} seconds for
     * queued work to finish), then the delayed responses, the file server and finally the RPC transport.
     * If the calling thread is interrupted while waiting for the worker pool, shutdown still completes
     * and the interrupt status of the thread is restored before returning.
     */
    public void stop() {
        executorService.shutdown();
        boolean interrupted = false;
        try {
            if ( ! executorService.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                log.log(WARNING, "Timed out after " + SHUTDOWN_TIMEOUT + " seconds waiting for '" +
                                 THREADPOOL_NAME + "' to terminate, continuing shutdown");
            }
        } catch (InterruptedException e) {
            // Continue shutdown, but remember the interrupt so that it is not lost for the caller
            interrupted = true;
        }
        try {
            delayedConfigResponses.stop();
            fileServer.close();
            supervisor.transport().shutdown().join();
            isRunning = false;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true from when {@link #run()} has started listening until {@link #stop()} has completed */
    public boolean isRunning() {
        return this.isRunning;
    }

    /**
     * Registers the handlers for the "ping", "printStatistics" and file distribution RPC methods,
     * i.e. all methods except the one for getting config (see #setUpGetConfigHandlers())
     */
    private void setUpFileDistributionHandlers() {
        Supervisor rpc = getSupervisor();

        rpc.addMethod(new Method("ping", "", "i", this::ping)
                              .methodDesc("ping")
                              .returnDesc(0, "ret code", "return code, 0 is OK"));

        rpc.addMethod(new Method("printStatistics", "", "s", this::printStatistics)
                              .methodDesc("printStatistics")
                              .returnDesc(0, "statistics", "Statistics for server"));

        rpc.addMethod(new Method("filedistribution.serveFile", "si*", "is", this::serveFile)
                              .requireCapabilities(Capability.CONFIGSERVER__FILEDISTRIBUTION_API));

        rpc.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
                              .requireCapabilities(Capability.CONFIGSERVER__FILEDISTRIBUTION_API)
                              .methodDesc("set which file references to download")
                              .paramDesc(0, "file references", "file reference to download")
                              .returnDesc(0, "ret", "0 if success, 1 otherwise"));
    }

    /**
     * Registers the RPC method for getting config and marks this server as serving config requests.
     */
    public void setUpGetConfigHandlers() {
        // RPC calls for getting config are dispatched to getConfigV3 in this class
        getSupervisor().addMethod(
                JRTMethods.createConfigV3GetConfigMethod(this::getConfigV3)
                          .requireCapabilities(Capability.CONFIGSERVER__CONFIG_API));
        isServingConfigRequests = true;
    }


    /** Returns true once the handler for getting config has been set up (see {@link #setUpGetConfigHandlers()}) */
    public boolean isServingConfigRequests() {
        return this.isServingConfigRequests;
    }

    /** Returns the state of the given application, registering a new state with generation 0 if it has none */
    private ApplicationState getState(ApplicationId id) {
        return applicationStateMap.computeIfAbsent(id, unused -> new ApplicationState(0));
    }

    /** Returns true if the active generation of the given application is higher than the given generation */
    boolean hasNewerGeneration(ApplicationId id, long generation) {
        long activeGeneration = getState(id).getActiveGeneration();
        return generation < activeGeneration;
    }

    /**
     * Records the new active generation of the application, activates it in the super model and then
     * checks the delayed responses of both the global application and this application for config changes,
     * waiting until they have been answered.
     * This method should be called when config is activated in the server.
     */
    @Override
    public void configActivated(ApplicationVersions applicationVersions) {
        ApplicationId id = applicationVersions.getId();
        getState(id).setActiveGeneration(applicationVersions.applicationGeneration());
        reloadSuperModel(applicationVersions);
        configActivated(id);
    }

    /** Activates the given application versions in the super model and answers delayed requests for the global application */
    private void reloadSuperModel(ApplicationVersions applicationVersions) {
        superModelRequestHandler.activateConfig(applicationVersions);
        ApplicationId global = ApplicationId.global();
        configActivated(global);
    }

    /**
     * Drains the delayed responses for the given application, resubmits every request whose timer
     * could be cancelled and then waits for the resubmitted requests to complete.
     * Requests that could not be submitted are not waited for. If the calling thread is interrupted
     * while waiting, the wait is abandoned and the interrupt status is preserved.
     *
     * @param applicationId the application to answer delayed requests for
     */
    void configActivated(ApplicationId applicationId) {
        List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId);
        String logPre = TenantRepository.logPre(applicationId);
        log.log(FINE, () -> logPre + "Start of configActivated: " + responses.size() + " requests on delayed requests queue");
        int responsesSent = 0;
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executorService);
        // Iterate rather than repeatedly removing the first element, which is linear per removal for array-backed lists
        for (DelayedConfigResponses.DelayedConfigResponse delayedConfigResponse : responses) {
            // Discard the ones that we have already answered
            // Doing cancel here deals with the case where the timer is already running or has not run, so
            // there is no need for any extra check.
            if (delayedConfigResponse.cancel()) {
                log.log(FINE, () -> logPre + "Timer cancelled for " + delayedConfigResponse.request);
                // Do not wait for this request if we were unable to execute
                if (addToRequestQueue(delayedConfigResponse.request, false, completionService)) {
                    responsesSent++;
                }
            } else {
                log.log(FINE, () -> logPre + "Timer already cancelled or finished or never scheduled");
            }
        }

        for (int i = 0; i < responsesSent; i++) {
            try {
                completionService.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // With the interrupt flag set every further take() would throw immediately, so stop waiting
                break;
            }
        }
    }

    /**
     * Removes the application from the super model and answers the delayed requests of both
     * the removed application and the global application.
     */
    @Override
    public void applicationRemoved(ApplicationId applicationId) {
        superModelRequestHandler.removeApplication(applicationId);
        configActivated(applicationId);
        ApplicationId global = ApplicationId.global();
        configActivated(global);
    }

    /** Logs the trace of the given config request and returns the underlying RPC request to the client */
    public void respond(JRTServerConfigRequest request) {
        log.log(FINE, () -> "Trace when responding:\n" + request.getRequestTrace().toString());
        Request rpcRequest = request.getRequest();
        rpcRequest.returnRequest();
    }

    /**
     * Returns the tenant this request belongs to: the tenant of the global application when the config id
     * is "*", otherwise the tenant of the application the requesting host is registered for.
     * Returns empty if the host is not found in the host registry
     * (which on hosted Vespa means that the requesting host is not currently active for any tenant)
     */
    Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) {
        if ("*".equals(request.getConfigKey().getConfigId())) {
            return Optional.of(ApplicationId.global().tenant());
        }

        String hostname = request.getClientHostName();
        ApplicationId applicationId = hostRegistry.getApplicationId(hostname);
        if (applicationId != null) {
            return Optional.of(applicationId.tenant());
        }

        if (GetConfigProcessor.logDebug(trace)) {
            String message = "Did not find tenant for host '" + hostname + "', using " + TenantName.defaultName() +
                             ". Hosts in host registry: " + hostRegistry.getAllHosts();
            log.log(FINE, () -> message);
            trace.trace(TRACELEVEL, message);
        }
        return Optional.empty();
    }

    /** Resolves config for the given request by delegating to the request handler of the given context */
    public ConfigResponse resolveConfig(JRTServerConfigRequest request, GetConfigContext context, Optional<Version> vespaVersion) {
        context.trace().trace(TRACELEVEL, "RpcServer.resolveConfig()");
        RequestHandler handler = context.requestHandler();
        return handler.resolveConfig(context.applicationId(), request, vespaVersion);
    }

    /** Returns the supervisor that RPC methods are registered with */
    private Supervisor getSupervisor() {
        return this.supervisor;
    }

    /** Queues the request for processing, without forcing a response and without a completion service */
    private void addToRequestQueue(JRTServerConfigRequest request) {
        CompletionService<Boolean> noCompletionService = null;
        addToRequestQueue(request, false, noCompletionService);
    }

    /**
     * Submits the request for processing, either through the given completion service or, when that is null,
     * directly to the worker pool. If the submission is rejected an error response is sent to the client.
     *
     * @param request the config request to process
     * @param forceResponse passed on to the processor of the request
     * @param completionService the service to submit through, or null to submit directly to the worker pool
     * @return true if the request was submitted, false if it was rejected
     */
    public Boolean addToRequestQueue(JRTServerConfigRequest request, boolean forceResponse, CompletionService<Boolean> completionService) {
        // It's no longer delayed if we get here
        request.setDelayedResponse(false);
        try {
            final GetConfigProcessor processor = new GetConfigProcessor(this, request, forceResponse);
            if (completionService != null) {
                completionService.submit(() -> { processor.run(); return true; });
            } else {
                executorService.submit(processor);
            }
            updateWorkQueueMetrics();
        } catch (RejectedExecutionException e) {
            request.addErrorResponse(ErrorCode.INTERNAL_ERROR, "getConfig request queue size is larger than configured max limit");
            respond(request);
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    /** Reports the number of tasks currently waiting in the worker pool queue as a metric */
    private void updateWorkQueueMetrics() {
        metrics.setRpcServerQueueSize(executorService.getQueue().size());
    }

    /**
     * Returns the context to use when resolving config for this request, or an empty context
     * ({@link GetConfigContext#empty()}) if no request handler is registered for the tenant.
     * Requests with config id "*" get a context for the global application, backed by the super model.
     */
    GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) {
        boolean superModelRequest = "*".equals(request.getConfigKey().getConfigId());
        if (superModelRequest) {
            return GetConfigContext.create(ApplicationId.global(), superModelRequestHandler, trace);
        }

        // TODO: Look into if this fallback really is needed
        TenantName tenant = optionalTenant.orElse(TenantName.defaultName());
        Optional<RequestHandler> requestHandler = getRequestHandler(tenant);
        if (requestHandler.isPresent()) {
            RequestHandler handler = requestHandler.get();
            ApplicationId applicationId = handler.resolveApplicationId(request.getClientHostName());
            // TODO: Look into if this fallback really is needed
            if (applicationId == null && tenant.equals(TenantName.defaultName())) {
                applicationId = ApplicationId.defaultId();
            }
            if (trace.shouldTrace(TRACELEVEL_DEBUG)) {
                trace.trace(TRACELEVEL_DEBUG, "Host '" + request.getClientHostName() + "' should have config from application '" + applicationId + "'");
            }
            return GetConfigContext.create(applicationId, handler, trace);
        }

        // No handler for this tenant: count, trace and log it, and let the caller deal with the empty context
        String msg = TenantRepository.logPre(tenant) + "Unable to find request handler for tenant '" + tenant  +
                     "'. Request from host '" + request.getClientHostName() + "'";
        metrics.incUnknownHostRequests();
        trace.trace(TRACELEVEL, msg);
        log.log(WARNING, msg);
        return GetConfigContext.empty();
    }

    /** Returns the request handler of the given tenant, or empty if the tenant is unknown or has no handler */
    Optional<RequestHandler> getRequestHandler(TenantName tenant) {
        Tenant knownTenant = tenants.get(tenant);
        if (knownTenant == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(knownTenant.getRequestHandler());
    }

    /** Hands the request over to the delayed responses, to be answered later */
    void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
        this.delayedConfigResponses.delayResponse(request, context);
    }

    /** Forgets the given tenant, so that no request handler is found for it any longer */
    @Override
    public void onTenantDelete(TenantName tenant) {
        log.log(FINE,
                () -> TenantRepository.logPre(tenant) + "Tenant deleted, removing request handler and cleaning host registry");
        tenants.remove(tenant);
    }

    /** Marks all tenants as loaded and enables the super model request handler */
    @Override
    public void onTenantsLoaded() {
        this.allTenantsLoaded = true;
        superModelRequestHandler.enable();
    }

    /** Registers the given tenant under its name, replacing any tenant already registered with that name */
    @Override
    public void onTenantCreate(Tenant tenant) {
        TenantName name = tenant.getName();
        tenants.put(name, tenant);
    }

    /** Returns true only after {@link #onTenantsLoaded()} has been called, i.e. when all tenants are loaded */
    public boolean allTenantsLoaded() {
        return this.allTenantsLoaded;
    }

    /** Returns true if this rpc server is configured to run in a hosted Vespa configuration */
    public boolean isHostedVespa() {
        return this.hostedVespa;
    }

    /**
     * Returns true if empty sentinel config can be returned when a host that is
     * not part of an application asks for sentinel config
     */
    public boolean canReturnEmptySentinelConfig() {
        return this.canReturnEmptySentinelConfig;
    }
    
    /** Returns the factory for metric updaters that this server was created with */
    MetricUpdaterFactory metricUpdaterFactory() {
        return this.metricUpdaterFactory;
    }

    /** Returns the value of the 'useVespaVersionInRequest' setting this server was configured with */
    boolean useRequestVersion() {
        return this.useRequestVersion;
    }

    /**
     * Receiver that pushes a file to an RPC target as a sequence of calls: one meta request which
     * yields a session id, one request per chunk of file content, and a final eof request.
     * A failure in any of the calls is reported by throwing, which aborts the transfer.
     */
    static class ChunkedFileReceiver implements FileServer.Receiver {

        /** Maximum number of content bytes sent in one part request */
        private static final int PART_SIZE = 0x100000;
        /** Timeout for each RPC call to the target */
        private static final Duration RPC_TIMEOUT = Duration.ofMinutes(10);

        final Target target;

        ChunkedFileReceiver(Target target) {
            this.target = target;
        }

        @Override
        public String toString() {
            return target.toString();
        }

        /**
         * Sends the given file to the target.
         *
         * @param fileData the file to send
         * @param status the status to pass on to the target in the eof request
         * @throws IllegalArgumentException if a call fails or the target answers with a non-zero return code
         * @throws RuntimeException if the connection to the target is no longer valid
         */
        @Override
        public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
            int session = sendMeta(fileData);
            sendParts(session, fileData);
            sendEof(session, fileData, status);
        }

        /** Reads the file content chunk by chunk and sends each chunk as a part with an increasing part id */
        private void sendParts(int session, FileReferenceData fileData) {
            ByteBuffer bb = ByteBuffer.allocate(PART_SIZE);
            for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) {
                // Send the backing array as is when it was filled completely, otherwise only the bytes read
                byte[] buf = (bb.position() == bb.capacity())
                             ? bb.array()
                             : Arrays.copyOf(bb.array(), bb.position());
                sendPart(session, fileData.fileReference(), partId, buf);
                bb.clear();
            }
        }

        /**
         * Sends the meta data of the file and returns the session id the target answered with.
         * Failing to deliver meta data aborts the transfer, just like failing to deliver a part or eof does:
         * without a successful answer there is no session id to use for the following calls.
         */
        private int sendMeta(FileReferenceData fileData) {
            Request request = createMetaRequest(fileData);
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                throw new IllegalArgumentException("Failed delivering meta for reference '" + fileData.fileReference().value() +
                                                   "' with file '" + fileData.filename() + "' to " +
                                                   target.toString() + " with error: '" + request.errorMessage() + "'.");
            }
            verifyReturnCode(request);
            return request.returnValues().get(1).asInt32();
        }

        // non-private for testing
        static Request createMetaRequest(FileReferenceData fileData) {
            Request request = new Request(FileReceiver.RECEIVE_META_METHOD);
            request.parameters().add(new StringValue(fileData.fileReference().value()));
            request.parameters().add(new StringValue(fileData.filename()));
            request.parameters().add(new StringValue(fileData.type().name()));
            request.parameters().add(new Int64Value(fileData.size()));
            // Only add parameter if not gzip, this is default and old clients will not handle the extra parameter
            // TODO Always add parameter in Vespa 9
            if (fileData.compressionType() != CompressionType.gzip)
                request.parameters().add(new StringValue(fileData.compressionType().name()));
            return request;
        }

        /** Sends one chunk of content belonging to the given session */
        private void sendPart(int session, FileReference ref, int partId, byte [] buf) {
            Request request = new Request(FileReceiver.RECEIVE_PART_METHOD);
            request.parameters().add(new StringValue(ref.value()));
            request.parameters().add(new Int32Value(session));
            request.parameters().add(new Int32Value(partId));
            request.parameters().add(new DataValue(buf));
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                throw new IllegalArgumentException("Failed delivering part of reference '" + ref.value() + "' to " +
                                                   target.toString() + " with error: '" + request.errorMessage() + "'.");
            }
            verifyReturnCode(request);
        }

        /** Tells the target that all parts of the given session are sent, along with the hash and status of the file */
        private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) {
            Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD);
            request.parameters().add(new StringValue(fileData.fileReference().value()));
            request.parameters().add(new Int32Value(session));
            request.parameters().add(new Int64Value(fileData.xxhash()));
            request.parameters().add(new Int32Value(status.getCode()));
            request.parameters().add(new StringValue(status.getDescription()));
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                throw new IllegalArgumentException("Failed delivering eof for reference '" + fileData.fileReference().value() +
                                                   "' with file '" + fileData.filename() + "' to " +
                                                   target.toString() + " with error: '" + request.errorMessage() + "'.");
            }
            verifyReturnCode(request);
        }

        /** Throws if the first return value of the (successfully invoked) request is not 0 */
        private void verifyReturnCode(Request request) {
            if (request.returnValues().get(0).asInt32() != 0) {
                throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
            }
        }

        /** Invokes the request synchronously on the target, or throws if the connection to the target is invalid */
        private void invokeRpcIfValidConnection(Request request) {
            if ( ! target.isValid()) {
                throw new RuntimeException("Connection to " + target + " is invalid", target.getConnectionLostReason());
            }
            target.invokeSync(request, RPC_TIMEOUT);
        }
    }

    /**
     * Handles the "filedistribution.serveFile" RPC method. The request is detached and, once the authorizer
     * has completed, handed to the file server together with a receiver for the requesting target.
     * Parameter 0 is the file reference, parameter 1 is 0 if the file may be downloaded from another
     * source when not found, and the optional parameter 2 lists the compression types the client accepts.
     */
    private void serveFile(Request request) {
        request.detach();
        rpcAuthorizer.authorizeFileRequest(request)
                .thenRun(() -> { // okay to do in authorizer thread as serveFile is async
                    FileReference reference = new FileReference(request.parameters().get(0).asString());
                    boolean downloadFromOtherSourceIfNotFound = request.parameters().get(1).asInt32() == 0;

                    // Newer clients specify accepted compression types in request, older ones get the gzip default
                    // TODO Require acceptedCompressionTypes parameter in Vespa 9
                    Set<FileReferenceData.CompressionType> acceptedCompressionTypes;
                    if (request.parameters().size() > 2) {
                        acceptedCompressionTypes = Arrays.stream(request.parameters().get(2).asStringArray())
                                                         .map(CompressionType::valueOf)
                                                         .collect(Collectors.toSet());
                    } else {
                        acceptedCompressionTypes = Set.of(CompressionType.gzip);
                    }

                    FileServer.Receiver receiver = new ChunkedFileReceiver(request.target());
                    fileServer.serveFile(reference, downloadFromOtherSourceIfNotFound, acceptedCompressionTypes, request, receiver);
                });
    }

    /**
     * Handles the "filedistribution.setFileReferencesToDownload" RPC method: asks the downloader to download
     * each of the given file references, if needed, on behalf of the requesting target.
     * The request is detached, so it is explicitly returned to the client when done:
     * with return code 0 on success and 1 if starting the downloads failed.
     */
    private void setFileReferencesToDownload(Request req) {
        req.detach();
        rpcAuthorizer.authorizeFileRequest(req)
                .thenRun(() -> { // okay to do in authorizer thread as downloadIfNeeded is async
                    int returnCode = 0;
                    try {
                        String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
                        Stream.of(fileReferenceStrings)
                                .map(FileReference::new)
                                .forEach(fileReference -> downloader.downloadIfNeeded(
                                        new FileReferenceDownload(fileReference,
                                                                  req.target().toString(),
                                                                  false /* downloadFromOtherSourceIfNotFound */)));
                    } catch (RuntimeException e) {
                        log.log(WARNING, "Failed to set file references to download for " + req.target(), e);
                        returnCode = 1;
                    }
                    req.returnValues().add(new Int32Value(returnCode));
                    // A detached request is not returned by the transport layer, so it must be returned here
                    req.returnRequest();
                });
    }
}