diff options
7 files changed, 65 insertions, 53 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index abcd3e13bfa..001ef751e69 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -21,6 +21,7 @@ import com.yahoo.vespa.filedistribution.CompressedFileReference; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDataBlob; +import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.yolean.Exceptions; @@ -148,18 +149,27 @@ public class FileServer { private void serveFile(String fileReference, Request request, Receiver receiver) { FileApiErrorCodes result; try { - log.log(LogLevel.DEBUG, () -> "Received request for reference '" + fileReference + "'"); + log.log(LogLevel.DEBUG, () -> "Received request for reference '" + fileReference + "' from " + request.target()); result = hasFile(fileReference) ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND; if (result == FileApiErrorCodes.OK) { startFileServing(fileReference, receiver); } else { - download(new FileReference(fileReference)); + // Non-zero second parameter means that the request should never lead + // to a new download typically because the request comes from another config server. + // This is to avoid config servers asking each other for a file that does not exist + if (request.parameters().size() == 1 || request.parameters().get(1).asInt32() == 0) { + log.log(LogLevel.DEBUG, "File not found, downloading from another source"); + downloader.getFile(new FileReferenceDownload(new FileReference(fileReference), false /* downloadFromOtherSourceIfNotFound */)); + } else { + log.log(LogLevel.DEBUG, "File not found, will not download from another source since request came from another config server"); + result = FileApiErrorCodes.NOT_FOUND; + } } } catch (IllegalArgumentException e) { result = FileApiErrorCodes.NOT_FOUND; - log.warning("Failed serving file reference '" + fileReference + "' with error " + e.toString()); + log.warning("Failed serving file reference '" + fileReference + "', request was from " + request.target() + ", with error " + e.toString()); } request.returnValues() .add(new Int32Value(result.getCode())) @@ -167,10 +177,6 @@ public class FileServer { request.returnRequest(); } - public void download(FileReference fileReference) { - downloader.getFile(fileReference); - } - public FileDownloader downloader() { return downloader; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 33b7eb928e9..a1066524f9c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -44,6 +44,7 @@ import com.yahoo.vespa.config.server.tenant.Tenants; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; +import com.yahoo.vespa.filedistribution.FileReferenceDownload; import java.nio.ByteBuffer; import java.util.Collection; @@ -61,7 +62,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -206,7 +206,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { getSupervisor().addMethod(new Method("printStatistics", "", "s", this, "printStatistics") .methodDesc("printStatistics") .returnDesc(0, "statistics", "Statistics for server")); - getSupervisor().addMethod(new Method("filedistribution.serveFile", "s", "is", this, "serveFile")); + // TODO: Change parameters to "si" instead of "s*" when all clients have been updated + getSupervisor().addMethod(new Method("filedistribution.serveFile", "s*", "is", this, "serveFile")); getSupervisor().addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this, "setFileReferencesToDownload") .methodDesc("set which file references to download") @@ -534,11 +535,10 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @SuppressWarnings({"UnusedDeclaration"}) public final void setFileReferencesToDownload(Request req) { String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); - List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + Stream.of(fileReferenceStrings) .map(FileReference::new) - .collect(Collectors.toList()); - downloader.queueForAsyncDownload(fileReferences); - + .forEach(fileReference -> downloader.queueForAsyncDownload( + new FileReferenceDownload(fileReference, false /* downloadFromOtherSourceIfNotFound */))); req.returnValues().add(new Int32Value(0)); } } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java index f24d76fe8be..3b15d5c83b5 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java @@ -13,14 +13,13 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.log.LogLevel; import java.io.File; -import java.util.List; +import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * An RPC server that handles file distribution requests. @@ -104,12 +103,10 @@ public class FileDistributionRpcServer { @SuppressWarnings({"UnusedDeclaration"}) public final void setFileReferencesToDownload(Request req) { - String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); - List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + Arrays.stream(req.parameters().get(0).asStringArray()) .map(FileReference::new) - .collect(Collectors.toList()); - downloader.queueForAsyncDownload(fileReferences); - + .forEach(fileReference -> downloader.queueForAsyncDownload( + new FileReferenceDownload(fileReference))); req.returnValues().add(new Int32Value(0)); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 02d6e1ad022..f2a82ac6ead 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -10,7 +10,6 @@ import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -48,16 +47,21 @@ public class FileDownloader { } public Optional<File> getFile(FileReference fileReference) { + return getFile(new FileReferenceDownload(fileReference)); + } + + public Optional<File> getFile(FileReferenceDownload fileReferenceDownload) { try { - return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS); + return getFutureFile(fileReferenceDownload).get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.log(LogLevel.WARNING, "Failed downloading file, removing from download queue: " + e.getMessage()); - fileReferenceDownloader.failedDownloading(fileReference); + log.log(LogLevel.WARNING, "Failed downloading '" + fileReferenceDownload.fileReference().value() + "', removing from download queue: " + e.getMessage()); + fileReferenceDownloader.failedDownloading(fileReferenceDownload.fileReference()); return Optional.empty(); } } - private Future<Optional<File>> getFutureFile(FileReference fileReference) { + private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { + FileReference fileReference = fileReferenceDownload.fileReference(); Objects.requireNonNull(fileReference, "file reference cannot be null"); File directory = new File(downloadDirectory, fileReference.value()); log.log(LogLevel.DEBUG, () -> "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); @@ -70,21 +74,10 @@ public class FileDownloader { } else { log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' not found in " + directory.getAbsolutePath() + ", starting download"); - return queueForAsyncDownload(fileReference, timeout); + return queueForAsyncDownload(fileReferenceDownload, timeout); } } - // Start downloading, but there is no Future used get file being downloaded - public void queueForAsyncDownload(List<FileReference> fileReferences) { - fileReferences.forEach(fileReference -> { - if (fileReferenceDownloader.isDownloading(fileReference)) { - log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'"); - } else { - queueForAsyncDownload(fileReference); - } - }); - } - void receiveFile(FileReferenceData fileReferenceData) { fileReferenceDownloader.receiveFile(fileReferenceData); } @@ -117,20 +110,20 @@ public class FileDownloader { return Optional.empty(); } - private synchronized Future<Optional<File>> queueForAsyncDownload(FileReference fileReference, Duration timeout) { - Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + public synchronized Future<Optional<File>> queueForAsyncDownload(FileReferenceDownload fileReferenceDownload, Duration timeout) { + FileReference fileReference = fileReferenceDownload.fileReference(); + Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload)); if (inProgress != null) { log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'"); return inProgress; } - Future<Optional<File>> future = queueForAsyncDownload(fileReference); + Future<Optional<File>> future = queueForAsyncDownload(fileReferenceDownload); log.log(LogLevel.DEBUG, () -> "Queued '" + fileReference.value() + "' for download with timeout " + timeout); return future; } - private Future<Optional<File>> queueForAsyncDownload(FileReference fileReference) { - FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference, SettableFuture.create()); + public Future<Optional<File>> queueForAsyncDownload(FileReferenceDownload fileReferenceDownload) { fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); return fileReferenceDownload.future(); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 904eaae8c4a..4a9fadf1a61 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -8,19 +8,33 @@ import com.yahoo.config.FileReference; import java.io.File; import java.util.Optional; -class FileReferenceDownload { +public class FileReferenceDownload { + private final FileReference fileReference; private final SettableFuture<Optional<File>> future; + // If a config server wants to download from another config server (because it does not have the + // file itself) we set this flag to true to avoid an eternal loop + private final boolean downloadFromOtherSourceIfNotFound; + + public FileReferenceDownload(FileReference fileReference) { + this(fileReference, true); + } - FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) { + public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) { this.fileReference = fileReference; - this.future = future; + this.future = SettableFuture.create(); + this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound; } FileReference fileReference() { return fileReference; } + SettableFuture<Optional<File>> future() { return future; } + + boolean downloadFromOtherSourceIfNotFound() { + return downloadFromOtherSourceIfNotFound; + } } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 2084c00d31b..1008df229f1 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.ErrorCode; +import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Request; import com.yahoo.jrt.StringValue; import com.yahoo.log.LogLevel; @@ -58,7 +59,7 @@ public class FileReferenceDownloader { boolean downloadStarted = false; while ((System.currentTimeMillis() < end) && !downloadStarted) { try { - if (startDownloadRpc(fileReference)) { + if (startDownloadRpc(fileReferenceDownload)) { downloadStarted = true; } else { Thread.sleep(sleepBetweenRetries.toMillis()); @@ -109,19 +110,21 @@ public class FileReferenceDownloader { } } - private boolean startDownloadRpc(FileReference fileReference) { + private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload) { Connection connection = connectionPool.getCurrent(); Request request = new Request("filedistribution.serveFile"); - request.parameters().add(new StringValue(fileReference.value())); + String fileReference = fileReferenceDownload.fileReference().value(); + request.parameters().add(new StringValue(fileReference)); + request.parameters().add(new Int32Value(fileReferenceDownload.downloadFromOtherSourceIfNotFound() ? 0 : 1)); execute(request, connection); if (validateResponse(request)) { log.log(LogLevel.DEBUG, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection); if (request.returnValues().get(0).asInt32() == 0) { - log.log(LogLevel.DEBUG, () -> "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); + log.log(LogLevel.DEBUG, () -> "Found file reference '" + fileReference + "' available at " + connection.getAddress()); return true; } else { - log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + log.log(LogLevel.DEBUG, "File reference '" + fileReference + "' not found for " + connection.getAddress()); connectionPool.setNewCurrentConnection(); return false; } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index d433dc25105..e0ef2ecf7e4 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -22,7 +22,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; -import java.util.List; import java.util.Optional; import static com.yahoo.jrt.ErrorCode.CONNECTION; @@ -190,8 +189,8 @@ public class FileDownloaderTest { FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries); FileReference foo = new FileReference("foo"); FileReference bar = new FileReference("bar"); - List<FileReference> fileReferences = Arrays.asList(foo, bar); - fileDownloader.queueForAsyncDownload(fileReferences); + fileDownloader.queueForAsyncDownload(new FileReferenceDownload(foo)); + fileDownloader.queueForAsyncDownload(new FileReferenceDownload(bar)); // Verify download status assertDownloadStatus(fileDownloader, foo, 0.0); |