aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
blob: c6f33a29410cbf8756a0e14b78bd0f62203ee773 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy.filedistribution;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.DoubleArray;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;

import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * An RPC server that handles file distribution requests.
 *
 * <p>Registers the file distribution RPC methods on the given {@link Supervisor} at construction time.
 * Requests for a single file ({@code waitFor} / {@code filedistribution.getFile}) are detached and
 * answered from a dedicated executor; the other methods are answered directly in the calling thread.</p>
 *
 * @author hmusum
 */
class FileDistributionRpcServer {

    private static final Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName());

    //---------------- Error codes ------------------------------------
    // TODO: Duplicate of code in FileAcquirerImpl. Find out where to put it. What about C++ code using this RPC call?
    private static final int baseErrorCode = 0x10000;
    private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000;

    private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode;
    // NOTE(review): chosen as base + 2 on the assumption that base + 1 is reserved for "removed"
    // in FileAcquirerImpl - TODO confirm against that class before relying on the exact value.
    private static final int fileReferenceInternalError = baseFileProviderErrorCode + 2;

    private final Supervisor supervisor;
    private final FileDownloader downloader;
    private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
                                                                                     new DaemonThreadFactory("Rpc executor"));

    /**
     * Creates the server and registers all file distribution RPC methods on the supervisor.
     *
     * @param supervisor the supervisor the RPC methods are added to
     * @param downloader the downloader used to look up and download file references
     */
    FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) {
        this.supervisor = supervisor;
        this.downloader = downloader;
        declareFileDistributionMethods();
    }

    /**
     * Stops the download executor, waiting up to 10 seconds for running tasks to finish.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
     *                          interrupt flag is restored before the exception is thrown
     */
    void close() {
        rpcDownloadExecutor.shutdownNow();
        try {
            if ( ! rpcDownloadExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                log.log(LogLevel.WARNING, "Timed out waiting for RPC download executor to terminate");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Registers the RPC methods served by this class on the supervisor. */
    private void declareFileDistributionMethods() {
        // Legacy method, needs to be the same name as used in filedistributor
        supervisor.addMethod(new Method("waitFor", "s", "s", this::getFile)
                                     .methodDesc("get path to file reference")
                                     .paramDesc(0, "file reference", "file reference")
                                     .returnDesc(0, "path", "path to file"));
        supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", this::getFile)
                                     .methodDesc("get path to file reference")
                                     .paramDesc(0, "file reference", "file reference")
                                     .returnDesc(0, "path", "path to file"));
        supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", this::getActiveFileReferencesStatus)
                                     .methodDesc("download status for file references")
                                     .returnDesc(0, "file references", "array of file references")
                                     .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
        supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
                                     .methodDesc("set which file references to download")
                                     .paramDesc(0, "file references", "file reference to download")
                                     .returnDesc(0, "ret", "0 if success, 1 otherwise"));
    }


    //---------------- RPC methods ------------------------------------

    /**
     * Handler for {@code waitFor} and {@code filedistribution.getFile}.
     * Detaches the request and hands it to the download executor; {@link #downloadFile(Request)}
     * is responsible for returning it.
     */
    private void getFile(Request req) {
        req.detach();
        rpcDownloadExecutor.execute(() -> downloadFile(req));
    }

    /**
     * Handler for {@code filedistribution.getActiveFileReferencesStatus}.
     * Returns two parallel arrays: the file reference values and the download status of each.
     */
    private void getActiveFileReferencesStatus(Request req) {
        Map<FileReference, Double> downloadStatus = downloader.downloadStatus();

        int size = downloadStatus.size();
        String[] fileRefArray = new String[size];
        double[] downloadStatusArray = new double[size];

        // Fill both arrays in one pass over the entries so that index i always refers to the same entry
        int i = 0;
        for (Map.Entry<FileReference, Double> entry : downloadStatus.entrySet()) {
            fileRefArray[i] = entry.getKey().value();
            downloadStatusArray[i] = entry.getValue();
            i++;
        }

        req.returnValues().add(new StringArray(fileRefArray));
        req.returnValues().add(new DoubleArray(downloadStatusArray));
    }

    /**
     * Handler for {@code filedistribution.setFileReferencesToDownload}.
     * Asks the downloader to download each given file reference if needed, then returns 0.
     */
    private void setFileReferencesToDownload(Request req) {
        log.log(LogLevel.DEBUG, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
        Arrays.stream(req.parameters().get(0).asStringArray())
                .map(FileReference::new)
                .forEach(fileReference -> downloader.downloadIfNeeded(new FileReferenceDownload(fileReference)));
        req.returnValues().add(new Int32Value(0));
    }

    /**
     * Looks up the requested file reference and answers the (detached) request with either the
     * absolute path of the file or an error. Runs on the download executor.
     *
     * <p>The request is always returned, also when the lookup throws; otherwise the detached
     * request would never be answered.</p>
     */
    private void downloadFile(Request req) {
        String reference = req.parameters().get(0).asString();
        try {
            FileReference fileReference = new FileReference(reference);
            log.log(LogLevel.DEBUG, () -> "getFile() called for file reference '" + fileReference.value() + "'");
            Optional<File> file = downloader.getFile(fileReference);
            if (file.isPresent()) {
                new RequestTracker().trackRequest(file.get().getParentFile());
                req.returnValues().add(new StringValue(file.get().getAbsolutePath()));
                log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' available at " + file.get());
            } else {
                log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
                req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
            }
        } catch (RuntimeException e) {
            log.log(LogLevel.WARNING, "Getting file reference '" + reference + "' failed", e);
            req.setError(fileReferenceInternalError,
                         "File reference '" + reference + "' could not be retrieved: " + e.getMessage());
        } finally {
            // The request was detached in getFile(), so it must be returned explicitly on every path
            req.returnRequest();
        }
    }

}