diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-02-13 13:34:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-13 13:34:30 +0100 |
commit | 8daccb7a9396d1ffa5fffb9d7bba04e3f15aca8e (patch) | |
tree | 24b756c1c7f1b842ba3070bb77ef74576f7fc830 /filedistribution | |
parent | 4c9c8a6a7b7025a09df28dc91dbba0fc7b26091e (diff) | |
parent | d5f1d54eb837310e6762956cb11c89324f2b4e1e (diff) |
Merge pull request #5025 from vespa-engine/hmusum/do-rpc-download-in-its-own-thread
Do work in RPC call for getting file in its own thread
Diffstat (limited to 'filedistribution')
2 files changed, 26 insertions, 17 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java index e9d2e9f7e8a..3e81321f92e 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.DoubleArray; import com.yahoo.jrt.Int32Value; @@ -15,6 +16,8 @@ import java.io.File; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,6 +33,8 @@ public class FileDistributionRpcServer { private final Supervisor supervisor; private final FileDownloader downloader; + private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new DaemonThreadFactory("Rpc executor")); public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { this.supervisor = supervisor; @@ -74,22 +79,7 @@ public class FileDistributionRpcServer { @SuppressWarnings({"UnusedDeclaration"}) public final void getFile(Request req) { req.detach(); - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); - Optional<File> pathToFile = downloader.getFile(fileReference); - try { - if (pathToFile.isPresent()) { - req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); - log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); - } else { - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); - req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); - } - } catch (Throwable e) { - log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage()); - req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); - } - req.returnRequest(); + rpcDownloadExecutor.execute(() -> downloadFile(req)); } @SuppressWarnings({"UnusedDeclaration"}) @@ -123,4 +113,23 @@ public class FileDistributionRpcServer { req.returnValues().add(new Int32Value(0)); } + private void downloadFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 66e04f8b92b..805b535bdf8 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -118,7 +118,7 @@ public class FileReferenceDownloader { log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() + ", error code: " + request.errorCode()); if (request.isError() && request.errorCode() == ErrorCode.CONNECTION || request.errorCode() == ErrorCode.TIMEOUT) { - log.log(LogLevel.WARNING, "Setting error for connection " + connection.getAddress()); + log.log(LogLevel.INFO, "Mark connection " + connection.getAddress() + " with error"); connectionPool.setError(connection, request.errorCode()); } return false; |