summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 17:46:51 +0100
committerGitHub <noreply@github.com>2017-12-07 17:46:51 +0100
commitbe985ed5e179c9cfc92d3b5961930a8c23b2b533 (patch)
treef8a72ec69cc35effdceca553e5d81b7fefec34b8
parent31f7223c0b2fe8f1d23e9576ff498f06c125ee4d (diff)
parent24ec352f4288afd36a6da2e8577221e0fdccda84 (diff)
Merge pull request #4384 from vespa-engine/balder/rewire-downloader
Balder/rewire downloader
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java20
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java48
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java37
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java62
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java5
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java123
6 files changed, 148 insertions, 147 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java
index dd1f0d36abd..d352637a835 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java
@@ -60,27 +60,21 @@ class ClientUpdater {
log.log(LogLevel.DEBUG, "Delayed response queue has " + responseDelayQueue.size() + " elements");
}
}
- DelayedResponse[] responses = new DelayedResponse[1];
- responses = responseDelayQueue.toArray(responses);
+ DelayedResponse[] responses = responseDelayQueue.toArray(new DelayedResponse[0]);
boolean found = false;
- if (responses.length > 0) {
- for (DelayedResponse response : responses) {
- JRTServerConfigRequest request = response.getRequest();
- if (request.getConfigKey().equals(config.getKey())) {
- if (!delayedResponses.remove(response)) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Could not remove " + config.getKey() + " from delayed delayedResponses queue, already removed");
- }
- continue;
- }
+ for (DelayedResponse response : responses) {
+ JRTServerConfigRequest request = response.getRequest();
+ if (request.getConfigKey().equals(config.getKey())) {
+ if (delayedResponses.remove(response)) {
found = true;
if (log.isLoggable(LogLevel.DEBUG)) {
log.log(LogLevel.DEBUG, "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
}
rpcServer.returnOkResponse(request, config);
+ } else {
+ log.log(LogLevel.ERROR, "Could not remove " + config.getKey() + " from delayed delayedResponses queue, already removed");
}
}
-
}
if (!found) {
log.log(LogLevel.DEBUG, "Found no recipient for " + config.getKey() + " in delayed response queue");
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index 958f26632ef..a8a6b4d6776 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -7,6 +7,9 @@ import com.yahoo.config.FileReference;
import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.net.HostName;
@@ -30,9 +33,23 @@ public class FileServer {
private static final Logger log = Logger.getLogger(FileServer.class.getName());
private final FileDirectory root;
- private final ExecutorService executor;
+ private final ExecutorService pushExecutor;
+ private final ExecutorService pullExecutor;
private final FileDownloader downloader;
+ private enum FileApiErrorCodes {
+ OK(0, "OK"),
+ NOT_FOUND(1, "Filereference not found");
+ private final int code;
+ private final String description;
+ FileApiErrorCodes(int code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+ int getCode() { return code; }
+ String getDescription() { return description; }
+ }
+
public static class ReplayStatus {
private final int code;
private final String description;
@@ -62,7 +79,8 @@ public class FileServer {
private FileServer(ConnectionPool connectionPool, File rootDir) {
this.downloader = new FileDownloader(connectionPool);
this.root = new FileDirectory(rootDir);
- this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ this.pushExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ this.pullExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
public boolean hasFile(String fileName) {
@@ -81,7 +99,7 @@ public class FileServer {
File file = root.getFile(reference);
if (file.exists()) {
- executor.execute(() -> serveFile(reference, target));
+ pushExecutor.execute(() -> serveFile(reference, target));
}
return false;
}
@@ -122,6 +140,30 @@ public class FileServer {
return new FileReferenceData(reference, file.getName(), type, blob);
}
+ public void serveFile(Request request, Receiver receiver) {
+ pullExecutor.execute(() -> serveFile(request.parameters().get(0).asString(), request, receiver));
+ }
+ private void serveFile(String fileReference, Request request, Receiver receiver) {
+ FileApiErrorCodes result;
+ try {
+ // TODO remove once verified in system tests.
+ log.info("Received request for reference '" + fileReference + "'");
+ result = hasFile(fileReference)
+ ? FileApiErrorCodes.OK
+ : FileApiErrorCodes.NOT_FOUND;
+ if (result == FileApiErrorCodes.OK) {
+ startFileServing(fileReference, receiver);
+ } else {
+ download(new FileReference(fileReference));
+ }
+ } catch (IllegalArgumentException e) {
+ result = FileApiErrorCodes.NOT_FOUND;
+ log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString());
+ }
+ request.returnValues()
+ .add(new Int32Value(result.getCode()))
+ .add(new StringValue(result.getDescription()));
+ }
public void download(FileReference fileReference) {
downloader.getFile(fileReference);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 5c50fbfc31b..17368b48e59 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -74,20 +74,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
static final int TRACELEVEL_DEBUG = 9;
private static final String THREADPOOL_NAME = "rpcserver worker pool";
private static final long SHUTDOWN_TIMEOUT = 60;
- private enum FileApiErrorCodes {
- OK(0, "OK"),
- NOT_FOUND(1, "Filereference not found");
- private final int code;
- private final String description;
- FileApiErrorCodes(int code, String description) {
- this.code = code;
- this.description = description;
- }
- int getCode() { return code; }
- String getDescription() { return description; }
- }
+
private final Supervisor supervisor = new Supervisor(new Transport());
- private Spec spec = null;
+ private Spec spec;
private final boolean useRequestVersion;
private final boolean hostedVespa;
@@ -455,25 +444,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@SuppressWarnings("UnusedDeclaration")
public final void serveFile(Request request) {
- String fileReference = request.parameters().get(0).asString();
- FileApiErrorCodes result;
- try {
- // TODO remove once verified in system tests.
- log.info("Received request for reference '" + fileReference + "'");
- result = fileServer.hasFile(fileReference)
- ? FileApiErrorCodes.OK
- : FileApiErrorCodes.NOT_FOUND;
- if (result == FileApiErrorCodes.OK) {
- fileServer.startFileServing(fileReference, new FileReceiver(request.target()));
- } else {
- fileServer.download(new FileReference(fileReference));
- }
- } catch (IllegalArgumentException e) {
- result = FileApiErrorCodes.NOT_FOUND;
- log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString());
- }
- request.returnValues()
- .add(new Int32Value(result.getCode()))
- .add(new StringValue(result.getDescription()));
+ request.detach();
+ fileServer.serveFile(request, new FileReceiver(request.target()));
}
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 9fe5eec54ff..ca8b8c27ad3 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -1,13 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.defaults.Defaults;
-import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
@@ -17,6 +15,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -47,17 +46,28 @@ public class FileDownloader {
}
public Optional<File> getFile(FileReference fileReference) {
+ try {
+ return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ return Optional.empty();
+ }
+ }
+
+ public Future<Optional<File>> getFutureFile(FileReference fileReference) {
Objects.requireNonNull(fileReference, "file reference cannot be null");
File directory = new File(downloadDirectory, fileReference.value());
log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
Optional<File> file = getFileFromFileSystem(fileReference, directory);
if (file.isPresent()) {
- return file;
+ SettableFuture<Optional<File>> future = SettableFuture.create();
+ future.set(file);
+ return future;
} else {
log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
directory.getAbsolutePath() + ", starting download");
return queueForDownload(fileReference, timeout);
+
}
}
@@ -86,11 +96,9 @@ public class FileDownloader {
if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
File file = files[0];
if (!file.exists()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "' does not exist");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "' does not exist");
} else if (!file.canRead()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "'exists, but unable to read it");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "'exists, but unable to read it");
} else {
fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
return Optional.of(file);
@@ -99,46 +107,26 @@ public class FileDownloader {
return Optional.empty();
}
- private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
- if (fileReferenceDownloader.isDownloading(fileReference)) {
+ private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) {
+ Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ if (inProgress != null) {
log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
- ListenableFuture<Optional<File>> future =
- fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
- Exceptions.toMessageString(e));
- }
+ return inProgress;
}
- SettableFuture<Optional<File>> future = SettableFuture.create();
- queueForDownload(new FileReferenceDownload(fileReference, future));
+ Future<Optional<File>> future = queueForDownload(fileReference);
log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
-
- try {
- Optional<File> fileDownloaded;
- try {
- log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
- fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
- log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
- } catch (TimeoutException e) {
- log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
- return Optional.empty();
- }
- return fileDownloaded;
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Could not download '" + fileReference.value() + "'");
- }
+ return future;
}
// We don't care about the future in this call
- private synchronized void queueForDownload(FileReference fileReference) {
- queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
+ private Future<Optional<File>> queueForDownload(FileReference fileReference) {
+ return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
}
- private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
+ private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) {
fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
+ return fileReferenceDownload.future();
}
public FileReferenceDownloader fileReferenceDownloader() {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index fb511411128..9de4c1fcd5b 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,13 +2,14 @@
package com.yahoo.vespa.filedistribution;
+import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
-public class FileReferenceDownload {
+class FileReferenceDownload {
private final FileReference fileReference;
private final SettableFuture<Optional<File>> future;
@@ -20,9 +21,7 @@ public class FileReferenceDownload {
FileReference fileReference() {
return fileReference;
}
-
SettableFuture<Optional<File>> future() {
return future;
}
-
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index b51a4b68212..3a9af23e6d8 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -20,6 +20,7 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -44,10 +45,7 @@ public class FileReferenceDownloader {
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
- private ExecutorService readFromQueueExecutor =
- Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
private final ConnectionPool connectionPool;
- private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
@@ -56,67 +54,57 @@ public class FileReferenceDownloader {
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- readFromQueueExecutor.submit(this::readFromQueue);
this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
- private synchronized Optional<File> startDownload(FileReference fileReference,
- Duration timeout,
- FileReferenceDownload fileReferenceDownload)
- throws ExecutionException, InterruptedException, TimeoutException {
- downloads.put(fileReference, fileReferenceDownload);
- setDownloadStatus(fileReference.value(), 0.0);
-
- int numAttempts = 0;
+ private void startDownload(FileReference fileReference, Duration timeout,
+ FileReferenceDownload fileReferenceDownload)
+ {
+ synchronized (downloads) {
+ downloads.put(fileReference, fileReferenceDownload);
+ downloadStatus.put(fileReference, 0.0);
+ }
+ long end = System.currentTimeMillis() + timeout.toMillis();
boolean downloadStarted = false;
- do {
- if (startDownloadRpc(fileReference))
- downloadStarted = true;
- else
- Thread.sleep(100);
- } while (!downloadStarted && ++numAttempts <= 10); // TODO: How long/many times to retry?
-
- if (downloadStarted) {
- return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- } else {
+ while ((System.currentTimeMillis() < end) && !downloadStarted) {
+ try {
+ if (startDownloadRpc(fileReference)) {
+ downloadStarted = true;
+ } else {
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {}
+ }
+
+ if ( !downloadStarted) {
fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- downloads.remove(fileReference);
- return Optional.empty();
+ synchronized (downloads) {
+ downloads.remove(fileReference);
+ }
}
}
- synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
- downloadQueue.add(fileReferenceDownload);
+ void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
+ log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
}
void receiveFile(FileReferenceData fileReferenceData) {
fileReceiver.receiveFile(fileReferenceData);
}
- synchronized Set<FileReference> queuedDownloads() {
- return downloadQueue.stream()
- .map(FileReferenceDownload::fileReference)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- private void readFromQueue() {
- do {
- FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
- if (fileReferenceDownload == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) { /* ignore for now */}
+ void completedDownloading(FileReference fileReference, File file) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ downloadStatus.put(fileReference, 1.0);
+ downloads.remove(fileReference);
+ download.future().set(Optional.of(file));
} else {
- log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ log.warning("Received a file " + fileReference + " I did not ask for. Impossible");
}
- } while (true);
- }
-
- void completedDownloading(FileReference fileReference, File file) {
- if (downloads.containsKey(fileReference))
- downloads.get(fileReference).future().set(Optional.of(file));
- downloadStatus.put(fileReference, 100.0);
+ }
}
private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
@@ -146,14 +134,22 @@ public class FileReferenceDownloader {
}
}
- synchronized boolean isDownloading(FileReference fileReference) {
- return downloads.containsKey(fileReference);
+ boolean isDownloading(FileReference fileReference) {
+ synchronized (downloads) {
+ return downloads.containsKey(fileReference);
+ }
}
- synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
- FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
- fileReferenceDownload.future().addListener(runnable, downloadExecutor);
- return fileReferenceDownload.future();
+ ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ download.future().addListener(runnable, downloadExecutor);
+ return download.future();
+ }
+ }
+ return null;
+
}
private void execute(Request request, Connection connection) {
@@ -173,15 +169,26 @@ public class FileReferenceDownloader {
}
double downloadStatus(String file) {
- return downloadStatus.getOrDefault(new FileReference(file), 0.0);
+ double status = 0.0;
+ synchronized (downloads) {
+ Double download = downloadStatus.get(new FileReference(file));
+ if (download != null) {
+ status = download;
+ }
+ }
+ return status;
}
- void setDownloadStatus(String file, double percentageDownloaded) {
- downloadStatus.put(new FileReference(file), percentageDownloaded);
+ void setDownloadStatus(String file, double completeness) {
+ synchronized (downloads) {
+ downloadStatus.put(new FileReference(file), completeness);
+ }
}
Map<FileReference, Double> downloadStatus() {
- return ImmutableMap.copyOf(downloadStatus);
+ synchronized (downloads) {
+ return ImmutableMap.copyOf(downloadStatus);
+ }
}
public ConnectionPool connectionPool() {