diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /fileacquirer/src/main/java/com/yahoo/filedistribution |
Publish
Diffstat (limited to 'fileacquirer/src/main/java/com/yahoo/filedistribution')
9 files changed, 372 insertions, 0 deletions
diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirer.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirer.java new file mode 100644 index 00000000000..0913b3cd380 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirer.java @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +import com.yahoo.config.FileReference; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +/** + * Retrieves the path to a file or directory on the local file system + * that has been transferred with the vespa file distribution + * mechanism. + * + * @author Tony Vaagenes + */ +public interface FileAcquirer { + + /** + * Returns the path to a file or directory corresponding to the + * given file reference. File references are produced by the + * config system. + * + * @throws TimeoutException if the file or directory could not be retrieved in time. + * @throws FileReferenceDoesNotExistException if the file is no + * longer available(due to reloading of config). + */ + File waitFor(FileReference fileReference, long timeout, TimeUnit timeUnit) throws InterruptedException; + + void shutdown(); + +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerFactory.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerFactory.java new file mode 100644 index 00000000000..e26ff838cb0 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerFactory.java @@ -0,0 +1,13 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +/** + * Hides the real file acquirer type from 3rd party developers. + * Not intended to be used by 3rd parties. + * @author tonytv + */ +public class FileAcquirerFactory { + public static FileAcquirer create(String configId) { + return new FileAcquirerImpl(configId); + } +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerImpl.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerImpl.java new file mode 100644 index 00000000000..8bbb0817be8 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileAcquirerImpl.java @@ -0,0 +1,173 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.cloud.config.filedistribution.*; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.*; +import com.yahoo.log.LogLevel; + +import java.util.logging.Logger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; +import java.io.File; + +/** + * Retrieves the path to a file or directory on the local file system + * that has been transferred with the vespa file distribution + * mechanism. + * + * Intended to be the only real implementation of FileAcquirer. + * + * @author Tony Vaagenes + */ +class FileAcquirerImpl implements FileAcquirer { + static final class FileDistributionErrorCode { + public static final int baseErrorCode = 0x10000; + public static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + public static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + public static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + } + + private static final Logger log = Logger.getLogger(FileAcquirerImpl.class.getName()); + + private final Supervisor supervisor = new Supervisor(new Transport()); + private final ConfigSubscriber configSubscriber; + + private class Connection implements ConfigSubscriber.SingleSubscriber<FiledistributorrpcConfig> { + private final Lock targetLock = new ReentrantLock(); + private Target target; + + private volatile Spec spec; + private long pauseTime = 0; //milliseconds + + private long nextLogTime = 0; + private long logCount = 0; + + private void connect(Timer timer) throws InterruptedException { + while (timer.isTimeLeft()) { + pause(); + target = supervisor.connectSync(spec); + if (target.isValid()) { + log.log(LogLevel.DEBUG, "Successfully connected to '" + spec + "', this = " + System.identityHashCode(this)); + pauseTime = 0; + logCount = 0; + return; + } else { + logWarning(); + } + } + } + + private void pause() throws InterruptedException { + if (pauseTime > 0) { + Thread.sleep(pauseTime); + pauseTime = Math.min((long)(pauseTime*1.5), TimeUnit.MINUTES.toMillis(1)); + } else { + pauseTime = 500; + } + } + + private void logWarning() { + if (logCount == 0 || System.currentTimeMillis() > nextLogTime ) { + log.warning("Could not connect to the file distributor '" + spec.toString() + "'" + " - " + this + "@" + System.identityHashCode(this)); + + nextLogTime = System.currentTimeMillis() + + Math.min(TimeUnit.DAYS.toMillis(1), + TimeUnit.SECONDS.toMillis(30) * (++logCount)); + log.info("Next log time = " + nextLogTime + ", current = " + System.currentTimeMillis()); + } + } + + @Override + public void configure(FiledistributorrpcConfig filedistributorrpcConfig) { + spec = new Spec(filedistributorrpcConfig.connectionspec()); + } + + public Target getTarget(Timer timer) throws InterruptedException { + TimeUnit unit = TimeUnit.MILLISECONDS; + + targetLock.tryLock(timer.timeLeft(unit) , unit ); + try { + if (target == null || !target.isValid()) + connect(timer); + return target; + } finally { + targetLock.unlock(); + } + } + } + + private final Connection connection = new Connection(); + + private boolean temporaryError(int errorCode) { + switch (errorCode) { + case ErrorCode.ABORT: + case ErrorCode.CONNECTION: + case ErrorCode.GENERAL_ERROR: + case ErrorCode.OVERLOAD: + case ErrorCode.TIMEOUT: + return true; + default: + return false; + } + } + + public FileAcquirerImpl(String configId) { + configSubscriber = new ConfigSubscriber(); + configSubscriber.subscribe(connection, FiledistributorrpcConfig.class, configId); + } + + public void shutdown() { + configSubscriber.close(); + supervisor.transport().shutdown().join(); + } + + /** + * Returns the path to a file or directory corresponding to the + * given file reference. File references are produced by the + * config system. + * + * @throws TimeoutException if the file or directory could not be + * retrieved in time. + * @throws FileReferenceDoesNotExistException if the file is no + * longer available(due to reloading of config). + */ + public File waitFor(FileReference fileReference, long timeout, TimeUnit timeUnit) + throws InterruptedException { + Timer timer = new Timer(timeout, timeUnit); + do { + Target target = connection.getTarget(timer); + if (target == null) + break; + + Request request = new Request("waitFor"); + request.parameters().add(new StringValue(fileReference.value())); + + log.log(LogLevel.DEBUG, "InvokeSync waitFor " + fileReference + " with waiting time " + timeout + timeUnit); + target.invokeSync(request, timer.timeLeft(TimeUnit.SECONDS)); + + if (request.checkReturnTypes("s")) { + return new File(request.returnValues().get(0).asString()); + } else if (!request.isError()) { + throw new RuntimeException("Invalid answer from file distributor: " + request.returnValues()); + } else if (temporaryError(request.errorCode())) { + log.log(LogLevel.INFO, "Retrying waitFor: " + request.errorCode() + " -- " + request.errorMessage()); + Thread.sleep(1000); + } else { + if (request.errorCode() == FileDistributionErrorCode.fileReferenceDoesNotExists) + throw new FileReferenceDoesNotExistException(fileReference.value()); + else if (request.errorCode() == FileDistributionErrorCode.fileReferenceRemoved) + throw new FileReferenceRemovedException(fileReference.value()); + else { + throw new RuntimeException("Wait for blocking failed:" + + request.errorMessage() + + " (" + request.errorCode() + ")"); + } + } + } while ( timer.isTimeLeft() ); + throw new TimeoutException("Timed out waiting for " + fileReference + " after " + timeout + " " + timeUnit.name().toLowerCase()); + } +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceDoesNotExistException.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceDoesNotExistException.java new file mode 100644 index 00000000000..d610663bdbc --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceDoesNotExistException.java @@ -0,0 +1,16 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +/** + * @author tonytv + */ +public class FileReferenceDoesNotExistException extends RuntimeException { + + public final String fileReference; + + FileReferenceDoesNotExistException(String fileReference) { + super("Could not retrieve file with file reference '" + fileReference + "'"); + this.fileReference = fileReference; + } + +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceRemovedException.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceRemovedException.java new file mode 100644 index 00000000000..9847402b506 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/FileReferenceRemovedException.java @@ -0,0 +1,14 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +/** + * @author tonytv + */ +public class FileReferenceRemovedException extends RuntimeException { + public final String fileReference; + + FileReferenceRemovedException(String fileReference) { + super("The file with file reference '" + fileReference + "' was removed while waiting."); + this.fileReference = fileReference; + } +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/MockFileAcquirer.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/MockFileAcquirer.java new file mode 100644 index 00000000000..b2ffa2c3f88 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/MockFileAcquirer.java @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +import com.yahoo.config.FileReference; +import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * For use when testing searchers that uses file distribution. + * @author tonytv + */ +public abstract class MockFileAcquirer implements FileAcquirer { + /** Creates a FileAcquirer that always returns the given file. **/ + public static FileAcquirer returnFile(final File file) { + return new MockFileAcquirer() { + @Override + public File waitFor(FileReference fileReference, + long timeout, TimeUnit timeUnit) throws InterruptedException { + return file; + } + }; + } + + /** Creates a FileAcquirer that maps from fileReference.value to a file. **/ + public static FileAcquirer returnFiles(final Map<String, File> files) { + return new MockFileAcquirer() { + @Override + public File waitFor(FileReference fileReference, + long timeout, TimeUnit timeUnit) throws InterruptedException { + return files.get(fileReference.value()); + } + }; + } + + /** Creates a FileAcquirer that throws TimeoutException **/ + public static FileAcquirer throwTimeoutException() { + return new MockFileAcquirer() { + @Override + public File waitFor(FileReference fileReference, + long timeout, TimeUnit timeUnit) throws InterruptedException { + throw new TimeoutException("Timed out"); + } + }; + } + + /** Creates a FileAcquirer that throws FileReferenceDoesNotExistException **/ + public static FileAcquirer throwFileReferenceDoesNotExistException() { + return new MockFileAcquirer() { + @Override + public File waitFor(FileReference fileReference, + long timeout, TimeUnit timeUnit) throws InterruptedException { + throw new FileReferenceDoesNotExistException(null); + } + }; + } + + @Override + public void shutdown() {} +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/TimeoutException.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/TimeoutException.java new file mode 100644 index 00000000000..b1aeaf13a92 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/TimeoutException.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +/** + * @author tonytv + */ +public class TimeoutException extends RuntimeException { + + /** Do not use this constructor */ + public TimeoutException() { + super(); + } + + public TimeoutException(String message) { + super(message); + } + + public TimeoutException(String message,Throwable cause) { + super(message,cause); + } + +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/Timer.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/Timer.java new file mode 100644 index 00000000000..a3dcc141eb5 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/Timer.java @@ -0,0 +1,33 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.filedistribution.fileacquirer; + +import java.util.concurrent.TimeUnit; + +/** + * Handles timeout of a task. + * @author tonytv + */ +class Timer { + private final long endTime; + + private long timeLeft() { + return endTime - System.currentTimeMillis(); + } + + public Timer(long timeout, TimeUnit timeUnit) { + endTime = System.currentTimeMillis() + timeUnit.toMillis(timeout); + } + + public long timeLeft(TimeUnit timeUnit) { + long remaining = timeUnit.convert(timeLeft(), TimeUnit.MILLISECONDS); + + if (remaining > 0) + return remaining; + else + throw new TimeoutException("Timed out"); + } + + public boolean isTimeLeft() { + return timeLeft() > 0; + } +} diff --git a/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/package-info.java b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/package-info.java new file mode 100644 index 00000000000..093a5b15600 --- /dev/null +++ b/fileacquirer/src/main/java/com/yahoo/filedistribution/fileacquirer/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.filedistribution.fileacquirer; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; |