aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 15:45:00 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 15:53:29 +0100
commitb9b0b7d153cde0f6d023d849c8ffdcefb533a277 (patch)
treed7e029c0fe9d4f7b9fd2f3dbd0dacd4a6f0cc474
parent135ef16d40619bc7fe186b1cdfe825a39b22492d (diff)
Detach and serve from separate thread.
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java40
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java37
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java62
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java5
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java123
5 files changed, 136 insertions, 131 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index 958f26632ef..90792759d2c 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -7,6 +7,9 @@ import com.yahoo.config.FileReference;
import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.net.HostName;
@@ -33,6 +36,19 @@ public class FileServer {
private final ExecutorService executor;
private final FileDownloader downloader;
+ private enum FileApiErrorCodes {
+ OK(0, "OK"),
+ NOT_FOUND(1, "Filereference not found");
+ private final int code;
+ private final String description;
+ FileApiErrorCodes(int code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+ int getCode() { return code; }
+ String getDescription() { return description; }
+ }
+
public static class ReplayStatus {
private final int code;
private final String description;
@@ -122,6 +138,30 @@ public class FileServer {
return new FileReferenceData(reference, file.getName(), type, blob);
}
+ public void serveFile(Request request, Receiver receiver) {
+ executor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver));
+ }
+ private void serveFile(String fileReference, Request request, Receiver receiver) {
+ FileApiErrorCodes result;
+ try {
+ // TODO remove once verified in system tests.
+ log.info("Received request for reference '" + fileReference + "'");
+ result = hasFile(fileReference)
+ ? FileApiErrorCodes.OK
+ : FileApiErrorCodes.NOT_FOUND;
+ if (result == FileApiErrorCodes.OK) {
+ startFileServing(fileReference, receiver);
+ } else {
+ download(new FileReference(fileReference));
+ }
+ } catch (IllegalArgumentException e) {
+ result = FileApiErrorCodes.NOT_FOUND;
+ log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString());
+ }
+ request.returnValues()
+ .add(new Int32Value(result.getCode()))
+ .add(new StringValue(result.getDescription()));
+ }
public void download(FileReference fileReference) {
downloader.getFile(fileReference);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 5c50fbfc31b..17368b48e59 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -74,20 +74,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
static final int TRACELEVEL_DEBUG = 9;
private static final String THREADPOOL_NAME = "rpcserver worker pool";
private static final long SHUTDOWN_TIMEOUT = 60;
- private enum FileApiErrorCodes {
- OK(0, "OK"),
- NOT_FOUND(1, "Filereference not found");
- private final int code;
- private final String description;
- FileApiErrorCodes(int code, String description) {
- this.code = code;
- this.description = description;
- }
- int getCode() { return code; }
- String getDescription() { return description; }
- }
+
private final Supervisor supervisor = new Supervisor(new Transport());
- private Spec spec = null;
+ private Spec spec;
private final boolean useRequestVersion;
private final boolean hostedVespa;
@@ -455,25 +444,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@SuppressWarnings("UnusedDeclaration")
public final void serveFile(Request request) {
- String fileReference = request.parameters().get(0).asString();
- FileApiErrorCodes result;
- try {
- // TODO remove once verified in system tests.
- log.info("Received request for reference '" + fileReference + "'");
- result = fileServer.hasFile(fileReference)
- ? FileApiErrorCodes.OK
- : FileApiErrorCodes.NOT_FOUND;
- if (result == FileApiErrorCodes.OK) {
- fileServer.startFileServing(fileReference, new FileReceiver(request.target()));
- } else {
- fileServer.download(new FileReference(fileReference));
- }
- } catch (IllegalArgumentException e) {
- result = FileApiErrorCodes.NOT_FOUND;
- log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString());
- }
- request.returnValues()
- .add(new Int32Value(result.getCode()))
- .add(new StringValue(result.getDescription()));
+ request.detach();
+ fileServer.serveFile(request, new FileReceiver(request.target()));
}
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 9fe5eec54ff..ca8b8c27ad3 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -1,13 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.defaults.Defaults;
-import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
@@ -17,6 +15,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -47,17 +46,28 @@ public class FileDownloader {
}
public Optional<File> getFile(FileReference fileReference) {
+ try {
+ return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ return Optional.empty();
+ }
+ }
+
+ public Future<Optional<File>> getFutureFile(FileReference fileReference) {
Objects.requireNonNull(fileReference, "file reference cannot be null");
File directory = new File(downloadDirectory, fileReference.value());
log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
Optional<File> file = getFileFromFileSystem(fileReference, directory);
if (file.isPresent()) {
- return file;
+ SettableFuture<Optional<File>> future = SettableFuture.create();
+ future.set(file);
+ return future;
} else {
log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
directory.getAbsolutePath() + ", starting download");
return queueForDownload(fileReference, timeout);
+
}
}
@@ -86,11 +96,9 @@ public class FileDownloader {
if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
File file = files[0];
if (!file.exists()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "' does not exist");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "' does not exist");
} else if (!file.canRead()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "'exists, but unable to read it");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "'exists, but unable to read it");
} else {
fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
return Optional.of(file);
@@ -99,46 +107,26 @@ public class FileDownloader {
return Optional.empty();
}
- private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
- if (fileReferenceDownloader.isDownloading(fileReference)) {
+ private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) {
+ Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ if (inProgress != null) {
log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
- ListenableFuture<Optional<File>> future =
- fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
- Exceptions.toMessageString(e));
- }
+ return inProgress;
}
- SettableFuture<Optional<File>> future = SettableFuture.create();
- queueForDownload(new FileReferenceDownload(fileReference, future));
+ Future<Optional<File>> future = queueForDownload(fileReference);
log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
-
- try {
- Optional<File> fileDownloaded;
- try {
- log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
- fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
- log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
- } catch (TimeoutException e) {
- log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
- return Optional.empty();
- }
- return fileDownloaded;
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Could not download '" + fileReference.value() + "'");
- }
+ return future;
}
// We don't care about the future in this call
- private synchronized void queueForDownload(FileReference fileReference) {
- queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
+ private Future<Optional<File>> queueForDownload(FileReference fileReference) {
+ return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
}
- private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
+ private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) {
fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
+ return fileReferenceDownload.future();
}
public FileReferenceDownloader fileReferenceDownloader() {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index fb511411128..9de4c1fcd5b 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,13 +2,14 @@
package com.yahoo.vespa.filedistribution;
+import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
-public class FileReferenceDownload {
+class FileReferenceDownload {
private final FileReference fileReference;
private final SettableFuture<Optional<File>> future;
@@ -20,9 +21,7 @@ public class FileReferenceDownload {
FileReference fileReference() {
return fileReference;
}
-
SettableFuture<Optional<File>> future() {
return future;
}
-
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index b51a4b68212..3a9af23e6d8 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -20,6 +20,7 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -44,10 +45,7 @@ public class FileReferenceDownloader {
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
- private ExecutorService readFromQueueExecutor =
- Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
private final ConnectionPool connectionPool;
- private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
@@ -56,67 +54,57 @@ public class FileReferenceDownloader {
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- readFromQueueExecutor.submit(this::readFromQueue);
this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
- private synchronized Optional<File> startDownload(FileReference fileReference,
- Duration timeout,
- FileReferenceDownload fileReferenceDownload)
- throws ExecutionException, InterruptedException, TimeoutException {
- downloads.put(fileReference, fileReferenceDownload);
- setDownloadStatus(fileReference.value(), 0.0);
-
- int numAttempts = 0;
+ private void startDownload(FileReference fileReference, Duration timeout,
+ FileReferenceDownload fileReferenceDownload)
+ {
+ synchronized (downloads) {
+ downloads.put(fileReference, fileReferenceDownload);
+ downloadStatus.put(fileReference, 0.0);
+ }
+ long end = System.currentTimeMillis() + timeout.toMillis();
boolean downloadStarted = false;
- do {
- if (startDownloadRpc(fileReference))
- downloadStarted = true;
- else
- Thread.sleep(100);
- } while (!downloadStarted && ++numAttempts <= 10); // TODO: How long/many times to retry?
-
- if (downloadStarted) {
- return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- } else {
+ while ((System.currentTimeMillis() < end) && !downloadStarted) {
+ try {
+ if (startDownloadRpc(fileReference)) {
+ downloadStarted = true;
+ } else {
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {}
+ }
+
+ if ( !downloadStarted) {
fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- downloads.remove(fileReference);
- return Optional.empty();
+ synchronized (downloads) {
+ downloads.remove(fileReference);
+ }
}
}
- synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
- downloadQueue.add(fileReferenceDownload);
+ void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
+ log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
}
void receiveFile(FileReferenceData fileReferenceData) {
fileReceiver.receiveFile(fileReferenceData);
}
- synchronized Set<FileReference> queuedDownloads() {
- return downloadQueue.stream()
- .map(FileReferenceDownload::fileReference)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- private void readFromQueue() {
- do {
- FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
- if (fileReferenceDownload == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) { /* ignore for now */}
+ void completedDownloading(FileReference fileReference, File file) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ downloadStatus.put(fileReference, 1.0);
+ downloads.remove(fileReference);
+ download.future().set(Optional.of(file));
} else {
- log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ log.warning("Received a file " + fileReference + " I did not ask for. Impossible");
}
- } while (true);
- }
-
- void completedDownloading(FileReference fileReference, File file) {
- if (downloads.containsKey(fileReference))
- downloads.get(fileReference).future().set(Optional.of(file));
- downloadStatus.put(fileReference, 100.0);
+ }
}
private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
@@ -146,14 +134,22 @@ public class FileReferenceDownloader {
}
}
- synchronized boolean isDownloading(FileReference fileReference) {
- return downloads.containsKey(fileReference);
+ boolean isDownloading(FileReference fileReference) {
+ synchronized (downloads) {
+ return downloads.containsKey(fileReference);
+ }
}
- synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
- FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
- fileReferenceDownload.future().addListener(runnable, downloadExecutor);
- return fileReferenceDownload.future();
+ ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ download.future().addListener(runnable, downloadExecutor);
+ return download.future();
+ }
+ }
+ return null;
+
}
private void execute(Request request, Connection connection) {
@@ -173,15 +169,26 @@ public class FileReferenceDownloader {
}
double downloadStatus(String file) {
- return downloadStatus.getOrDefault(new FileReference(file), 0.0);
+ double status = 0.0;
+ synchronized (downloads) {
+ Double download = downloadStatus.get(new FileReference(file));
+ if (download != null) {
+ status = download;
+ }
+ }
+ return status;
}
- void setDownloadStatus(String file, double percentageDownloaded) {
- downloadStatus.put(new FileReference(file), percentageDownloaded);
+ void setDownloadStatus(String file, double completeness) {
+ synchronized (downloads) {
+ downloadStatus.put(new FileReference(file), completeness);
+ }
}
Map<FileReference, Double> downloadStatus() {
- return ImmutableMap.copyOf(downloadStatus);
+ synchronized (downloads) {
+ return ImmutableMap.copyOf(downloadStatus);
+ }
}
public ConnectionPool connectionPool() {