diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-07 15:45:00 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-07 15:53:29 +0100 |
commit | b9b0b7d153cde0f6d023d849c8ffdcefb533a277 (patch) | |
tree | d7e029c0fe9d4f7b9fd2f3dbd0dacd4a6f0cc474 | |
parent | 135ef16d40619bc7fe186b1cdfe825a39b22492d (diff) |
Detach and serve from separate thread.
5 files changed, 136 insertions, 131 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 958f26632ef..90792759d2c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -7,6 +7,9 @@ import com.yahoo.config.FileReference; import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; import com.yahoo.net.HostName; @@ -33,6 +36,19 @@ public class FileServer { private final ExecutorService executor; private final FileDownloader downloader; + private enum FileApiErrorCodes { + OK(0, "OK"), + NOT_FOUND(1, "Filereference not found"); + private final int code; + private final String description; + FileApiErrorCodes(int code, String description) { + this.code = code; + this.description = description; + } + int getCode() { return code; } + String getDescription() { return description; } + } + public static class ReplayStatus { private final int code; private final String description; @@ -122,6 +138,30 @@ public class FileServer { return new FileReferenceData(reference, file.getName(), type, blob); } + public void serveFile(Request request, Receiver receiver) { + executor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver)); + } + private void serveFile(String fileReference, Request request, Receiver receiver) { + FileApiErrorCodes result; + try { + // TODO remove once verified in system tests. + log.info("Received request for reference '" + fileReference + "'"); + result = hasFile(fileReference) + ? FileApiErrorCodes.OK + : FileApiErrorCodes.NOT_FOUND; + if (result == FileApiErrorCodes.OK) { + startFileServing(fileReference, receiver); + } else { + download(new FileReference(fileReference)); + } + } catch (IllegalArgumentException e) { + result = FileApiErrorCodes.NOT_FOUND; + log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString()); + } + request.returnValues() + .add(new Int32Value(result.getCode())) + .add(new StringValue(result.getDescription())); + } public void download(FileReference fileReference) { downloader.getFile(fileReference); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 5c50fbfc31b..17368b48e59 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -74,20 +74,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { static final int TRACELEVEL_DEBUG = 9; private static final String THREADPOOL_NAME = "rpcserver worker pool"; private static final long SHUTDOWN_TIMEOUT = 60; - private enum FileApiErrorCodes { - OK(0, "OK"), - NOT_FOUND(1, "Filereference not found"); - private final int code; - private final String description; - FileApiErrorCodes(int code, String description) { - this.code = code; - this.description = description; - } - int getCode() { return code; } - String getDescription() { return description; } - } + private final Supervisor supervisor = new Supervisor(new Transport()); - private Spec spec = null; + private Spec spec; private final boolean useRequestVersion; private final boolean hostedVespa; @@ -455,25 +444,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @SuppressWarnings("UnusedDeclaration") public final void serveFile(Request request) { - String fileReference = request.parameters().get(0).asString(); - FileApiErrorCodes result; - try { - // TODO remove once verified in system tests. - log.info("Received request for reference '" + fileReference + "'"); - result = fileServer.hasFile(fileReference) - ? FileApiErrorCodes.OK - : FileApiErrorCodes.NOT_FOUND; - if (result == FileApiErrorCodes.OK) { - fileServer.startFileServing(fileReference, new FileReceiver(request.target())); - } else { - fileServer.download(new FileReference(fileReference)); - } - } catch (IllegalArgumentException e) { - result = FileApiErrorCodes.NOT_FOUND; - log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString()); - } - request.returnValues() - .add(new Int32Value(result.getCode())) - .add(new StringValue(result.getDescription())); + request.detach(); + fileServer.serveFile(request, new FileReceiver(request.target())); } } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 9fe5eec54ff..ca8b8c27ad3 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -1,13 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; @@ -17,6 +15,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -47,17 +46,28 @@ public class FileDownloader { } public Optional<File> getFile(FileReference fileReference) { + try { + return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + return Optional.empty(); + } + } + + public Future<Optional<File>> getFutureFile(FileReference fileReference) { Objects.requireNonNull(fileReference, "file reference cannot be null"); File directory = new File(downloadDirectory, fileReference.value()); log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); Optional<File> file = getFileFromFileSystem(fileReference, directory); if (file.isPresent()) { - return file; + SettableFuture<Optional<File>> future = SettableFuture.create(); + future.set(file); + return future; } else { log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + directory.getAbsolutePath() + ", starting download"); return queueForDownload(fileReference, timeout); + } } @@ -86,11 +96,9 @@ public class FileDownloader { if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { File file = files[0]; if (!file.exists()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "' does not exist"); + throw new RuntimeException("File with reference '" + fileReference.value() + "' does not exist"); } else if (!file.canRead()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "'exists, but unable to read it"); + throw new RuntimeException("File with reference '" + fileReference.value() + "'exists, but unable to read it"); } else { fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); return Optional.of(file); @@ -99,46 +107,26 @@ public class FileDownloader { return Optional.empty(); } - private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) { - if (fileReferenceDownloader.isDownloading(fileReference)) { + private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) { + Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + if (inProgress != null) { log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); - ListenableFuture<Optional<File>> future = - fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); - try { - return future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + - Exceptions.toMessageString(e)); - } + return inProgress; } - SettableFuture<Optional<File>> future = SettableFuture.create(); - queueForDownload(new FileReferenceDownload(fileReference, future)); + Future<Optional<File>> future = queueForDownload(fileReference); log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); - - try { - Optional<File> fileDownloaded; - try { - log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); - fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); - log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); - } catch (TimeoutException e) { - log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); - return Optional.empty(); - } - return fileDownloaded; - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Could not download '" + fileReference.value() + "'"); - } + return future; } // We don't care about the future in this call - private synchronized void queueForDownload(FileReference fileReference) { - queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + private Future<Optional<File>> queueForDownload(FileReference fileReference) { + return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); } - private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) { fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + return fileReferenceDownload.future(); } public FileReferenceDownloader fileReferenceDownloader() { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index fb511411128..9de4c1fcd5b 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,13 +2,14 @@ package com.yahoo.vespa.filedistribution; +import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import java.io.File; import java.util.Optional; -public class FileReferenceDownload { +class FileReferenceDownload { private final FileReference fileReference; private final SettableFuture<Optional<File>> future; @@ -20,9 +21,7 @@ public class FileReferenceDownload { FileReference fileReference() { return fileReference; } - SettableFuture<Optional<File>> future() { return future; } - } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index b51a4b68212..3a9af23e6d8 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -20,6 +20,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -44,10 +45,7 @@ public class FileReferenceDownloader { private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); - private ExecutorService readFromQueueExecutor = - Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); private final ConnectionPool connectionPool; - private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>(); private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); private final Map<FileReference, Double> downloadStatus = new HashMap<>(); private final Duration downloadTimeout; @@ -56,67 +54,57 @@ public class FileReferenceDownloader { FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { this.connectionPool = connectionPool; this.downloadTimeout = timeout; - readFromQueueExecutor.submit(this::readFromQueue); this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory); } - private synchronized Optional<File> startDownload(FileReference fileReference, - Duration timeout, - FileReferenceDownload fileReferenceDownload) - throws ExecutionException, InterruptedException, TimeoutException { - downloads.put(fileReference, fileReferenceDownload); - setDownloadStatus(fileReference.value(), 0.0); - - int numAttempts = 0; + private void startDownload(FileReference fileReference, Duration timeout, + FileReferenceDownload fileReferenceDownload) + { + synchronized (downloads) { + downloads.put(fileReference, fileReferenceDownload); + downloadStatus.put(fileReference, 0.0); + } + long end = System.currentTimeMillis() + timeout.toMillis(); boolean downloadStarted = false; - do { - if (startDownloadRpc(fileReference)) - downloadStarted = true; - else - Thread.sleep(100); - } while (!downloadStarted && ++numAttempts <= 10); // TODO: How long/many times to retry? - - if (downloadStarted) { - return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } else { + while ((System.currentTimeMillis() < end) && !downloadStarted) { + try { + if (startDownloadRpc(fileReference)) { + downloadStarted = true; + } else { + Thread.sleep(10); + } + } + catch (InterruptedException | ExecutionException e) {} + } + + if ( !downloadStarted) { fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); - downloads.remove(fileReference); - return Optional.empty(); + synchronized (downloads) { + downloads.remove(fileReference); + } } } - synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { - downloadQueue.add(fileReferenceDownload); + void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); } void receiveFile(FileReferenceData fileReferenceData) { fileReceiver.receiveFile(fileReferenceData); } - synchronized Set<FileReference> queuedDownloads() { - return downloadQueue.stream() - .map(FileReferenceDownload::fileReference) - .collect(Collectors.toCollection(LinkedHashSet::new)); - } - - private void readFromQueue() { - do { - FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); - if (fileReferenceDownload == null) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { /* ignore for now */} + void completedDownloading(FileReference fileReference, File file) { + synchronized (downloads) { + FileReferenceDownload download = downloads.get(fileReference); + if (download != null) { + downloadStatus.put(fileReference, 1.0); + downloads.remove(fileReference); + download.future().set(Optional.of(file)); } else { - log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + log.warning("Received a file " + fileReference + " I did not ask for. Impossible"); } - } while (true); - } - - void completedDownloading(FileReference fileReference, File file) { - if (downloads.containsKey(fileReference)) - downloads.get(fileReference).future().set(Optional.of(file)); - downloadStatus.put(fileReference, 100.0); + } } private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { @@ -146,14 +134,22 @@ public class FileReferenceDownloader { } } - synchronized boolean isDownloading(FileReference fileReference) { - return downloads.containsKey(fileReference); + boolean isDownloading(FileReference fileReference) { + synchronized (downloads) { + return downloads.containsKey(fileReference); + } } - synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { - FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); - fileReferenceDownload.future().addListener(runnable, downloadExecutor); - return fileReferenceDownload.future(); + ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + synchronized (downloads) { + FileReferenceDownload download = downloads.get(fileReference); + if (download != null) { + download.future().addListener(runnable, downloadExecutor); + return download.future(); + } + } + return null; + } private void execute(Request request, Connection connection) { @@ -173,15 +169,26 @@ public class FileReferenceDownloader { } double downloadStatus(String file) { - return downloadStatus.getOrDefault(new FileReference(file), 0.0); + double status = 0.0; + synchronized (downloads) { + Double download = downloadStatus.get(new FileReference(file)); + if (download != null) { + status = download; + } + } + return status; } - void setDownloadStatus(String file, double percentageDownloaded) { - downloadStatus.put(new FileReference(file), percentageDownloaded); + void setDownloadStatus(String file, double completeness) { + synchronized (downloads) { + downloadStatus.put(new FileReference(file), completeness); + } } Map<FileReference, Double> downloadStatus() { - return ImmutableMap.copyOf(downloadStatus); + synchronized (downloads) { + return ImmutableMap.copyOf(downloadStatus); + } } public ConnectionPool connectionPool() { |