diff options
author | Harald Musum <musum@oath.com> | 2018-02-13 22:12:39 +0100 |
---|---|---|
committer | Harald Musum <musum@oath.com> | 2018-02-13 22:12:39 +0100 |
commit | 7178e8d4bc949097a59c35a5d229c4852361fe03 (patch) | |
tree | e12352f55cec36ec7cef3dcba6d19057f185825a /filedistribution | |
parent | 1c0e1046e8480736b7f0cdc3ff4bbb04eec2687b (diff) |
Do download in a separate thread
Avoid holding the transport thread
Diffstat (limited to 'filedistribution')
2 files changed, 32 insertions, 22 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java index e9d2e9f7e8a..3e81321f92e 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.DoubleArray; import com.yahoo.jrt.Int32Value; @@ -15,6 +16,8 @@ import java.io.File; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,6 +33,8 @@ public class FileDistributionRpcServer { private final Supervisor supervisor; private final FileDownloader downloader; + private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new DaemonThreadFactory("Rpc executor")); public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { this.supervisor = supervisor; @@ -74,22 +79,7 @@ public class FileDistributionRpcServer { @SuppressWarnings({"UnusedDeclaration"}) public final void getFile(Request req) { req.detach(); - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); - Optional<File> pathToFile = downloader.getFile(fileReference); - try { - if (pathToFile.isPresent()) { - req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); - log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); - } else { - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); - req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); - } - } catch (Throwable e) { - log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage()); - req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); - } - req.returnRequest(); + rpcDownloadExecutor.execute(() -> downloadFile(req)); } @SuppressWarnings({"UnusedDeclaration"}) @@ -123,4 +113,23 @@ public class FileDistributionRpcServer { req.returnValues().add(new Int32Value(0)); } + private void downloadFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 66e04f8b92b..8e230a1a2cc 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -51,10 +51,6 @@ public class FileReferenceDownloader { private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - synchronized (downloads) { - downloads.put(fileReference, fileReferenceDownload); - downloadStatus.put(fileReference, 0.0); - } long end = System.currentTimeMillis() + timeout.toMillis(); boolean downloadStarted = false; while ((System.currentTimeMillis() < end) && !downloadStarted) { @@ -77,7 +73,12 @@ public class FileReferenceDownloader { } void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { - log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "' with timeout " + downloadTimeout); + FileReference fileReference = fileReferenceDownload.fileReference(); + log.log(LogLevel.DEBUG, "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout); + synchronized (downloads) { + downloads.put(fileReference, fileReferenceDownload); + downloadStatus.put(fileReference, 0.0); + } downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload)); } @@ -118,7 +119,7 @@ public class FileReferenceDownloader { log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() + ", error code: " + request.errorCode()); if (request.isError() && request.errorCode() == ErrorCode.CONNECTION || request.errorCode() == ErrorCode.TIMEOUT) { - log.log(LogLevel.WARNING, "Setting error for connection " + connection.getAddress()); + log.log(LogLevel.INFO, "Mark connection " + connection.getAddress() + " with error"); connectionPool.setError(connection, request.errorCode()); } return false; |