summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-02-13 13:34:30 +0100
committerGitHub <noreply@github.com>2018-02-13 13:34:30 +0100
commit8daccb7a9396d1ffa5fffb9d7bba04e3f15aca8e (patch)
tree24b756c1c7f1b842ba3070bb77ef74576f7fc830 /filedistribution
parent4c9c8a6a7b7025a09df28dc91dbba0fc7b26091e (diff)
parentd5f1d54eb837310e6762956cb11c89324f2b4e1e (diff)
Merge pull request #5025 from vespa-engine/hmusum/do-rpc-download-in-its-own-thread
Do work in RPC call for getting file in its own thread
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java41
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java2
2 files changed, 26 insertions, 17 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
index e9d2e9f7e8a..3e81321f92e 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.DoubleArray;
import com.yahoo.jrt.Int32Value;
@@ -15,6 +16,8 @@ import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -30,6 +33,8 @@ public class FileDistributionRpcServer {
private final Supervisor supervisor;
private final FileDownloader downloader;
+ private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+ new DaemonThreadFactory("Rpc executor"));
public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) {
this.supervisor = supervisor;
@@ -74,22 +79,7 @@ public class FileDistributionRpcServer {
@SuppressWarnings({"UnusedDeclaration"})
public final void getFile(Request req) {
req.detach();
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'");
- Optional<File> pathToFile = downloader.getFile(fileReference);
- try {
- if (pathToFile.isPresent()) {
- req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
- log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
- } else {
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
- req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
- }
- } catch (Throwable e) {
- log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage());
- req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
- }
- req.returnRequest();
+ rpcDownloadExecutor.execute(() -> downloadFile(req));
}
@SuppressWarnings({"UnusedDeclaration"})
@@ -123,4 +113,23 @@ public class FileDistributionRpcServer {
req.returnValues().add(new Int32Value(0));
}
+ private void downloadFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'");
+ Optional<File> pathToFile = downloader.getFile(fileReference);
+ try {
+ if (pathToFile.isPresent()) {
+ req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
+ log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
+ req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
+ }
+ } catch (Throwable e) {
+ log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage());
+ req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
+ }
+ req.returnRequest();
+ }
+
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 66e04f8b92b..805b535bdf8 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -118,7 +118,7 @@ public class FileReferenceDownloader {
log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() +
", error code: " + request.errorCode());
if (request.isError() && request.errorCode() == ErrorCode.CONNECTION || request.errorCode() == ErrorCode.TIMEOUT) {
- log.log(LogLevel.WARNING, "Setting error for connection " + connection.getAddress());
+ log.log(LogLevel.INFO, "Mark connection " + connection.getAddress() + " with error");
connectionPool.setError(connection, request.errorCode());
}
return false;