diff options
Diffstat (limited to 'configserver/src/main/java/com')
7 files changed, 100 insertions, 30 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 47d1193cd4c..068323f7784 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -197,7 +197,6 @@ public class ModelContextImpl implements ModelContext { private final boolean useV8GeoPositions; private final int maxCompactBuffers; private final List<String> ignoredHttpUserAgents; - private final boolean enableServerOcspStapling; private final String mergeThrottlingPolicy; private final double persistenceThrottlingWsDecrementFactor; private final double persistenceThrottlingWsBackoff; @@ -215,6 +214,12 @@ public class ModelContextImpl implements ModelContext { private final boolean mbus_dispatch_on_encode; private final int mbus_threads; private final int mbus_network_threads; + private int mbus_java_num_targets; + private int mbus_java_events_before_wakeup; + private int mbus_cpp_num_targets; + private int mbus_cpp_events_before_wakeup; + private int rpc_num_targets; + private int rpc_events_before_wakeup; public FeatureFlags(FlagSource source, ApplicationId appId, Version version) { this.defaultTermwiseLimit = flagValue(source, appId, version, Flags.DEFAULT_TERM_WISE_LIMIT); @@ -252,7 +257,6 @@ public class ModelContextImpl implements ModelContext { this.useV8GeoPositions = flagValue(source, appId, version, Flags.USE_V8_GEO_POSITIONS); this.maxCompactBuffers = flagValue(source, appId, version, Flags.MAX_COMPACT_BUFFERS); this.ignoredHttpUserAgents = flagValue(source, appId, version, PermanentFlags.IGNORED_HTTP_USER_AGENTS); - this.enableServerOcspStapling = flagValue(source, appId, version, Flags.ENABLE_SERVER_OCSP_STAPLING); this.mergeThrottlingPolicy = flagValue(source, appId, version, Flags.MERGE_THROTTLING_POLICY); this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR); this.persistenceThrottlingWsBackoff = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF); @@ -266,6 +270,12 @@ public class ModelContextImpl implements ModelContext { this.enableProxyProtocolMixedMode = flagValue(source, appId, version, Flags.ENABLE_PROXY_PROTOCOL_MIXED_MODE); this.sharedStringRepoNoReclaim = flagValue(source, appId, version, Flags.SHARED_STRING_REPO_NO_RECLAIM); this.logFileCompressionAlgorithm = flagValue(source, appId, version, Flags.LOG_FILE_COMPRESSION_ALGORITHM); + this.mbus_java_num_targets = flagValue(source, appId, version, Flags.MBUS_JAVA_NUM_TARGETS); + this.mbus_java_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_JAVA_EVENTS_BEFORE_WAKEUP); + this.mbus_cpp_num_targets = flagValue(source, appId, version, Flags.MBUS_CPP_NUM_TARGETS); + this.mbus_cpp_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_CPP_EVENTS_BEFORE_WAKEUP); + this.rpc_num_targets = flagValue(source, appId, version, Flags.RPC_NUM_TARGETS); + this.rpc_events_before_wakeup = flagValue(source, appId, version, Flags.RPC_EVENTS_BEFORE_WAKEUP); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @@ -305,7 +315,6 @@ public class ModelContextImpl implements ModelContext { @Override public boolean useV8GeoPositions() { return useV8GeoPositions; } @Override public int maxCompactBuffers() { return maxCompactBuffers; } @Override public List<String> ignoredHttpUserAgents() { return ignoredHttpUserAgents; } - @Override public boolean enableServerOcspStapling() { return enableServerOcspStapling; } @Override public String mergeThrottlingPolicy() { return mergeThrottlingPolicy; } @Override public double persistenceThrottlingWsDecrementFactor() { return persistenceThrottlingWsDecrementFactor; } @Override public double persistenceThrottlingWsBackoff() { return persistenceThrottlingWsBackoff; } @@ -318,6 +327,12 @@ public class ModelContextImpl implements ModelContext { @Override public Architecture adminClusterArchitecture() { return adminClusterArchitecture; } @Override public boolean enableProxyProtocolMixedMode() { return enableProxyProtocolMixedMode; } @Override public boolean sharedStringRepoNoReclaim() { return sharedStringRepoNoReclaim; } + @Override public int mbusJavaRpcNumTargets() { return mbus_java_num_targets; } + @Override public int mbusJavaEventsBeforeWakeup() { return mbus_java_events_before_wakeup; } + @Override public int mbusCppRpcNumTargets() { return mbus_cpp_num_targets; } + @Override public int mbusCppEventsBeforeWakeup() { return mbus_cpp_events_before_wakeup; } + @Override public int rpcNumTargets() { return rpc_num_targets; } + @Override public int rpcEventsBeforeWakeup() { return rpc_events_before_wakeup; } @Override public String logFileCompressionAlgorithm(String defVal) { var fflag = this.logFileCompressionAlgorithm; if (fflag != null && ! fflag.equals("")) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java index d3b7e53157d..7f120a88a05 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java @@ -141,7 +141,7 @@ public class FileDirectory { File destination = new File(tempDestinationDir.toFile(), source.getName()); if (!destinationDir.exists()) { destinationDir.mkdir(); - log.log(Level.FINE, () -> "file reference ' " + reference.value() + "', source: " + source.getAbsolutePath() ); + log.log(Level.FINE, () -> "file reference '" + reference.value() + "', source: " + source.getAbsolutePath() ); if (source.isDirectory()) { log.log(Level.FINE, () -> "Copying source " + source.getAbsolutePath() + " to " + destination.getAbsolutePath()); IOUtils.copyDirectory(source, destination, -1); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index f0c34e83713..770352e6bfc 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -21,6 +21,8 @@ import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; import com.yahoo.yolean.Exceptions; import java.io.File; import java.io.IOException; @@ -28,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -35,12 +38,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; -import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; public class FileServer { @@ -52,6 +56,7 @@ public class FileServer { private final FileDirectory root; private final ExecutorService executor; private final FileDownloader downloader; + private final List<CompressionType> compressionTypes; // compression types to use, in preferred order // TODO: Move to filedistribution module, so that it can be used by both clients and servers private enum FileApiErrorCodes { @@ -86,21 +91,24 @@ public class FileServer { @SuppressWarnings("WeakerAccess") // Created by dependency injection @Inject - public FileServer(ConfigserverConfig configserverConfig) { + public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) { this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())), - createFileDownloader(getOtherConfigServersInCluster(configserverConfig))); + createFileDownloader(getOtherConfigServersInCluster(configserverConfig), + compressionTypes(Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value())), + compressionTypesAsList(Flags.FILE_DISTRIBUTION_COMPRESSION_TYPES_TO_SERVE.bindTo(flagSource).value())); } // For testing only public FileServer(File rootDir) { - this(rootDir, createFileDownloader(List.of())); + this(rootDir, createFileDownloader(List.of(), Set.of(gzip)), List.of(gzip)); } - public FileServer(File rootDir, FileDownloader fileDownloader) { + FileServer(File rootDir, FileDownloader fileDownloader, List<CompressionType> compressionTypes) { this.downloader = fileDownloader; this.root = new FileDirectory(rootDir); this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), new DaemonThreadFactory("file-server-")); + this.compressionTypes = compressionTypes; } boolean hasFile(String fileReference) { @@ -145,10 +153,11 @@ public class FileServer { if (file.isDirectory()) { Path tempFile = Files.createTempFile("filereferencedata", reference.value()); CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes); + log.log(Level.FINE, () -> "accepted compression types=" + acceptedCompressionTypes + ", compression type to use=" + compressionType); File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile()); - return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile); + return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile, compressionType); } else { - return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); + return new LazyFileReferenceData(reference, file.getName(), Type.file, file, gzip); } } @@ -195,9 +204,14 @@ public class FileServer { return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND); } - // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams + /* Choose the first compression type (list is in preferred order) that matches an accepted compression type, or fail */ private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) { - return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip; + for (CompressionType compressionType : compressionTypes) { + if (acceptedCompressionTypes.contains(compressionType)) + return compressionType; + } + throw new RuntimeException("Could not find a compression type that can be used. Accepted compression types: " + + acceptedCompressionTypes + ", compression types server can use: " + compressionTypes); } boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) { @@ -228,14 +242,27 @@ public class FileServer { executor.shutdown(); } - private static FileDownloader createFileDownloader(List<String> configServers) { + private static FileDownloader createFileDownloader(List<String> configServers, Set<CompressionType> acceptedCompressionTypes) { Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true); return new FileDownloader(configServers.isEmpty() ? FileDownloader.emptyConnectionPool() : createConnectionPool(configServers, supervisor), supervisor, - timeout); + timeout, + acceptedCompressionTypes); + } + + private static LinkedHashSet<CompressionType> compressionTypes(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private static List<CompressionType> compressionTypesAsList(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toList()); } private static ConnectionPool createConnectionPool(List<String> configServers, Supervisor supervisor) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java index f7042b49c3f..0d4baa7dc56 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java @@ -38,4 +38,9 @@ class ProxyResponse extends HttpResponse { } } + @Override + public long maxPendingBytes() { + return 1 << 25; // 32MB + } + } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java index ae4b205c06e..12972e5c465 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java @@ -18,20 +18,23 @@ import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.flags.FlagSource; - +import com.yahoo.vespa.flags.Flags; import java.io.File; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; /** * Verifies that all active sessions has an application package on local disk. * If not, the package is downloaded with file distribution. This can happen e.g. - * if a configserver is down when the application is deployed. + * if a config server is down when the application is deployed. * * @author gjoranv */ @@ -53,7 +56,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { this.applicationRepository = applicationRepository; this.configserverConfig = applicationRepository.configserverConfig(); this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())); - this.fileDownloader = createFileDownloader(configserverConfig, downloadDirectory, supervisor); + this.fileDownloader = createFileDownloader(configserverConfig, + downloadDirectory, + supervisor, + Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value()); } @Override @@ -94,14 +100,18 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig, File downloadDirectory, - Supervisor supervisor) { + Supervisor supervisor, + List<String> flagValues) { List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig); ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster); ConnectionPool connectionPool = (otherConfigServersInCluster.isEmpty()) ? FileDownloader.emptyConnectionPool() : new FileDistributionConnectionPool(configSourceSet, supervisor); - return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300)); + Set<CompressionType> acceptedCompressionTypes = flagValues.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toSet()); + return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300), acceptedCompressionTypes); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 687cb1d3cca..7993e58d06e 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -491,6 +491,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { sendParts(session, fileData); sendEof(session, fileData, status); } + private void sendParts(int session, FileReferenceData fileData) { ByteBuffer bb = ByteBuffer.allocate(0x100000); for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) { @@ -504,12 +505,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { bb.clear(); } } + private int sendMeta(FileReferenceData fileData) { - Request request = new Request(FileReceiver.RECEIVE_META_METHOD); - request.parameters().add(new StringValue(fileData.fileReference().value())); - request.parameters().add(new StringValue(fileData.filename())); - request.parameters().add(new StringValue(fileData.type().name())); - request.parameters().add(new Int64Value(fileData.size())); + Request request = createMetaRequest(fileData); invokeRpcIfValidConnection(request); if (request.isError()) { log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + @@ -522,6 +520,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return request.returnValues().get(1).asInt32(); } } + + // non-private for testing + static Request createMetaRequest(FileReferenceData fileData) { + Request request = new Request(FileReceiver.RECEIVE_META_METHOD); + request.parameters().add(new StringValue(fileData.fileReference().value())); + request.parameters().add(new StringValue(fileData.filename())); + request.parameters().add(new StringValue(fileData.type().name())); + request.parameters().add(new Int64Value(fileData.size())); + // Only add paramter if not gzip, this is default and old clients will not handle the extra parameter + if (fileData.compressionType() != CompressionType.gzip) + request.parameters().add(new StringValue(fileData.compressionType().name())); + return request; + } + private void sendPart(int session, FileReference ref, int partId, byte [] buf) { Request request = new Request(FileReceiver.RECEIVE_PART_METHOD); request.parameters().add(new StringValue(ref.value())); @@ -538,6 +550,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } } } + private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) { Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD); request.parameters().add(new StringValue(fileData.fileReference().value())); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index a144940e443..d803488cb0a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -44,13 +44,11 @@ import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.flags.Flags; -import com.yahoo.vespa.flags.StringFlag; import com.yahoo.yolean.Exceptions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.zookeeper.KeeperException; - import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -83,6 +81,7 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import static com.yahoo.vespa.curator.Curator.CompletionWaiter; +import static com.yahoo.vespa.flags.FetchVector.Dimension.APPLICATION_ID; import static java.nio.file.Files.readAttributes; /** @@ -128,7 +127,6 @@ public class SessionRepository { private final ModelFactoryRegistry modelFactoryRegistry; private final ConfigDefinitionRepo configDefinitionRepo; private final int maxNodeSize; - private final StringFlag failDeploymentForFilesWithUnknownExtension; public SessionRepository(TenantName tenantName, TenantApplications applicationRepo, @@ -172,7 +170,6 @@ public class SessionRepository { this.modelFactoryRegistry = modelFactoryRegistry; this.configDefinitionRepo = configDefinitionRepo; this.maxNodeSize = maxNodeSize; - this.failDeploymentForFilesWithUnknownExtension = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource); loadSessions(); // Needs to be done before creating cache below this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, zkCacheExecutor); @@ -684,7 +681,10 @@ public class SessionRepository { try { app.validateFileExtensions(); } catch (IllegalArgumentException e) { - switch (failDeploymentForFilesWithUnknownExtension.value()) { + String flag = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource) + .with(APPLICATION_ID, applicationId.serializedForm()) + .value(); + switch (flag) { case "FAIL": throw e; case "LOG": |