diff options
Diffstat (limited to 'configserver')
27 files changed, 382 insertions, 300 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index ca56a200c2c..2a15f724b29 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -96,6 +96,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -631,15 +632,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye .stream() .filter(fileReference -> ! fileReferencesInUse.contains(fileReference)) .filter(fileReference -> isLastFileAccessBefore(new File(fileReferencesPath, fileReference), instant)) - .sorted((a, b) -> { - if (a.equals(b)) - return 0; - else if (lastAccessed(new File(fileReferencesPath, a)) - .isBefore(lastAccessed(new File(fileReferencesPath, b)))) - return -1; - else - return 1; - }) + .sorted(Comparator.comparing(a -> lastAccessed(new File(fileReferencesPath, a)))) .collect(Collectors.toList()); } @@ -804,7 +797,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye .map(lock -> new ApplicationTransaction(lock, transaction)); try (var sessionLock = tenant.getApplicationRepo().lock(applicationId)) { Optional<Session> activeSession = getActiveSession(applicationId); - CompletionWaiter waiter = session.getSessionZooKeeperClient().createActiveWaiter(); + var sessionZooKeeperClient = tenant.getSessionRepository().createSessionZooKeeperClient(session.getSessionId()); + CompletionWaiter waiter = sessionZooKeeperClient.createActiveWaiter(); transaction.add(deactivateCurrentActivateNew(activeSession, session, force)); if (applicationTransaction.isPresent()) { @@ -911,14 +905,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye sessionsPerTenant.keySet().forEach(tenant -> tenant.getSessionRepository().deleteExpiredSessions(activeSessions)); } - public int deleteExpiredRemoteSessions(Duration expiryTime) { - return deleteExpiredRemoteSessions(clock, expiryTime); - } - - public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { + public int deleteExpiredRemoteSessions(Clock clock) { return tenantRepository.getAllTenants() .stream() - .map(tenant -> tenant.getSessionRepository().deleteExpiredRemoteSessions(clock, expiryTime)) + .map(tenant -> tenant.getSessionRepository().deleteExpiredRemoteSessions(clock)) .mapToInt(i -> i) .sum(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ReloadListener.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigActivationListener.java index b41f31d9dcb..f7e9e270b9c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ReloadListener.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ConfigActivationListener.java @@ -7,13 +7,13 @@ import com.yahoo.vespa.config.server.application.ApplicationSet; import java.util.Collection; /** - * A ReloadListener is used to signal to a component that config has been - * reloaded. It only exists because the RpcServer cannot distinguish between a - * successful reload of a new application and a reload of the same application. + * A ConfigActivationListener is used to signal to a component that config has been + * activated. It only exists because the RpcServer cannot distinguish between a + * successful activation of a new application and an activation of the same application. * * @author Ulf Lilleengen */ -public interface ReloadListener { +public interface ConfigActivationListener { /** * Signals the listener that hosts used by a particular tenant. diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelController.java b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelController.java index f6ed98d904b..fd939b91388 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelController.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelController.java @@ -16,7 +16,7 @@ import java.io.StringReader; /** * Handler for global configs that must be resolved using the global SuperModel instance. Deals with - * reloading of config as well. + * activation of config as well. * * @author Ulf Lilleengen */ diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelRequestHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelRequestHandler.java index aa7dceba95c..93bb44e25d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelRequestHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelRequestHandler.java @@ -43,12 +43,12 @@ public class SuperModelRequestHandler implements RequestHandler { } /** - * Signals that config has been reloaded for an {@link com.yahoo.vespa.config.server.application.Application} + * Signals that config has been activated for an {@link com.yahoo.vespa.config.server.application.Application} * belonging to a tenant. * - * @param applicationSet The reloaded set of {@link com.yahoo.vespa.config.server.application.Application}. + * @param applicationSet The activated set of {@link com.yahoo.vespa.config.server.application.Application}. */ - public synchronized void reloadConfig(ApplicationSet applicationSet) { + public synchronized void activateConfig(ApplicationSet applicationSet) { superModelManager.configActivated(applicationSet); updateHandler(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index 28487106268..d794fa4e27f 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -14,7 +14,7 @@ import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.GetConfigRequest; import com.yahoo.vespa.config.protocol.ConfigResponse; import com.yahoo.vespa.config.server.NotFoundException; -import com.yahoo.vespa.config.server.ReloadListener; +import com.yahoo.vespa.config.server.ConfigActivationListener; import com.yahoo.vespa.config.server.RequestHandler; import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs; import com.yahoo.vespa.config.server.host.HostRegistry; @@ -27,13 +27,11 @@ import com.yahoo.vespa.curator.CompletionTimeoutException; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.curator.transaction.CuratorTransaction; -import com.yahoo.vespa.flags.FetchVector; import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.flags.ListFlag; import com.yahoo.vespa.flags.PermanentFlags; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; - import java.nio.file.Files; import java.nio.file.Paths; import java.time.Clock; @@ -69,7 +67,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica private final Executor zkWatcherExecutor; private final Metrics metrics; private final TenantName tenant; - private final ReloadListener reloadListener; + private final ConfigActivationListener configActivationListener; private final ConfigResponseFactory responseFactory; private final HostRegistry hostRegistry; private final ApplicationMapper applicationMapper = new ApplicationMapper(); @@ -80,7 +78,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica private final ListFlag<String> incompatibleVersions; public TenantApplications(TenantName tenant, Curator curator, StripedExecutor<TenantName> zkWatcherExecutor, - ExecutorService zkCacheExecutor, Metrics metrics, ReloadListener reloadListener, + ExecutorService zkCacheExecutor, Metrics metrics, ConfigActivationListener configActivationListener, ConfigserverConfig configserverConfig, HostRegistry hostRegistry, TenantFileSystemDirs tenantFileSystemDirs, Clock clock, FlagSource flagSource) { this.curator = curator; @@ -91,7 +89,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica this.directoryCache.addListener(this::childEvent); this.directoryCache.start(); this.metrics = metrics; - this.reloadListener = reloadListener; + this.configActivationListener = configActivationListener; this.responseFactory = ConfigResponseFactory.create(configserverConfig); this.tenantMetricUpdater = metrics.getOrCreateMetricUpdater(Metrics.createDimensions(tenant)); this.hostRegistry = hostRegistry; @@ -213,23 +211,22 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica @Override public ConfigResponse resolveConfig(ApplicationId appId, GetConfigRequest req, Optional<Version> vespaVersion) { Application application = getApplication(appId, vespaVersion); - log.log(Level.FINE, () -> TenantRepository.logPre(appId) + "Resolving for tenant '" + tenant + - "' with handler for application '" + application + "'"); + log.log(Level.FINE, () -> TenantRepository.logPre(appId) + "Resolving config"); return application.resolveConfig(req, responseFactory); } - private void notifyReloadListeners(ApplicationSet applicationSet) { + private void notifyConfigActivationListeners(ApplicationSet applicationSet) { if (applicationSet.getAllApplications().isEmpty()) throw new IllegalArgumentException("application set cannot be empty"); - reloadListener.hostsUpdated(applicationSet.getAllApplications().get(0).toApplicationInfo().getApplicationId(), - applicationSet.getAllHosts()); - reloadListener.configActivated(applicationSet); + configActivationListener.hostsUpdated(applicationSet.getAllApplications().get(0).toApplicationInfo().getApplicationId(), + applicationSet.getAllHosts()); + configActivationListener.configActivated(applicationSet); } /** * Activates the config of the given app. Notifies listeners * - * @param applicationSet the {@link ApplicationSet} to be reloaded + * @param applicationSet the {@link ApplicationSet} to be activated */ public void activateApplication(ApplicationSet applicationSet, long activeSessionId) { ApplicationId id = applicationSet.getId(); @@ -239,8 +236,8 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica if (applicationSet.getApplicationGeneration() != activeSessionId) return; // Application activated a new session before we got here. - setLiveApp(applicationSet); - notifyReloadListeners(applicationSet); + setActiveApp(applicationSet); + notifyConfigActivationListeners(applicationSet); } } @@ -257,7 +254,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica if (hasApplication(applicationId)) { applicationMapper.remove(applicationId); hostRegistry.removeHostsForKey(applicationId); - reloadListenersOnRemove(applicationId); + configActivationListenersOnRemove(applicationId); tenantMetricUpdater.setApplications(applicationMapper.numApplications()); metrics.removeMetricUpdater(Metrics.createDimensions(applicationId)); getRemoveApplicationWaiter(applicationId).notifyCompletion(); @@ -279,12 +276,12 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica } } - private void reloadListenersOnRemove(ApplicationId applicationId) { - reloadListener.hostsUpdated(applicationId, hostRegistry.getHostsForKey(applicationId)); - reloadListener.applicationRemoved(applicationId); + private void configActivationListenersOnRemove(ApplicationId applicationId) { + configActivationListener.hostsUpdated(applicationId, hostRegistry.getHostsForKey(applicationId)); + configActivationListener.applicationRemoved(applicationId); } - private void setLiveApp(ApplicationSet applicationSet) { + private void setActiveApp(ApplicationSet applicationSet) { ApplicationId id = applicationSet.getId(); Collection<String> hostsForApp = applicationSet.getAllHosts(); hostRegistry.update(id, hostsForApp); @@ -402,7 +399,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica @Override public void verifyHosts(ApplicationId applicationId, Collection<String> newHosts) { hostRegistry.verifyHosts(applicationId, newHosts); - reloadListener.verifyHostsAreAvailable(applicationId, newHosts); + configActivationListener.verifyHostsAreAvailable(applicationId, newHosts); } public HostValidator<ApplicationId> getHostValidator() { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 47d1193cd4c..068323f7784 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -197,7 +197,6 @@ public class ModelContextImpl implements ModelContext { private final boolean useV8GeoPositions; private final int maxCompactBuffers; private final List<String> ignoredHttpUserAgents; - private final boolean enableServerOcspStapling; private final String mergeThrottlingPolicy; private final double persistenceThrottlingWsDecrementFactor; private final double persistenceThrottlingWsBackoff; @@ -215,6 +214,12 @@ public class ModelContextImpl implements ModelContext { private final boolean mbus_dispatch_on_encode; private final int mbus_threads; private final int mbus_network_threads; + private int mbus_java_num_targets; + private int mbus_java_events_before_wakeup; + private int mbus_cpp_num_targets; + private int mbus_cpp_events_before_wakeup; + private int rpc_num_targets; + private int rpc_events_before_wakeup; public FeatureFlags(FlagSource source, ApplicationId appId, Version version) { this.defaultTermwiseLimit = flagValue(source, appId, version, Flags.DEFAULT_TERM_WISE_LIMIT); @@ -252,7 +257,6 @@ public class ModelContextImpl implements ModelContext { this.useV8GeoPositions = flagValue(source, appId, version, Flags.USE_V8_GEO_POSITIONS); this.maxCompactBuffers = flagValue(source, appId, version, Flags.MAX_COMPACT_BUFFERS); this.ignoredHttpUserAgents = flagValue(source, appId, version, PermanentFlags.IGNORED_HTTP_USER_AGENTS); - this.enableServerOcspStapling = flagValue(source, appId, version, Flags.ENABLE_SERVER_OCSP_STAPLING); this.mergeThrottlingPolicy = flagValue(source, appId, version, Flags.MERGE_THROTTLING_POLICY); this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR); this.persistenceThrottlingWsBackoff = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF); @@ -266,6 +270,12 @@ public class ModelContextImpl implements ModelContext { this.enableProxyProtocolMixedMode = flagValue(source, appId, version, Flags.ENABLE_PROXY_PROTOCOL_MIXED_MODE); this.sharedStringRepoNoReclaim = flagValue(source, appId, version, Flags.SHARED_STRING_REPO_NO_RECLAIM); this.logFileCompressionAlgorithm = flagValue(source, appId, version, Flags.LOG_FILE_COMPRESSION_ALGORITHM); + this.mbus_java_num_targets = flagValue(source, appId, version, Flags.MBUS_JAVA_NUM_TARGETS); + this.mbus_java_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_JAVA_EVENTS_BEFORE_WAKEUP); + this.mbus_cpp_num_targets = flagValue(source, appId, version, Flags.MBUS_CPP_NUM_TARGETS); + this.mbus_cpp_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_CPP_EVENTS_BEFORE_WAKEUP); + this.rpc_num_targets = flagValue(source, appId, version, Flags.RPC_NUM_TARGETS); + this.rpc_events_before_wakeup = flagValue(source, appId, version, Flags.RPC_EVENTS_BEFORE_WAKEUP); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @@ -305,7 +315,6 @@ public class ModelContextImpl implements ModelContext { @Override public boolean useV8GeoPositions() { return useV8GeoPositions; } @Override public int maxCompactBuffers() { return maxCompactBuffers; } @Override public List<String> ignoredHttpUserAgents() { return ignoredHttpUserAgents; } - @Override public boolean enableServerOcspStapling() { return enableServerOcspStapling; } @Override public String mergeThrottlingPolicy() { return mergeThrottlingPolicy; } @Override public double persistenceThrottlingWsDecrementFactor() { return persistenceThrottlingWsDecrementFactor; } @Override public double persistenceThrottlingWsBackoff() { return persistenceThrottlingWsBackoff; } @@ -318,6 +327,12 @@ public class ModelContextImpl implements ModelContext { @Override public Architecture adminClusterArchitecture() { return adminClusterArchitecture; } @Override public boolean enableProxyProtocolMixedMode() { return enableProxyProtocolMixedMode; } @Override public boolean sharedStringRepoNoReclaim() { return sharedStringRepoNoReclaim; } + @Override public int mbusJavaRpcNumTargets() { return mbus_java_num_targets; } + @Override public int mbusJavaEventsBeforeWakeup() { return mbus_java_events_before_wakeup; } + @Override public int mbusCppRpcNumTargets() { return mbus_cpp_num_targets; } + @Override public int mbusCppEventsBeforeWakeup() { return mbus_cpp_events_before_wakeup; } + @Override public int rpcNumTargets() { return rpc_num_targets; } + @Override public int rpcEventsBeforeWakeup() { return rpc_events_before_wakeup; } @Override public String logFileCompressionAlgorithm(String defVal) { var fflag = this.logFileCompressionAlgorithm; if (fflag != null && ! fflag.equals("")) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java index 46ae2fd15d5..7f120a88a05 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java @@ -60,16 +60,13 @@ public class FileDirectory { public File getFile(FileReference reference) { ensureRootExist(); File dir = new File(getPath(reference)); - if (!dir.exists()) { + if (!dir.exists()) throw new IllegalArgumentException("File reference '" + reference.value() + "' with absolute path '" + dir.getAbsolutePath() + "' does not exist."); - } - if (!dir.isDirectory()) { + if (!dir.isDirectory()) throw new IllegalArgumentException("File reference '" + reference.value() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); - } File [] files = dir.listFiles(new Filter()); - if (files == null || files.length == 0) { + if (files == null || files.length == 0) throw new IllegalArgumentException("File reference '" + reference.value() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain any files"); - } return files[0]; } @@ -82,7 +79,7 @@ public class FileDirectory { if (file.isDirectory()) { return Files.walk(file.toPath(), 100).map(path -> { try { - log.log(Level.FINE, () -> "Calculating hash for '" + path + "'"); + log.log(Level.FINEST, () -> "Calculating hash for '" + path + "'"); return hash(path.toFile(), hasher); } catch (IOException e) { log.log(Level.WARNING, "Failed getting hash from '" + path + "'"); @@ -144,7 +141,7 @@ public class FileDirectory { File destination = new File(tempDestinationDir.toFile(), source.getName()); if (!destinationDir.exists()) { destinationDir.mkdir(); - log.log(Level.FINE, () -> "file reference ' " + reference.value() + "', source: " + source.getAbsolutePath() ); + log.log(Level.FINE, () -> "file reference '" + reference.value() + "', source: " + source.getAbsolutePath() ); if (source.isDirectory()) { log.log(Level.FINE, () -> "Copying source " + source.getAbsolutePath() + " to " + destination.getAbsolutePath()); IOUtils.copyDirectory(source, destination, -1); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 88405b3eef9..770352e6bfc 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -13,14 +13,16 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.EmptyFileReferenceData; import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool; import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; import com.yahoo.yolean.Exceptions; import java.io.File; import java.io.IOException; @@ -28,15 +30,21 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; public class FileServer { @@ -48,10 +56,13 @@ public class FileServer { private final FileDirectory root; private final ExecutorService executor; private final FileDownloader downloader; + private final List<CompressionType> compressionTypes; // compression types to use, in preferred order + // TODO: Move to filedistribution module, so that it can be used by both clients and servers private enum FileApiErrorCodes { OK(0, "OK"), - NOT_FOUND(1, "Filereference not found"); + NOT_FOUND(1, "File reference not found"), + TIMEOUT(2, "Timeout"); private final int code; private final String description; FileApiErrorCodes(int code, String description) { @@ -80,21 +91,24 @@ public class FileServer { @SuppressWarnings("WeakerAccess") // Created by dependency injection @Inject - public FileServer(ConfigserverConfig configserverConfig) { + public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) { this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())), - createFileDownloader(getOtherConfigServersInCluster(configserverConfig))); + createFileDownloader(getOtherConfigServersInCluster(configserverConfig), + compressionTypes(Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value())), + compressionTypesAsList(Flags.FILE_DISTRIBUTION_COMPRESSION_TYPES_TO_SERVE.bindTo(flagSource).value())); } // For testing only public FileServer(File rootDir) { - this(rootDir, createFileDownloader(List.of())); + this(rootDir, createFileDownloader(List.of(), Set.of(gzip)), List.of(gzip)); } - public FileServer(File rootDir, FileDownloader fileDownloader) { + FileServer(File rootDir, FileDownloader fileDownloader, List<CompressionType> compressionTypes) { this.downloader = fileDownloader; this.root = new FileDirectory(rootDir); this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), new DaemonThreadFactory("file-server-")); + this.compressionTypes = compressionTypes; } boolean hasFile(String fileReference) { @@ -112,21 +126,14 @@ public class FileServer { FileDirectory getRootDir() { return root; } - void startFileServing(String fileName, Receiver target) { - FileReference reference = new FileReference(fileName); - File file = root.getFile(reference); + void startFileServing(FileReference reference, Receiver target, Set<CompressionType> acceptedCompressionTypes) { + if ( ! root.getFile(reference).exists()) return; - if (file.exists()) { - serveFile(reference, target); - } - } - - private void serveFile(FileReference reference, Receiver target) { File file = root.getFile(reference); log.log(Level.FINE, () -> "Start serving " + reference + " with file '" + file.getAbsolutePath() + "'"); FileReferenceData fileData = EmptyFileReferenceData.empty(reference, file.getName()); try { - fileData = readFileReferenceData(reference); + fileData = readFileReferenceData(reference, acceptedCompressionTypes); target.receive(fileData, new ReplayStatus(0, "OK")); log.log(Level.FINE, () -> "Done serving " + reference.value() + " with file '" + file.getAbsolutePath() + "'"); } catch (IOException e) { @@ -140,56 +147,71 @@ public class FileServer { } } - private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { + private FileReferenceData readFileReferenceData(FileReference reference, Set<CompressionType> acceptedCompressionTypes) throws IOException { File file = root.getFile(reference); if (file.isDirectory()) { Path tempFile = Files.createTempFile("filereferencedata", reference.value()); - File compressedFile = new FileReferenceCompressor(compressed).compress(file.getParentFile(), tempFile.toFile()); - return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile); + CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes); + log.log(Level.FINE, () -> "accepted compression types=" + acceptedCompressionTypes + ", compression type to use=" + compressionType); + File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile()); + return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile, compressionType); } else { - return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); + return new LazyFileReferenceData(reference, file.getName(), Type.file, file, gzip); } } - public void serveFile(String fileReference, boolean downloadFromOtherSourceIfNotFound, Request request, Receiver receiver) { + public void serveFile(FileReference fileReference, + boolean downloadFromOtherSourceIfNotFound, + Set<CompressionType> acceptedCompressionTypes, + Request request, Receiver receiver) { if (executor instanceof ThreadPoolExecutor) log.log(Level.FINE, () -> "Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount()); log.log(Level.FINE, () -> "Received request for file reference '" + fileReference + "' from " + request.target()); Instant deadline = Instant.now().plus(timeout); - executor.execute(() -> serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, request, receiver, deadline)); + String client = request.target().toString(); + executor.execute(() -> { + var result = serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, client, receiver, deadline, acceptedCompressionTypes); + request.returnValues() + .add(new Int32Value(result.getCode())) + .add(new StringValue(result.getDescription())); + request.returnRequest(); + }); } - private void serveFileInternal(String fileReference, - boolean downloadFromOtherSourceIfNotFound, - Request request, - Receiver receiver, - Instant deadline) { + private FileApiErrorCodes serveFileInternal(FileReference fileReference, + boolean downloadFromOtherSourceIfNotFound, + String client, + Receiver receiver, + Instant deadline, + Set<CompressionType> acceptedCompressionTypes) { if (Instant.now().isAfter(deadline)) { - log.log(Level.INFO, () -> "Deadline exceeded for request for file reference '" + fileReference + "' from " + request.target() + - " , giving up"); - return; + log.log(Level.INFO, () -> "Deadline exceeded for request for file reference '" + fileReference + "' from " + client); + return FileApiErrorCodes.TIMEOUT; } boolean fileExists; try { - String client = request.target().toString(); - FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(new FileReference(fileReference), - client, - downloadFromOtherSourceIfNotFound); + var fileReferenceDownload = new FileReferenceDownload(fileReference, client, downloadFromOtherSourceIfNotFound); fileExists = hasFileDownloadIfNeeded(fileReferenceDownload); - if (fileExists) startFileServing(fileReference, receiver); + if (fileExists) startFileServing(fileReference, receiver, acceptedCompressionTypes); } catch (IllegalArgumentException e) { fileExists = false; - log.warning("Failed serving file reference '" + fileReference + "', request was from " + request.target() + ", with error " + e.toString()); + log.warning("Failed serving file reference '" + fileReference + "', request from " + client + " failed with: " + e.getMessage()); } - FileApiErrorCodes result = fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND; - request.returnValues() - .add(new Int32Value(result.getCode())) - .add(new StringValue(result.getDescription())); - request.returnRequest(); + return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND); + } + + /* Choose the first compression type (list is in preferred order) that matches an accepted compression type, or fail */ + private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) { + for (CompressionType compressionType : compressionTypes) { + if (acceptedCompressionTypes.contains(compressionType)) + return compressionType; + } + throw new RuntimeException("Could not find a compression type that can be used. Accepted compression types: " + + acceptedCompressionTypes + ", compression types server can use: " + compressionTypes); } boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) { @@ -199,17 +221,16 @@ public class FileServer { if (fileReferenceDownload.downloadFromOtherSourceIfNotFound()) { log.log(Level.FINE, "File not found, downloading from another source"); // Create new FileReferenceDownload with downloadFromOtherSourceIfNotFound set to false - // to avoid config servers requesting a file reference perpetually, e.g. for a file that - // does not exist anymore + // to avoid requesting a file reference perpetually, e.g. for a file that does not exist anymore FileReferenceDownload newDownload = new FileReferenceDownload(fileReference, fileReferenceDownload.client(), false); boolean fileExists = downloader.getFile(newDownload).isPresent(); if ( ! fileExists) - log.log(Level.WARNING, "Failed downloading '" + fileReferenceDownload + "'"); + log.log(Level.INFO, "Failed downloading '" + fileReferenceDownload + "'"); return fileExists; } else { - log.log(Level.FINE, "File not found, will not download from another source, since request came from another config server"); + log.log(Level.FINE, "File not found, will not download from another source"); return false; } } @@ -221,14 +242,27 @@ public class FileServer { executor.shutdown(); } - private static FileDownloader createFileDownloader(List<String> configServers) { + private static FileDownloader createFileDownloader(List<String> configServers, Set<CompressionType> acceptedCompressionTypes) { Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true); return new FileDownloader(configServers.isEmpty() ? FileDownloader.emptyConnectionPool() : createConnectionPool(configServers, supervisor), supervisor, - timeout); + timeout, + acceptedCompressionTypes); + } + + private static LinkedHashSet<CompressionType> compressionTypes(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private static List<CompressionType> compressionTypesAsList(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toList()); } private static ConnectionPool createConnectionPool(List<String> configServers, Supervisor supervisor) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java index f7042b49c3f..0d4baa7dc56 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java @@ -38,4 +38,9 @@ class ProxyResponse extends HttpResponse { } } + @Override + public long maxPendingBytes() { + return 1 << 25; // 32MB + } + } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java index ae4b205c06e..12972e5c465 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java @@ -18,20 +18,23 @@ import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.flags.FlagSource; - +import com.yahoo.vespa.flags.Flags; import java.io.File; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; /** * Verifies that all active sessions has an application package on local disk. * If not, the package is downloaded with file distribution. This can happen e.g. - * if a configserver is down when the application is deployed. + * if a config server is down when the application is deployed. * * @author gjoranv */ @@ -53,7 +56,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { this.applicationRepository = applicationRepository; this.configserverConfig = applicationRepository.configserverConfig(); this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())); - this.fileDownloader = createFileDownloader(configserverConfig, downloadDirectory, supervisor); + this.fileDownloader = createFileDownloader(configserverConfig, + downloadDirectory, + supervisor, + Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value()); } @Override @@ -94,14 +100,18 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig, File downloadDirectory, - Supervisor supervisor) { + Supervisor supervisor, + List<String> flagValues) { List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig); ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster); ConnectionPool connectionPool = (otherConfigServersInCluster.isEmpty()) ? FileDownloader.emptyConnectionPool() : new FileDistributionConnectionPool(configSourceSet, supervisor); - return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300)); + Set<CompressionType> acceptedCompressionTypes = flagValues.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toSet()); + return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300), acceptedCompressionTypes); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/SessionsMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/SessionsMaintainer.java index d980fb079e7..38ac41d0eb9 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/SessionsMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/SessionsMaintainer.java @@ -16,22 +16,17 @@ import java.util.logging.Level; * @author hmusum */ public class SessionsMaintainer extends ConfigServerMaintainer { - private final boolean hostedVespa; SessionsMaintainer(ApplicationRepository applicationRepository, Curator curator, Duration interval, FlagSource flagSource) { super(applicationRepository, curator, flagSource, applicationRepository.clock().instant(), interval, true); - this.hostedVespa = applicationRepository.configserverConfig().hostedVespa(); } @Override protected double maintain() { applicationRepository.deleteExpiredLocalSessions(); - if (hostedVespa) { - Duration expiryTime = Duration.ofMinutes(90); - int deleted = applicationRepository.deleteExpiredRemoteSessions(expiryTime); - log.log(Level.FINE, () -> "Deleted " + deleted + " expired remote sessions older than " + expiryTime); - } + int deleted = applicationRepository.deleteExpiredRemoteSessions(applicationRepository.clock()); + log.log(Level.FINE, () -> "Deleted " + deleted + " expired remote sessions"); return 1.0; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java index 7e6fccb6d2f..0b54a09d963 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java @@ -2,16 +2,14 @@ package com.yahoo.vespa.config.server.rpc; import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.jrt.Target; import com.yahoo.jrt.TargetWatcher; -import java.util.logging.Level; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.server.GetConfigContext; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import com.yahoo.vespa.config.server.monitoring.Metrics; -import com.yahoo.config.provision.ApplicationId; import com.yahoo.vespa.config.server.tenant.TenantRepository; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -22,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -40,7 +39,7 @@ public class DelayedConfigResponses { private final Map<ApplicationId, MetricUpdater> metrics = new ConcurrentHashMap<>(); - /* Requests that resolve to config that has not changed are put on this queue. When reloading + /* Requests that resolve to config that has not changed are put on this queue. When activating config, all requests on this queue are reprocessed as if they were a new request */ private final Map<ApplicationId, BlockingQueue<DelayedConfigResponse>> delayedResponses = new ConcurrentHashMap<>(); @@ -183,7 +182,7 @@ public class DelayedConfigResponses { response.getRequest().getShortDescription()); } // Config will be resolved in the run() method of DelayedConfigResponse, - // when the timer expires or config is updated/reloaded. + // when the timer expires or config is updated/activated. response.schedule(Math.max(0, request.getTimeout())); metricDelayedResponses(context.applicationId(), delayedResponsesQueue.size()); } catch (InterruptedException e) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java index c31015b533a..b7327ef3aa7 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java @@ -6,13 +6,11 @@ import com.yahoo.collections.Pair; import com.yahoo.component.Version; import com.yahoo.config.provision.TenantName; import com.yahoo.container.di.config.ApplicationBundlesConfig; -import com.yahoo.net.HostName; -import com.yahoo.vespa.config.PayloadChecksum; -import com.yahoo.vespa.config.PayloadChecksum.Type; -import com.yahoo.vespa.config.PayloadChecksums; import com.yahoo.jrt.Request; +import com.yahoo.net.HostName; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.ErrorCode; +import com.yahoo.vespa.config.PayloadChecksums; import com.yahoo.vespa.config.UnknownConfigIdException; import com.yahoo.vespa.config.protocol.ConfigResponse; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; @@ -22,11 +20,13 @@ import com.yahoo.vespa.config.protocol.VespaVersion; import com.yahoo.vespa.config.server.GetConfigContext; import com.yahoo.vespa.config.server.UnknownConfigDefinitionException; import com.yahoo.vespa.config.server.tenant.TenantRepository; - import java.util.Optional; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.vespa.config.ErrorCode.APPLICATION_NOT_LOADED; +import static com.yahoo.vespa.config.ErrorCode.UNKNOWN_VESPA_VERSION; import static com.yahoo.vespa.config.protocol.SlimeConfigResponse.fromConfigPayload; /** @@ -56,7 +56,7 @@ class GetConfigProcessor implements Runnable { private void respond(JRTServerConfigRequest request) { Request req = request.getRequest(); if (req.isError()) { - Level logLevel = (req.errorCode() == ErrorCode.APPLICATION_NOT_LOADED) ? Level.FINE : Level.INFO; + Level logLevel = Set.of(APPLICATION_NOT_LOADED, UNKNOWN_VESPA_VERSION).contains(req.errorCode()) ? Level.FINE : Level.INFO; log.log(logLevel, () -> logPre + req.errorMessage()); } rpcServer.respond(request); @@ -83,9 +83,7 @@ class GetConfigProcessor implements Runnable { return null; } Trace trace = request.getRequestTrace(); - if (logDebug(trace)) { - debugLog(trace, "GetConfigProcessor.run() on " + localHostName); - } + debugLog(trace, "GetConfigProcessor.run() on " + localHostName); Optional<TenantName> tenant = rpcServer.resolveTenant(request, trace); @@ -98,23 +96,21 @@ class GetConfigProcessor implements Runnable { GetConfigContext context = rpcServer.createGetConfigContext(tenant, request, trace); if (context == null || ! context.requestHandler().hasApplication(context.applicationId(), Optional.empty())) { - handleError(request, ErrorCode.APPLICATION_NOT_LOADED, "No application exists"); + handleError(request, APPLICATION_NOT_LOADED, "No application exists"); return null; } + logPre = TenantRepository.logPre(context.applicationId()); Optional<Version> vespaVersion = rpcServer.useRequestVersion() ? request.getVespaVersion().map(VespaVersion::toString).map(Version::fromString) : Optional.empty(); - if (logDebug(trace)) { - debugLog(trace, "Using version " + printableVespaVersion(vespaVersion)); - } + debugLog(trace, "Using version " + printableVespaVersion(vespaVersion)); if ( ! context.requestHandler().hasApplication(context.applicationId(), vespaVersion)) { handleError(request, ErrorCode.UNKNOWN_VESPA_VERSION, "Unknown Vespa version in request: " + printableVespaVersion(vespaVersion)); return null; } - this.logPre = TenantRepository.logPre(context.applicationId()); ConfigResponse config; try { config = rpcServer.resolveConfig(request, context, vespaVersion); @@ -143,14 +139,10 @@ class GetConfigProcessor implements Runnable { // debugLog(trace, "config response before encoding:" + config.toString()); request.addOkResponse(request.payloadFromResponse(config), config.getGeneration(), config.applyOnRestart(), config.getPayloadChecksums()); - if (logDebug(trace)) { - debugLog(trace, "return response: " + request.getShortDescription()); - } + debugLog(trace, "return response: " + request.getShortDescription()); respond(request); } else { - if (logDebug(trace)) { - debugLog(trace, "delaying response " + request.getShortDescription()); - } + debugLog(trace, "delaying response " + request.getShortDescription()); return new Pair<>(context, config != null ? config.getGeneration() : 0); } return null; @@ -164,9 +156,9 @@ class GetConfigProcessor implements Runnable { if (delayed != null) { rpcServer.delayResponse(request, delayed.getFirst()); if (rpcServer.hasNewerGeneration(delayed.getFirst().applicationId(), delayed.getSecond())) { - // This will ensure that if the reload train left the station while I was boarding, another train will - // immediately be scheduled. - rpcServer.configReloaded(delayed.getFirst().applicationId()); + // This will ensure that if the config activation train left the station while I was boarding, + // another train will immediately be scheduled. + rpcServer.configActivated(delayed.getFirst().applicationId()); } } } @@ -177,7 +169,7 @@ class GetConfigProcessor implements Runnable { } private static String printableVespaVersion(Optional<Version> vespaVersion) { - return (vespaVersion.isPresent() ? vespaVersion.get().toFullString() : "LATEST"); + return vespaVersion.map(Version::toFullString).orElse("LATEST"); } private void returnEmpty(JRTServerConfigRequest request) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 6a8141032df..6518957a4ab 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.rpc; -import com.yahoo.component.annotation.Inject; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; +import com.yahoo.component.annotation.Inject; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.config.FileReference; import com.yahoo.config.provision.ApplicationId; @@ -28,7 +28,7 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; import com.yahoo.vespa.config.protocol.Trace; import com.yahoo.vespa.config.server.GetConfigContext; -import com.yahoo.vespa.config.server.ReloadListener; +import com.yahoo.vespa.config.server.ConfigActivationListener; import com.yahoo.vespa.config.server.RequestHandler; import com.yahoo.vespa.config.server.SuperModelRequestHandler; import com.yahoo.vespa.config.server.application.ApplicationSet; @@ -44,13 +44,14 @@ import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; - import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -62,8 +63,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; + /** * An RPC server class that handles the config protocol RPC method "getConfigV3". * Mandatory hooks need to be implemented by subclasses. @@ -71,7 +75,7 @@ import java.util.stream.Stream; * @author hmusum */ // TODO: Split business logic out of this -public class RpcServer implements Runnable, ReloadListener, TenantListener { +public class RpcServer implements Runnable, ConfigActivationListener, TenantListener { static final String getConfigMethodName = "getConfigV3"; @@ -221,7 +225,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { getSupervisor().addMethod(new Method("printStatistics", "", "s", this::printStatistics) .methodDesc("printStatistics") .returnDesc(0, "statistics", "Statistics for server")); - getSupervisor().addMethod(new Method("filedistribution.serveFile", "si", "is", this::serveFile)); + getSupervisor().addMethod(new Method("filedistribution.serveFile", "si*", "is", this::serveFile)); getSupervisor().addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload) .methodDesc("set which file references to download") .paramDesc(0, "file references", "file reference to download") @@ -252,7 +256,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { /** * Checks all delayed responses for config changes and waits until all has been answered. - * This method should be called when config is reloaded in the server. + * This method should be called when config is activated in the server. */ @Override public void configActivated(ApplicationSet applicationSet) { @@ -260,19 +264,19 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { ApplicationState state = getState(applicationId); state.setActiveGeneration(applicationSet.getApplicationGeneration()); reloadSuperModel(applicationSet); - configReloaded(applicationId); + configActivated(applicationId); } private void reloadSuperModel(ApplicationSet applicationSet) { - superModelRequestHandler.reloadConfig(applicationSet); - configReloaded(ApplicationId.global()); + superModelRequestHandler.activateConfig(applicationSet); + configActivated(ApplicationId.global()); } - void configReloaded(ApplicationId applicationId) { + void configActivated(ApplicationId applicationId) { List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId); String logPre = TenantRepository.logPre(applicationId); if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue"); + log.log(Level.FINE, logPre + "Start of configActivated: " + responses.size() + " requests on delayed requests queue"); } int responsesSent = 0; CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executorService); @@ -303,7 +307,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } if (log.isLoggable(Level.FINE)) - log.log(Level.FINE, logPre + "Finished reloading " + responsesSent + " requests"); + log.log(Level.FINE, logPre + "Finished activating " + responsesSent + " requests"); } private void logRequestDebug(Level level, String message, JRTServerConfigRequest request) { @@ -326,8 +330,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void applicationRemoved(ApplicationId applicationId) { superModelRequestHandler.removeApplication(applicationId); - configReloaded(applicationId); - configReloaded(ApplicationId.global()); + configActivated(applicationId); + configActivated(ApplicationId.global()); } public void respond(JRTServerConfigRequest request) { @@ -487,6 +491,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { sendParts(session, fileData); sendEof(session, fileData, status); } + private void sendParts(int session, FileReferenceData fileData) { ByteBuffer bb = ByteBuffer.allocate(0x100000); for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) { @@ -500,12 +505,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { bb.clear(); } } + private int sendMeta(FileReferenceData fileData) { - Request request = new Request(FileReceiver.RECEIVE_META_METHOD); - request.parameters().add(new StringValue(fileData.fileReference().value())); - request.parameters().add(new StringValue(fileData.filename())); - request.parameters().add(new StringValue(fileData.type().name())); - request.parameters().add(new Int64Value(fileData.size())); + Request request = createMetaRequest(fileData); invokeRpcIfValidConnection(request); if (request.isError()) { log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + @@ -518,6 +520,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return request.returnValues().get(1).asInt32(); } } + + // non-private for testing + static Request createMetaRequest(FileReferenceData fileData) { + Request request = new Request(FileReceiver.RECEIVE_META_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new StringValue(fileData.filename())); + request.parameters().add(new StringValue(fileData.type().name())); + request.parameters().add(new Int64Value(fileData.size())); + // Only add paramter if not gzip, this is default and old clients will not handle the extra parameter + if (fileData.compressionType() != CompressionType.gzip) + request.parameters().add(new StringValue(fileData.compressionType().name())); + return request; + } + private void sendPart(int session, FileReference ref, int partId, byte [] buf) { Request request = new Request(FileReceiver.RECEIVE_PART_METHOD); request.parameters().add(new StringValue(ref.value())); @@ -534,6 +550,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } } + private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) { Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); request.parameters().add(new StringValue(fileData.fileReference().value())); @@ -566,7 +583,18 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { rpcAuthorizer.authorizeFileRequest(request) .thenRun(() -> { // okay to do in authorizer thread as serveFile is async FileServer.Receiver receiver = new ChunkedFileReceiver(request.target()); - fileServer.serveFile(request.parameters().get(0).asString(), request.parameters().get(1).asInt32() == 0, request, receiver); + + FileReference reference = new FileReference(request.parameters().get(0).asString()); + boolean downloadFromOtherSourceIfNotFound = request.parameters().get(1).asInt32() == 0; + Set<FileReferenceData.CompressionType> acceptedCompressionTypes = Set.of(CompressionType.gzip); + // Newer clients specify accepted compression types in request + if (request.parameters().size() > 2) + acceptedCompressionTypes = Arrays.stream(request.parameters().get(2).asStringArray()) + .map(CompressionType::valueOf) + .collect(Collectors.toSet()); + log.log(Level.FINE, "acceptedCompressionTypes=" + acceptedCompressionTypes); + + fileServer.serveFile(reference, downloadFromOtherSourceIfNotFound, acceptedCompressionTypes, request, receiver); }); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/Session.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/Session.java index edc166d0989..82faeae01e8 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/Session.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/Session.java @@ -61,10 +61,6 @@ public abstract class Session implements Comparable<Session> { return sessionZooKeeperClient.readStatus(); } - public SessionZooKeeperClient getSessionZooKeeperClient() { - return sessionZooKeeperClient; - } - @Override public String toString() { return "Session,id=" + sessionId + ",status=" + getStatus(); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 4aba071e2f1..2ee2cfe837a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -44,13 +44,11 @@ import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.flags.Flags; -import com.yahoo.vespa.flags.StringFlag; import com.yahoo.yolean.Exceptions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.zookeeper.KeeperException; - import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -83,6 +81,7 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import static com.yahoo.vespa.curator.Curator.CompletionWaiter; +import static com.yahoo.vespa.flags.FetchVector.Dimension.APPLICATION_ID; import static java.nio.file.Files.readAttributes; /** @@ -128,7 +127,6 @@ public class SessionRepository { private final ModelFactoryRegistry modelFactoryRegistry; private final ConfigDefinitionRepo configDefinitionRepo; private final int maxNodeSize; - private final StringFlag failDeploymentForFilesWithUnknownExtension; public SessionRepository(TenantName tenantName, TenantApplications applicationRepo, @@ -172,7 +170,6 @@ public class SessionRepository { this.modelFactoryRegistry = modelFactoryRegistry; this.configDefinitionRepo = configDefinitionRepo; this.maxNodeSize = maxNodeSize; - this.failDeploymentForFilesWithUnknownExtension = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource); loadSessions(); // Needs to be done before creating cache below this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, zkCacheExecutor); @@ -347,39 +344,44 @@ public class SessionRepository { public RemoteSession createRemoteSession(long sessionId) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); - RemoteSession newSession = loadSessionIfActive(session).orElse(session); - remoteSessionCache.put(sessionId, newSession); - updateSessionStateWatcher(sessionId, newSession); - return newSession; + loadSessionIfActive(session); + remoteSessionCache.put(sessionId, session); + updateSessionStateWatcher(sessionId); + return session; } - public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { + public int deleteExpiredRemoteSessions(Clock clock) { + Duration expiryTime = configserverConfig.hostedVespa() + ? sessionLifetime.multipliedBy(2) + : sessionLifetime.multipliedBy(24); // TODO: Remove when tested more (Sep. 2022 at the latest) + List<Long> remoteSessionsFromZooKeeper = getRemoteSessionsFromZooKeeper(); log.log(Level.FINE, () -> "Remote sessions for tenant " + tenantName + ": " + remoteSessionsFromZooKeeper); int deleted = 0; + // Avoid deleting too many in one run + int deleteMax = (int) Math.min(1000, Math.max(10, remoteSessionsFromZooKeeper.size() * 0.01)); for (long sessionId : remoteSessionsFromZooKeeper) { Session session = remoteSessionCache.get(sessionId); - if (session == null) { - log.log(Level.FINE, () -> "Remote session " + sessionId + " is null, creating a new one"); + if (session == null) session = new RemoteSession(tenantName, sessionId, createSessionZooKeeperClient(sessionId)); - } if (session.getStatus() == Session.Status.ACTIVATE) continue; if (sessionHasExpired(session.getCreateTime(), expiryTime, clock)) { log.log(Level.FINE, () -> "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it"); deleteRemoteSessionFromZooKeeper(session); deleted++; } - // Avoid deleting too many in one run - if (deleted >= 2) + if (deleted >= deleteMax) break; } return deleted; } - public void deactivateAndUpdateCache(RemoteSession remoteSession) { - RemoteSession session = remoteSession.deactivated(); - remoteSessionCache.put(session.getSessionId(), session); + public void deactivateSession(long sessionId) { + var s = remoteSessionCache.get(sessionId); + if (s == null) return; + + remoteSessionCache.put(sessionId, s.deactivated()); } public void deleteRemoteSessionFromZooKeeper(Session session) { @@ -390,7 +392,7 @@ public class SessionRepository { } private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) { - return (created.plus(expiryTime).isBefore(clock.instant())); + return created.plus(expiryTime).isBefore(clock.instant()); } private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) { @@ -441,8 +443,11 @@ public class SessionRepository { return session.getStatus() == Session.Status.DELETE; } - void activate(RemoteSession session) { - long sessionId = session.getSessionId(); + void activate(long sessionId) { + createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; + CompletionWaiter waiter = createSessionZooKeeperClient(sessionId).getActiveWaiter(); log.log(Level.FINE, () -> session.logPre() + "Activating " + sessionId); applicationRepo.activateApplication(ensureApplicationLoaded(session), sessionId); @@ -451,21 +456,25 @@ public class SessionRepository { log.log(Level.INFO, session.logPre() + "Session activated: " + sessionId); } - private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) { + private void loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId); if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) { log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId()); log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); - return Optional.ofNullable(remoteSessionCache.get(session.getSessionId())); + return; } } - return Optional.empty(); } - void prepareRemoteSession(RemoteSession session) { - SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); + void prepareRemoteSession(long sessionId) { + // Might need to create local session first + createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; + + SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(sessionId); CompletionWaiter waiter = sessionZooKeeperClient.getPrepareWaiter(); ensureApplicationLoaded(session); notifyCompletion(waiter); @@ -482,13 +491,13 @@ public class SessionRepository { RemoteSession activated = session.activated(applicationSet); long sessionId = activated.getSessionId(); remoteSessionCache.put(sessionId, activated); - updateSessionStateWatcher(sessionId, activated); + updateSessionStateWatcher(sessionId); return applicationSet; } void confirmUpload(Session session) { - CompletionWaiter waiter = session.getSessionZooKeeperClient().getUploadWaiter(); + CompletionWaiter waiter = createSessionZooKeeperClient(session.getSessionId()).getUploadWaiter(); long sessionId = session.getSessionId(); log.log(Level.FINE, () -> "Notifying upload waiter for session " + sessionId); notifyCompletion(waiter); @@ -679,7 +688,10 @@ public class SessionRepository { try { app.validateFileExtensions(); } catch (IllegalArgumentException e) { - switch (failDeploymentForFilesWithUnknownExtension.value()) { + String flag = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource) + .with(APPLICATION_ID, applicationId.serializedForm()) + .value(); + switch (flag) { case "FAIL": throw e; case "LOG": @@ -812,8 +824,9 @@ public class SessionRepository { } /** - * Returns a new local session for the given session id if it does not already exist. - * Will also add the session to the local session cache if necessary + * Create a new local session for the given session id if it does not already exist. + * Will also add the session to the local session cache if necessary. If there is no + * remote session matching the session it will also be created. */ public void createLocalSessionFromDistributedApplicationPackage(long sessionId) { if (applicationRepo.sessionExistsInFileSystem(sessionId)) { @@ -860,7 +873,7 @@ public class SessionRepository { return getSessionPath(sessionId).append(ZKApplication.SESSIONSTATE_ZK_SUBPATH); } - private SessionZooKeeperClient createSessionZooKeeperClient(long sessionId) { + public SessionZooKeeperClient createSessionZooKeeperClient(long sessionId) { return new SessionZooKeeperClient(curator, tenantName, sessionId, @@ -881,15 +894,12 @@ public class SessionRepository { return new TenantFileSystemDirs(configServerDB, tenantName).getUserApplicationDir(sessionId); } - private void updateSessionStateWatcher(long sessionId, RemoteSession remoteSession) { - SessionStateWatcher sessionStateWatcher = sessionStateWatchers.get(sessionId); - if (sessionStateWatcher == null) { - Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(sessionId).getAbsolute(), false); + private void updateSessionStateWatcher(long sessionId) { + sessionStateWatchers.computeIfAbsent(sessionId, (id) -> { + Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(id).getAbsolute(), false); fileCache.addListener(this::nodeChanged); - sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, remoteSession, metricUpdater, zkWatcherExecutor, this)); - } else { - sessionStateWatcher.updateRemoteSession(remoteSession); - } + return new SessionStateWatcher(fileCache, id, metricUpdater, zkWatcherExecutor, this); + }); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java index 6c6f60426fe..082743e48f9 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java @@ -13,7 +13,7 @@ import java.util.logging.Logger; import static com.yahoo.vespa.config.server.session.Session.Status; /** - * Watches one particular session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) + * Watches session state for a session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) * The session must be in the session repo. * * @author Vegard Havdal @@ -24,18 +24,18 @@ public class SessionStateWatcher { private static final Logger log = Logger.getLogger(SessionStateWatcher.class.getName()); private final Curator.FileCache fileCache; - private volatile RemoteSession session; + private final long sessionId; private final MetricUpdater metrics; private final Executor zkWatcherExecutor; private final SessionRepository sessionRepository; SessionStateWatcher(Curator.FileCache fileCache, - RemoteSession session, + long sessionId, MetricUpdater metrics, Executor zkWatcherExecutor, SessionRepository sessionRepository) { this.fileCache = fileCache; - this.session = session; + this.sessionId = sessionId; this.metrics = metrics; this.fileCache.addListener(this::nodeChanged); this.fileCache.start(); @@ -44,39 +44,25 @@ public class SessionStateWatcher { } private synchronized void sessionStatusChanged(Status newStatus) { - long sessionId = session.getSessionId(); - switch (newStatus) { case NEW: case UNKNOWN: break; case DELETE: - sessionRepository.deactivateAndUpdateCache(session); + case DEACTIVATE: + sessionRepository.deactivateSession(sessionId); break; case PREPARE: - createLocalSession(sessionId); - sessionRepository.prepareRemoteSession(session); + sessionRepository.prepareRemoteSession(sessionId); break; case ACTIVATE: - createLocalSession(sessionId); - sessionRepository.activate(session); - break; - case DEACTIVATE: - sessionRepository.deactivateAndUpdateCache(session); + sessionRepository.activate(sessionId); break; default: throw new IllegalStateException("Unknown status " + newStatus); } } - private void createLocalSession(long sessionId) { - sessionRepository.createLocalSessionFromDistributedApplicationPackage(sessionId); - } - - public long getSessionId() { - return session.getSessionId(); - } - public void close() { try { fileCache.close(); @@ -92,27 +78,13 @@ public class SessionStateWatcher { ChildData node = fileCache.getCurrentData(); if (node != null) { newStatus = Status.parse(Utf8.toString(node.getData())); - - String debugMessage = log.isLoggable(Level.FINE) ? - session.logPre() + "Session " + session.getSessionId() - + " changed status to " + newStatus.name() : - null; - if (debugMessage != null) log.fine(debugMessage); - sessionStatusChanged(newStatus); - - if (debugMessage != null) log.fine(debugMessage + ": Done"); } } catch (Exception e) { - log.log(Level.WARNING, session.logPre() + "Error handling session change to " + - newStatus.name() + " for session " + getSessionId(), e); + log.log(Level.WARNING, "Error handling session change to " + newStatus.name() + " for session " + sessionId, e); metrics.incSessionChangeErrors(); } }); } - public synchronized void updateRemoteSession(RemoteSession session) { - this.session = session; - } - } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 1325f065ebd..9cb90d7f50c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -19,7 +19,7 @@ import com.yahoo.path.Path; import com.yahoo.text.Utf8; import com.yahoo.transaction.Transaction; import com.yahoo.vespa.config.server.ConfigServerDB; -import com.yahoo.vespa.config.server.ReloadListener; +import com.yahoo.vespa.config.server.ConfigActivationListener; import com.yahoo.vespa.config.server.application.PermanentApplicationPackage; import com.yahoo.vespa.config.server.application.TenantApplications; import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs; @@ -115,7 +115,7 @@ public class TenantRepository { private final Clock clock; private final ModelFactoryRegistry modelFactoryRegistry; private final ConfigDefinitionRepo configDefinitionRepo; - private final ReloadListener reloadListener; + private final ConfigActivationListener configActivationListener; private final ScheduledExecutorService checkForRemovedApplicationsService = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("check for removed applications")); private final Curator.DirectoryCache directoryCache; @@ -136,7 +136,7 @@ public class TenantRepository { Zone zone, ModelFactoryRegistry modelFactoryRegistry, ConfigDefinitionRepo configDefinitionRepo, - ReloadListener reloadListener, + ConfigActivationListener configActivationListener, TenantListener tenantListener, ZookeeperServerConfig zookeeperServerConfig) { this(hostRegistry, @@ -155,7 +155,7 @@ public class TenantRepository { Clock.systemUTC(), modelFactoryRegistry, configDefinitionRepo, - reloadListener, + configActivationListener, tenantListener, zookeeperServerConfig); } @@ -176,7 +176,7 @@ public class TenantRepository { Clock clock, ModelFactoryRegistry modelFactoryRegistry, ConfigDefinitionRepo configDefinitionRepo, - ReloadListener reloadListener, + ConfigActivationListener configActivationListener, TenantListener tenantListener, ZookeeperServerConfig zookeeperServerConfig) { this.hostRegistry = hostRegistry; @@ -196,7 +196,7 @@ public class TenantRepository { this.clock = clock; this.modelFactoryRegistry = modelFactoryRegistry; this.configDefinitionRepo = configDefinitionRepo; - this.reloadListener = reloadListener; + this.configActivationListener = configActivationListener; this.tenantListener = tenantListener; this.zookeeperServerConfig = zookeeperServerConfig; // This we should control with a feature flag. @@ -338,7 +338,7 @@ public class TenantRepository { zkApplicationWatcherExecutor, zkCacheExecutor, metrics, - reloadListener, + configActivationListener, configserverConfig, hostRegistry, new TenantFileSystemDirs(configServerDB, tenantName), diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplication.java b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplication.java index 107bd8be367..4c262379c35 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplication.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplication.java @@ -84,7 +84,7 @@ public class ZKApplication { } /** - * Retrieves a node relative to the node of the live application + * Retrieves a node relative to the node of the active application * * @param path a path relative to the currently active application * @return a Reader that can be used to get the data @@ -129,7 +129,7 @@ public class ZKApplication { } /** - * Checks if the given node exists under path under this live app + * Checks if the given node exists under path under this active app * * @param path a zookeeper path * @return true if the node exists in the path, false otherwise diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java index 0fa6e9a0704..9b23037ed19 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java @@ -5,7 +5,6 @@ import com.google.common.base.Joiner; import com.yahoo.component.Version; import com.yahoo.config.application.api.ApplicationFile; import com.yahoo.config.application.api.ApplicationMetaData; -import com.yahoo.config.application.api.ApplicationPackage; import com.yahoo.config.application.api.ComponentInfo; import com.yahoo.config.application.api.FileRegistry; import com.yahoo.config.application.api.UnparsedConfigDefinition; @@ -59,7 +58,7 @@ public class ZKApplicationPackage extends AbstractApplicationPackage { public ZKApplicationPackage(AddFileInterface fileManager, Curator curator, Path sessionPath, int maxNodeSize) { verifyAppPath(curator, sessionPath); zkApplication = new ZKApplication(curator, sessionPath, maxNodeSize); - metaData = readMetaDataFromLiveApp(zkApplication); + metaData = readMetaDataFromActiveApp(zkApplication); importFileRegistries(fileManager); allocatedHosts = importAllocatedHosts(); } @@ -102,13 +101,13 @@ public class ZKApplicationPackage extends AbstractApplicationPackage { } } - private ApplicationMetaData readMetaDataFromLiveApp(ZKApplication liveApp) { + private ApplicationMetaData readMetaDataFromActiveApp(ZKApplication activeApp) { Path metaPath = Path.fromString(ZKApplication.META_ZK_PATH); - String metaDataString = liveApp.getData(metaPath); + String metaDataString = activeApp.getData(metaPath); if (metaDataString == null || metaDataString.isEmpty()) { return null; } - return ApplicationMetaData.fromJsonString(liveApp.getData(metaPath)); + return ApplicationMetaData.fromJsonString(activeApp.getData(metaPath)); } @Override diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java index 0e9c14d10df..d1d8c165124 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java @@ -445,7 +445,7 @@ public class ApplicationRepositoryTest { assertEquals(3, localSession.getSessionId()); // All sessions except 3 should be removed after the call to deleteExpiredRemoteSessions - assertEquals(2, tester.applicationRepository().deleteExpiredRemoteSessions(clock, Duration.ofSeconds(0))); + assertEquals(2, tester.applicationRepository().deleteExpiredRemoteSessions(clock)); ArrayList<Long> remoteSessions = new ArrayList<>(sessionRepository.getRemoteSessionsFromZooKeeper()); Session remoteSession = sessionRepository.getRemoteSession(remoteSessions.get(0)); assertEquals(3, remoteSession.getSessionId()); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java index 3f329894cef..c3766ad9b83 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java @@ -58,14 +58,14 @@ public class SuperModelRequestHandlerTest { assertNotNull(controller.getHandler()); long gen = counter.get(); - controller.reloadConfig(createApp(foo, 3L)); + controller.activateConfig(createApp(foo, 3L)); assertNotNull(controller.getHandler()); assertEquals(gen + 1, controller.getHandler().getGeneration()); - controller.reloadConfig(createApp(foo, 4L)); + controller.activateConfig(createApp(foo, 4L)); assertEquals(gen + 2, controller.getHandler().getGeneration()); // Test that a new app is used when there already exist an application with the same id assertEquals(4, controller.getHandler().getSuperModel().applicationModels().get(foo).getGeneration()); - controller.reloadConfig(createApp(bar, 2L)); + controller.activateConfig(createApp(bar, 2L)); assertEquals(gen + 3, controller.getHandler().getGeneration()); } @@ -76,9 +76,9 @@ public class SuperModelRequestHandlerTest { ApplicationId baz = applicationId("b", "baz"); long gen = counter.get(); - controller.reloadConfig(createApp(foo, 3L)); - controller.reloadConfig(createApp(bar, 30L)); - controller.reloadConfig(createApp(baz, 9L)); + controller.activateConfig(createApp(foo, 3L)); + controller.activateConfig(createApp(bar, 30L)); + controller.activateConfig(createApp(baz, 9L)); assertEquals(gen + 3, controller.getHandler().getGeneration()); assertEquals(3, controller.getHandler().getSuperModel().applicationModels().size()); assertTrue(controller.getHandler().getSuperModel().applicationModels().keySet().containsAll(List.of(foo, bar, baz))); @@ -101,7 +101,7 @@ public class SuperModelRequestHandlerTest { controller = new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, manager); long gen = counter.get(); - controller.reloadConfig(createApp(foo, 3L)); + controller.activateConfig(createApp(foo, 3L)); assertEquals(masterGen + gen + 1, controller.getHandler().getGeneration()); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/TenantApplicationsTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/TenantApplicationsTest.java index abb8f9a9df3..472c47c22eb 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/TenantApplicationsTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/TenantApplicationsTest.java @@ -3,7 +3,6 @@ package com.yahoo.vespa.config.server.application; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; -import com.yahoo.component.Vtag; import com.yahoo.concurrent.InThreadExecutorService; import com.yahoo.concurrent.StripedExecutor; import com.yahoo.config.model.NullConfigModelRegistry; @@ -14,7 +13,7 @@ import com.yahoo.config.provision.TenantName; import com.yahoo.text.Utf8; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.server.ConfigServerDB; -import com.yahoo.vespa.config.server.ReloadListener; +import com.yahoo.vespa.config.server.ConfigActivationListener; import com.yahoo.vespa.config.server.ServerCache; import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs; import com.yahoo.vespa.config.server.host.HostRegistry; @@ -51,8 +50,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.IntStream; import static com.yahoo.vespa.config.server.application.TenantApplications.RemoveApplicationWaiter; @@ -195,14 +192,14 @@ public class TenantApplicationsTest { assertTrue("Node is compatible after upgrading", applications.compatibleWith(Optional.of(nodeVersion1), app1)); } - public static class MockReloadListener implements ReloadListener { - public final AtomicInteger reloaded = new AtomicInteger(0); + public static class MockConfigActivationListener implements ConfigActivationListener { + public final AtomicInteger activated = new AtomicInteger(0); final AtomicInteger removed = new AtomicInteger(0); final Map<String, Collection<String>> tenantHosts = new LinkedHashMap<>(); @Override public void configActivated(ApplicationSet application) { - reloaded.incrementAndGet(); + activated.incrementAndGet(); } @Override @@ -222,7 +219,7 @@ public class TenantApplicationsTest { @Test public void testListConfigs() throws IOException, SAXException { - TenantApplications applications = createTenantApplications(TenantName.defaultName(), new MockCurator(), configserverConfig, new MockReloadListener(), new InMemoryFlagSource()); + TenantApplications applications = createTenantApplications(TenantName.defaultName(), new MockCurator(), configserverConfig, new MockConfigActivationListener(), new InMemoryFlagSource()); assertFalse(applications.hasApplication(ApplicationId.defaultId(), Optional.of(vespaVersion))); VespaModel model = new VespaModel(FilesApplicationPackage.fromFile(new File("src/test/apps/app"))); @@ -250,7 +247,7 @@ public class TenantApplicationsTest { @Test public void testAppendIdsInNonRecursiveListing() { - TenantApplications applications = createTenantApplications(tenantName, curator, configserverConfig, new MockReloadListener(), new InMemoryFlagSource()); + TenantApplications applications = createTenantApplications(tenantName, curator, configserverConfig, new MockConfigActivationListener(), new InMemoryFlagSource()); assertEquals(applications.appendOneLevelOfId("search/music", "search/music/qrservers/default/qr.0"), "search/music/qrservers"); assertEquals(applications.appendOneLevelOfId("search", "search/music/qrservers/default/qr.0"), "search/music"); assertEquals(applications.appendOneLevelOfId("search/music/qrservers/default/qr.0", "search/music/qrservers/default/qr.0"), "search/music/qrservers/default/qr.0"); @@ -297,7 +294,7 @@ public class TenantApplicationsTest { } private TenantApplications createZKAppRepo(InMemoryFlagSource flagSource) { - return createTenantApplications(tenantName, curator, configserverConfig, new MockReloadListener(), flagSource); + return createTenantApplications(tenantName, curator, configserverConfig, new MockConfigActivationListener(), flagSource); } private static ApplicationId createApplicationId() { @@ -328,15 +325,15 @@ public class TenantApplicationsTest { // For testing only private TenantApplications createTenantApplications(TenantName tenantName, - Curator curator, - ConfigserverConfig configserverConfig, - ReloadListener reloadListener, InMemoryFlagSource flagSource) { + Curator curator, + ConfigserverConfig configserverConfig, + ConfigActivationListener configActivationListener, InMemoryFlagSource flagSource) { return new TenantApplications(tenantName, curator, new StripedExecutor<>(new InThreadExecutorService()), new InThreadExecutorService(), Metrics.createTestMetrics(), - reloadListener, + configActivationListener, configserverConfig, new HostRegistry(), new TenantFileSystemDirs(new ConfigServerDB(configserverConfig), tenantName), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 67c40f94b6a..39219471bb1 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -8,21 +8,25 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; import com.yahoo.net.HostName; import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; +import com.yahoo.vespa.flags.InMemoryFlagSource; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -37,7 +41,7 @@ public class FileServerTest { @Before public void setup() throws IOException { File rootDir = new File(temporaryFolder.newFolder("fileserver-root").getAbsolutePath()); - fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir)); + fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(gzip, lz4)); } @Test @@ -75,11 +79,30 @@ public class FileServerTest { File dir = getFileServerRootDir(); IOUtils.writeFile(dir + "/12y/f1", "dummy-data", true); CompletableFuture<byte []> content = new CompletableFuture<>(); - fileServer.startFileServing("12y", new FileReceiver(content)); + fileServer.startFileServing(new FileReference("12y"), new FileReceiver(content), Set.of(gzip)); assertEquals(new String(content.get()), "dummy-data"); } @Test + public void requireThatWeCanReplayDirWithLz4() throws IOException, InterruptedException, ExecutionException { + File rootDir = new File(temporaryFolder.newFolder("fileserver-root-3").getAbsolutePath()); + fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(lz4, gzip)); // prefer lz4 + File dir = getFileServerRootDir(); + IOUtils.writeFile(dir + "/subdir/12z/f1", "dummy-data-2", true); + CompletableFuture<byte []> content = new CompletableFuture<>(); + fileServer.startFileServing(new FileReference("subdir"), new FileReceiver(content), Set.of(gzip, lz4)); + + // Decompress with lz4 and check contents + var compressor = new FileReferenceCompressor(FileReferenceData.Type.compressed, lz4); + File downloadedFileCompressed = new File(dir + "/downloaded-file-compressed"); + IOUtils.writeFile(downloadedFileCompressed, content.get()); + File downloadedFileUncompressed = new File(dir + "/downloaded-file-uncompressed"); + compressor.decompress(downloadedFileCompressed, downloadedFileUncompressed); + assertTrue(downloadedFileUncompressed.isDirectory()); + assertEquals("dummy-data-2", IOUtils.readFile(new File(downloadedFileUncompressed, "12z/f1"))); + } + + @Test public void requireThatDifferentNumberOfConfigServersWork() throws IOException { // Empty connection pool in tests etc. ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder() @@ -117,7 +140,7 @@ public class FileServerTest { private FileServer createFileServer(ConfigserverConfig.Builder configBuilder) throws IOException { File fileReferencesDir = temporaryFolder.newFolder(); configBuilder.fileReferencesDir(fileReferencesDir.getAbsolutePath()); - return new FileServer(new ConfigserverConfig(configBuilder)); + return new FileServer(new ConfigserverConfig(configBuilder), new InMemoryFlagSource()); } private static class FileReceiver implements FileServer.Receiver { @@ -142,7 +165,8 @@ public class FileServerTest { new Supervisor(new Transport("mock")).setDropEmptyBuffers(true), downloadDirectory, Duration.ofMillis(100), - Duration.ofMillis(100)); + Duration.ofMillis(100), + Set.of(gzip)); } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcServerTest.java index 7ad237e45ed..8607fc0e2dc 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcServerTest.java @@ -5,6 +5,7 @@ import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.cloud.config.LbServicesConfig; import com.yahoo.cloud.config.SentinelConfig; import com.yahoo.component.Version; +import com.yahoo.config.FileReference; import com.yahoo.config.SimpletypesConfig; import com.yahoo.config.model.test.MockApplicationPackage; import com.yahoo.config.provision.ApplicationId; @@ -27,16 +28,20 @@ import com.yahoo.vespa.config.server.application.Application; import com.yahoo.vespa.config.server.application.ApplicationSet; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import com.yahoo.vespa.config.server.session.PrepareParams; +import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.vespa.model.VespaModel; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.xml.sax.SAXException; - import java.io.File; import java.io.IOException; import java.util.Optional; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; +import static com.yahoo.vespa.config.server.rpc.RpcServer.ChunkedFileReceiver.createMetaRequest; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -97,6 +102,25 @@ public class RpcServerTest { } } + @Test + public void testFileReceiverMetaRequest() throws IOException { + File file = temporaryFolder.newFile(); + Request request = createMetaRequest(new LazyFileReferenceData(new FileReference("foo"), "fileA", compressed, file, gzip)); + assertEquals(4, request.parameters().size()); + assertEquals("foo", request.parameters().get(0).asString()); + assertEquals("fileA", request.parameters().get(1).asString()); + assertEquals("compressed", request.parameters().get(2).asString()); + assertEquals(0, request.parameters().get(3).asInt64()); + + request = createMetaRequest(new LazyFileReferenceData(new FileReference("foo"), "fileA", compressed, file, lz4)); + assertEquals(5, request.parameters().size()); + assertEquals("foo", request.parameters().get(0).asString()); + assertEquals("fileA", request.parameters().get(1).asString()); + assertEquals("compressed", request.parameters().get(2).asString()); + assertEquals(0, request.parameters().get(3).asInt64()); + assertEquals("lz4", request.parameters().get(4).asString()); + } + private JRTClientConfigRequest createSimpleRequest() { ConfigKey<?> key = new ConfigKey<>(SimpletypesConfig.class, ""); JRTClientConfigRequest clientReq = createRequest(new RawConfig(key, SimpletypesConfig.getDefMd5())); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java index 86f8dc03af3..1960ba43747 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java @@ -6,7 +6,6 @@ import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.component.Version; import com.yahoo.concurrent.InThreadExecutorService; import com.yahoo.concurrent.StripedExecutor; -import com.yahoo.config.model.NullConfigModelRegistry; import com.yahoo.config.model.test.MockApplicationPackage; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; @@ -59,7 +58,7 @@ public class TenantRepositoryTest { private static final TenantName tenant3 = TenantName.from("tenant3"); private TenantRepository tenantRepository; - private TenantApplicationsTest.MockReloadListener listener; + private TenantApplicationsTest.MockConfigActivationListener listener; private MockTenantListener tenantListener; private Curator curator; private ConfigserverConfig configserverConfig; @@ -74,7 +73,7 @@ public class TenantRepositoryTest { @Before public void setupSessions() throws IOException { curator = new MockCurator(); - listener = new TenantApplicationsTest.MockReloadListener(); + listener = new TenantApplicationsTest.MockConfigActivationListener(); tenantListener = new MockTenantListener(); assertFalse(tenantListener.tenantsLoaded); configserverConfig = new ConfigserverConfig.Builder() @@ -83,7 +82,7 @@ public class TenantRepositoryTest { .build(); tenantRepository = new TestTenantRepository.Builder().withConfigserverConfig(configserverConfig) .withCurator(curator) - .withReloadListener(listener) + .withConfigActivationListener(listener) .withTenantListener(tenantListener) .build(); assertTrue(tenantListener.tenantsLoaded); @@ -115,7 +114,7 @@ public class TenantRepositoryTest { MetricUpdater.createTestUpdater(), id)), 4); - assertEquals(1, listener.reloaded.get()); + assertEquals(1, listener.activated.get()); } @Test @@ -224,7 +223,7 @@ public class TenantRepositoryTest { Clock.systemUTC(), new ModelFactoryRegistry(List.of(VespaModelFactory.createTestFactory())), new TestConfigDefinitionRepo(), - new TenantApplicationsTest.MockReloadListener(), + new TenantApplicationsTest.MockConfigActivationListener(), new MockTenantListener(), new ZookeeperServerConfig.Builder().myid(0).build()); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java index 42bb62b06c2..b8c46c2bdc4 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java @@ -5,12 +5,11 @@ import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.concurrent.InThreadExecutorService; import com.yahoo.concurrent.StripedExecutor; -import com.yahoo.config.model.NullConfigModelRegistry; import com.yahoo.config.model.api.ConfigDefinitionRepo; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.config.server.ConfigServerDB; import com.yahoo.vespa.config.server.MockSecretStore; -import com.yahoo.vespa.config.server.ReloadListener; +import com.yahoo.vespa.config.server.ConfigActivationListener; import com.yahoo.vespa.config.server.TestConfigDefinitionRepo; import com.yahoo.vespa.config.server.application.TenantApplicationsTest; import com.yahoo.vespa.config.server.filedistribution.FileDistributionFactory; @@ -44,7 +43,7 @@ public class TestTenantRepository extends TenantRepository { Clock clock, ModelFactoryRegistry modelFactoryRegistry, ConfigDefinitionRepo configDefinitionRepo, - ReloadListener reloadListener, + ConfigActivationListener configActivationListener, TenantListener tenantListener) { super(hostRegistry, curator, @@ -62,7 +61,7 @@ public class TestTenantRepository extends TenantRepository { clock, modelFactoryRegistry, configDefinitionRepo, - reloadListener, + configActivationListener, tenantListener, new ZookeeperServerConfig.Builder().myid(0).build()); } @@ -78,7 +77,7 @@ public class TestTenantRepository extends TenantRepository { HostProvisionerProvider hostProvisionerProvider = HostProvisionerProvider.empty(); ModelFactoryRegistry modelFactoryRegistry = new ModelFactoryRegistry(List.of(VespaModelFactory.createTestFactory())); ConfigserverConfig configserverConfig = new ConfigserverConfig.Builder().build(); - ReloadListener reloadListener = new TenantApplicationsTest.MockReloadListener(); + ConfigActivationListener configActivationListener = new TenantApplicationsTest.MockConfigActivationListener(); TenantListener tenantListener = new MockTenantListener(); Zone zone = Zone.defaultZone(); @@ -127,8 +126,8 @@ public class TestTenantRepository extends TenantRepository { return this; } - public Builder withReloadListener(ReloadListener reloadListener) { - this.reloadListener = reloadListener; + public Builder withConfigActivationListener(ConfigActivationListener configActivationListener) { + this.configActivationListener = configActivationListener; return this; } @@ -156,7 +155,7 @@ public class TestTenantRepository extends TenantRepository { clock, modelFactoryRegistry, configDefinitionRepo, - reloadListener, + configActivationListener, tenantListener); } |